Mercurial > gemma
diff auth/persistent.go @ 204:3d0988d9f867
De-virtualize the connection pool implementation.
The persistent connection pool implementation is
a superset of the in-memory implementation. The only
diffence in fact is that the in-memory variant does not
store anything to file. So the persistent implementation
is refactored in a way that it checks before each
persistence operation if storage is given or not.
author | Sascha L. Teichmann <teichmann@intevation.de> |
---|---|
date | Sun, 22 Jul 2018 10:23:03 +0200 |
parents | 6a802aed7f99 |
children |
line wrap: on
line diff
--- a/auth/persistent.go Sun Jul 22 09:42:47 2018 +0200 +++ b/auth/persistent.go Sun Jul 22 10:23:03 2018 +0200 @@ -9,19 +9,37 @@ bolt "github.com/coreos/bbolt" ) -type PersistentConnectionPool struct { - db *bolt.DB - conns map[string]*Connection - cmds chan func(*PersistentConnectionPool) +type ConnectionPool struct { + storage *bolt.DB + conns map[string]*Connection + cmds chan func(*ConnectionPool) } var sessionsBucket = []byte("sessions") -func NewPersistentConnectionPool(filename string) (*PersistentConnectionPool, error) { +func NewConnectionPool(filename string) (*ConnectionPool, error) { + + pcp := &ConnectionPool{ + cmds: make(chan func(*ConnectionPool)), + } + if err := pcp.openStorage(filename); err != nil { + return nil, err + } + go pcp.run() + return pcp, nil +} + +// openStorage opens a storage file. +func (pcp *ConnectionPool) openStorage(filename string) error { + + // No file, nothing to restore/persist. + if filename == "" { + return nil + } db, err := bolt.Open(filename, 0600, nil) if err != nil { - return nil, err + return err } conns := make(map[string]*Connection) @@ -47,19 +65,16 @@ if err != nil { db.Close() - return nil, err + return err } - pcp := &PersistentConnectionPool{ - db: db, - conns: conns, - cmds: make(chan func(*PersistentConnectionPool)), - } - go pcp.run() - return pcp, nil + pcp.storage = db + pcp.conns = conns + + return nil } -func (pcp *PersistentConnectionPool) run() { +func (pcp *ConnectionPool) run() { for { select { case cmd := <-pcp.cmds: @@ -72,7 +87,7 @@ } } -func (pcp *PersistentConnectionPool) cleanDB() { +func (pcp *ConnectionPool) cleanDB() { valid := time.Now().Add(-maxDBIdle) for _, con := range pcp.conns { if con.refCount <= 0 && con.last().Before(valid) { @@ -81,7 +96,7 @@ } } -func (pcp *PersistentConnectionPool) cleanToken() { +func (pcp *ConnectionPool) cleanToken() { now := time.Now() for token, con := range pcp.conns { expires := time.Unix(con.session.ExpiresAt, 0) @@ -94,8 +109,11 @@ } } -func (pcp *PersistentConnectionPool) remove(token string) { - err := pcp.db.Update(func(tx *bolt.Tx) error { +func (pcp *ConnectionPool) remove(token string) { + if pcp.storage == nil { + return + } + err := pcp.storage.Update(func(tx *bolt.Tx) error { b := tx.Bucket(sessionsBucket) return b.Delete([]byte(token)) }) @@ -104,9 +122,9 @@ } } -func (pcp *PersistentConnectionPool) Delete(token string) bool { +func (pcp *ConnectionPool) Delete(token string) bool { res := make(chan bool) - pcp.cmds <- func(pcp *PersistentConnectionPool) { + pcp.cmds <- func(pcp *ConnectionPool) { conn, found := pcp.conns[token] if !found { res <- false @@ -120,8 +138,11 @@ return <-res } -func (pcp *PersistentConnectionPool) store(token string, con *Connection) { - err := pcp.db.Update(func(tx *bolt.Tx) error { +func (pcp *ConnectionPool) store(token string, con *Connection) { + if pcp.storage == nil { + return + } + err := pcp.storage.Update(func(tx *bolt.Tx) error { b := tx.Bucket(sessionsBucket) var buf bytes.Buffer if err := con.serialize(&buf); err != nil { @@ -134,10 +155,10 @@ } } -func (pcp *PersistentConnectionPool) Add(token string, session *Session) *Connection { +func (pcp *ConnectionPool) Add(token string, session *Session) *Connection { res := make(chan *Connection) - pcp.cmds <- func(cp *PersistentConnectionPool) { + pcp.cmds <- func(cp *ConnectionPool) { con := pcp.conns[token] if con == nil { con = &Connection{} @@ -152,7 +173,7 @@ return con } -func (pcp *PersistentConnectionPool) Renew(token string) (string, error) { +func (pcp *ConnectionPool) Renew(token string) (string, error) { type result struct { newToken string @@ -161,7 +182,7 @@ resCh := make(chan result) - pcp.cmds <- func(cp *PersistentConnectionPool) { + pcp.cmds <- func(cp *ConnectionPool) { con := pcp.conns[token] if con == nil { resCh <- result{err: ErrNoSuchToken} @@ -181,7 +202,7 @@ return r.newToken, r.err } -func (pcp *PersistentConnectionPool) trim(conn *Connection) { +func (pcp *ConnectionPool) trim(conn *Connection) { conn.refCount-- @@ -206,7 +227,7 @@ } } -func (pcp *PersistentConnectionPool) Do(token string, fn func(*sql.DB) error) error { +func (pcp *ConnectionPool) Do(token string, fn func(*sql.DB) error) error { type result struct { con *Connection @@ -215,7 +236,7 @@ res := make(chan result) - pcp.cmds <- func(pcp *PersistentConnectionPool) { + pcp.cmds <- func(pcp *ConnectionPool) { con := pcp.conns[token] if con == nil { res <- result{err: ErrNoSuchToken} @@ -251,7 +272,7 @@ } defer func() { - pcp.cmds <- func(pcp *PersistentConnectionPool) { + pcp.cmds <- func(pcp *ConnectionPool) { pcp.trim(r.con) } }() @@ -259,9 +280,9 @@ return fn(r.con.db) } -func (pcp *PersistentConnectionPool) Session(token string) *Session { +func (pcp *ConnectionPool) Session(token string) *Session { res := make(chan *Session) - pcp.cmds <- func(pcp *PersistentConnectionPool) { + pcp.cmds <- func(pcp *ConnectionPool) { con := pcp.conns[token] if con == nil { res <- nil @@ -274,11 +295,12 @@ return <-res } -func (pcp *PersistentConnectionPool) Shutdown() error { - log.Println("info: shutdown persistent connection pool.") - if db := pcp.db; db != nil { - pcp.db = nil +func (pcp *ConnectionPool) Shutdown() error { + if db := pcp.storage; db != nil { + log.Println("info: shutdown persistent connection pool.") + pcp.storage = nil return db.Close() } + log.Println("info: shutdown in-memory connection pool.") return nil }