Mercurial > gemma
changeset 204:3d0988d9f867
De-virtualize the connection pool implementation.
The persistent connection pool implementation is
a superset of the in-memory implementation. The only
diffence in fact is that the in-memory variant does not
store anything to file. So the persistent implementation
is refactored in a way that it checks before each
persistence operation if storage is given or not.
author | Sascha L. Teichmann <teichmann@intevation.de> |
---|---|
date | Sun, 22 Jul 2018 10:23:03 +0200 |
parents | 6a802aed7f99 |
children | 2a152816fc38 |
files | auth/connection.go auth/inmemory.go auth/persistent.go |
diffstat | 3 files changed, 64 insertions(+), 260 deletions(-) [+] |
line wrap: on
line diff
--- a/auth/connection.go Sun Jul 22 09:42:47 2018 +0200 +++ b/auth/connection.go Sun Jul 22 10:23:03 2018 +0200 @@ -14,24 +14,12 @@ var ErrNoSuchToken = errors.New("No such token") -type ConnectionPool interface { - Delete(token string) bool - Add(token string, session *Session) *Connection - Renew(token string) (string, error) - Do(token string, fn func(*sql.DB) error) error - Session(token string) *Session - Shutdown() error -} - -var ConnPool = func() ConnectionPool { - if config.Config.SessionStore != "" { - cp, err := NewPersistentConnectionPool(config.Config.SessionStore) - if err != nil { - log.Panicf("Error with session store: %v\n", err) - } - return cp +var ConnPool = func() *ConnectionPool { + cp, err := NewConnectionPool(config.Config.SessionStore) + if err != nil { + log.Panicf("Error with session store: %v\n", err) } - return NewInMemoryConnectionPool() + return cp }() const (
--- a/auth/inmemory.go Sun Jul 22 09:42:47 2018 +0200 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,206 +0,0 @@ -package auth - -import ( - "database/sql" - "log" - "time" -) - -type InMemoryConnectionPool struct { - conns map[string]*Connection - cmds chan func(*InMemoryConnectionPool) -} - -func NewInMemoryConnectionPool() *InMemoryConnectionPool { - cp := &InMemoryConnectionPool{ - conns: make(map[string]*Connection), - cmds: make(chan func(*InMemoryConnectionPool)), - } - go cp.run() - return cp -} - -func (cp *InMemoryConnectionPool) run() { - for { - select { - case cmd := <-cp.cmds: - cmd(cp) - case <-time.After(time.Minute): - cp.cleanDB() - case <-time.After(time.Minute * 5): - cp.cleanToken() - } - } -} - -func (cp *InMemoryConnectionPool) cleanDB() { - valid := time.Now().Add(-maxDBIdle) - for _, con := range cp.conns { - if con.refCount <= 0 && con.last().Before(valid) { - con.close() - } - } -} - -func (cp *InMemoryConnectionPool) cleanToken() { - now := time.Now() - for token, con := range cp.conns { - expires := time.Unix(con.session.ExpiresAt, 0) - if expires.Before(now) { - // TODO: Be more graceful here? - con.close() - delete(cp.conns, token) - } - } -} - -func (cp *InMemoryConnectionPool) Delete(token string) bool { - res := make(chan bool) - cp.cmds <- func(cp *InMemoryConnectionPool) { - conn, found := cp.conns[token] - if !found { - res <- false - return - } - conn.close() - delete(cp.conns, token) - res <- true - } - return <-res -} - -func (cp *InMemoryConnectionPool) Add(token string, session *Session) *Connection { - res := make(chan *Connection) - - cp.cmds <- func(cp *InMemoryConnectionPool) { - con := cp.conns[token] - if con == nil { - con = &Connection{} - cp.conns[token] = con - } - con.set(session) - res <- con - } - - con := <-res - return con -} - -func (cp *InMemoryConnectionPool) Renew(token string) (string, error) { - - type result struct { - newToken string - err error - } - - resCh := make(chan result) - - cp.cmds <- func(cp *InMemoryConnectionPool) { - con := cp.conns[token] - if con == nil { - resCh <- result{err: ErrNoSuchToken} - } else { - delete(cp.conns, token) - newToken := GenerateSessionKey() - // TODO: Ensure that this is not racy! - con.session.ExpiresAt = time.Now().Add(maxTokenValid).Unix() - cp.conns[newToken] = con - resCh <- result{newToken: newToken} - } - } - - r := <-resCh - return r.newToken, r.err -} - -func (cp *InMemoryConnectionPool) trim(conn *Connection) { - - conn.refCount-- - - for { - least := time.Now() - var count int - var oldest *Connection - - for _, con := range cp.conns { - if con.db != nil && con.refCount <= 0 { - if last := con.last(); last.Before(least) { - least = last - oldest = con - } - count++ - } - } - if count <= maxOpen { - break - } - oldest.close() - } -} - -func (cp *InMemoryConnectionPool) Do(token string, fn func(*sql.DB) error) error { - - type result struct { - con *Connection - err error - } - - res := make(chan result) - - cp.cmds <- func(cp *InMemoryConnectionPool) { - con := cp.conns[token] - if con == nil { - res <- result{err: ErrNoSuchToken} - return - } - con.touch() - if con.db != nil { - con.refCount++ - res <- result{con: con} - return - } - - session := con.session - db, err := opendb(session.User, session.Password) - if err != nil { - res <- result{err: err} - return - } - con.db = db - con.refCount++ - res <- result{con: con} - } - - r := <-res - - if r.err != nil { - return r.err - } - - defer func() { - cp.cmds <- func(cp *InMemoryConnectionPool) { - cp.trim(r.con) - } - }() - - return fn(r.con.db) -} - -func (cp *InMemoryConnectionPool) Session(token string) *Session { - res := make(chan *Session) - cp.cmds <- func(cp *InMemoryConnectionPool) { - con := cp.conns[token] - if con == nil { - res <- nil - } else { - con.touch() - res <- con.session - } - } - return <-res -} - -func (cp *InMemoryConnectionPool) Shutdown() error { - log.Println("info: shutdown in-memory connection pool.") - return nil -}
--- a/auth/persistent.go Sun Jul 22 09:42:47 2018 +0200 +++ b/auth/persistent.go Sun Jul 22 10:23:03 2018 +0200 @@ -9,19 +9,37 @@ bolt "github.com/coreos/bbolt" ) -type PersistentConnectionPool struct { - db *bolt.DB - conns map[string]*Connection - cmds chan func(*PersistentConnectionPool) +type ConnectionPool struct { + storage *bolt.DB + conns map[string]*Connection + cmds chan func(*ConnectionPool) } var sessionsBucket = []byte("sessions") -func NewPersistentConnectionPool(filename string) (*PersistentConnectionPool, error) { +func NewConnectionPool(filename string) (*ConnectionPool, error) { + + pcp := &ConnectionPool{ + cmds: make(chan func(*ConnectionPool)), + } + if err := pcp.openStorage(filename); err != nil { + return nil, err + } + go pcp.run() + return pcp, nil +} + +// openStorage opens a storage file. +func (pcp *ConnectionPool) openStorage(filename string) error { + + // No file, nothing to restore/persist. + if filename == "" { + return nil + } db, err := bolt.Open(filename, 0600, nil) if err != nil { - return nil, err + return err } conns := make(map[string]*Connection) @@ -47,19 +65,16 @@ if err != nil { db.Close() - return nil, err + return err } - pcp := &PersistentConnectionPool{ - db: db, - conns: conns, - cmds: make(chan func(*PersistentConnectionPool)), - } - go pcp.run() - return pcp, nil + pcp.storage = db + pcp.conns = conns + + return nil } -func (pcp *PersistentConnectionPool) run() { +func (pcp *ConnectionPool) run() { for { select { case cmd := <-pcp.cmds: @@ -72,7 +87,7 @@ } } -func (pcp *PersistentConnectionPool) cleanDB() { +func (pcp *ConnectionPool) cleanDB() { valid := time.Now().Add(-maxDBIdle) for _, con := range pcp.conns { if con.refCount <= 0 && con.last().Before(valid) { @@ -81,7 +96,7 @@ } } -func (pcp *PersistentConnectionPool) cleanToken() { +func (pcp *ConnectionPool) cleanToken() { now := time.Now() for token, con := range pcp.conns { expires := time.Unix(con.session.ExpiresAt, 0) @@ -94,8 +109,11 @@ } } -func (pcp *PersistentConnectionPool) remove(token string) { - err := pcp.db.Update(func(tx *bolt.Tx) error { +func (pcp *ConnectionPool) remove(token string) { + if pcp.storage == nil { + return + } + err := pcp.storage.Update(func(tx *bolt.Tx) error { b := tx.Bucket(sessionsBucket) return b.Delete([]byte(token)) }) @@ -104,9 +122,9 @@ } } -func (pcp *PersistentConnectionPool) Delete(token string) bool { +func (pcp *ConnectionPool) Delete(token string) bool { res := make(chan bool) - pcp.cmds <- func(pcp *PersistentConnectionPool) { + pcp.cmds <- func(pcp *ConnectionPool) { conn, found := pcp.conns[token] if !found { res <- false @@ -120,8 +138,11 @@ return <-res } -func (pcp *PersistentConnectionPool) store(token string, con *Connection) { - err := pcp.db.Update(func(tx *bolt.Tx) error { +func (pcp *ConnectionPool) store(token string, con *Connection) { + if pcp.storage == nil { + return + } + err := pcp.storage.Update(func(tx *bolt.Tx) error { b := tx.Bucket(sessionsBucket) var buf bytes.Buffer if err := con.serialize(&buf); err != nil { @@ -134,10 +155,10 @@ } } -func (pcp *PersistentConnectionPool) Add(token string, session *Session) *Connection { +func (pcp *ConnectionPool) Add(token string, session *Session) *Connection { res := make(chan *Connection) - pcp.cmds <- func(cp *PersistentConnectionPool) { + pcp.cmds <- func(cp *ConnectionPool) { con := pcp.conns[token] if con == nil { con = &Connection{} @@ -152,7 +173,7 @@ return con } -func (pcp *PersistentConnectionPool) Renew(token string) (string, error) { +func (pcp *ConnectionPool) Renew(token string) (string, error) { type result struct { newToken string @@ -161,7 +182,7 @@ resCh := make(chan result) - pcp.cmds <- func(cp *PersistentConnectionPool) { + pcp.cmds <- func(cp *ConnectionPool) { con := pcp.conns[token] if con == nil { resCh <- result{err: ErrNoSuchToken} @@ -181,7 +202,7 @@ return r.newToken, r.err } -func (pcp *PersistentConnectionPool) trim(conn *Connection) { +func (pcp *ConnectionPool) trim(conn *Connection) { conn.refCount-- @@ -206,7 +227,7 @@ } } -func (pcp *PersistentConnectionPool) Do(token string, fn func(*sql.DB) error) error { +func (pcp *ConnectionPool) Do(token string, fn func(*sql.DB) error) error { type result struct { con *Connection @@ -215,7 +236,7 @@ res := make(chan result) - pcp.cmds <- func(pcp *PersistentConnectionPool) { + pcp.cmds <- func(pcp *ConnectionPool) { con := pcp.conns[token] if con == nil { res <- result{err: ErrNoSuchToken} @@ -251,7 +272,7 @@ } defer func() { - pcp.cmds <- func(pcp *PersistentConnectionPool) { + pcp.cmds <- func(pcp *ConnectionPool) { pcp.trim(r.con) } }() @@ -259,9 +280,9 @@ return fn(r.con.db) } -func (pcp *PersistentConnectionPool) Session(token string) *Session { +func (pcp *ConnectionPool) Session(token string) *Session { res := make(chan *Session) - pcp.cmds <- func(pcp *PersistentConnectionPool) { + pcp.cmds <- func(pcp *ConnectionPool) { con := pcp.conns[token] if con == nil { res <- nil @@ -274,11 +295,12 @@ return <-res } -func (pcp *PersistentConnectionPool) Shutdown() error { - log.Println("info: shutdown persistent connection pool.") - if db := pcp.db; db != nil { - pcp.db = nil +func (pcp *ConnectionPool) Shutdown() error { + if db := pcp.storage; db != nil { + log.Println("info: shutdown persistent connection pool.") + pcp.storage = nil return db.Close() } + log.Println("info: shutdown in-memory connection pool.") return nil }