Mercurial > gemma
changeset 148:0116aae1071b
Made ConnectionPool an interface and use current in-memory implementation.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 02 Jul 2018 11:00:19 +0200 |
parents | 5ca6f436d0b7 |
children | 83d798ea9f58 |
files | 3rdpartylibs.sh auth/connection.go auth/inmemory.go |
diffstat | 3 files changed, 210 insertions(+), 195 deletions(-) [+] |
line wrap: on
line diff
--- a/3rdpartylibs.sh Mon Jul 02 10:34:56 2018 +0200 +++ b/3rdpartylibs.sh Mon Jul 02 11:00:19 2018 +0200 @@ -1,2 +1,3 @@ #!/bin/sh go get -u -v github.com/jackc/pgx +go get -u -v github.com/coreos/bbolt/...
--- a/auth/connection.go Mon Jul 02 10:34:56 2018 +0200 +++ b/auth/connection.go Mon Jul 02 11:00:19 2018 +0200 @@ -10,7 +10,15 @@ var ErrNoSuchToken = errors.New("No such token") -var ConnPool = NewConnectionPool() +type ConnectionPool interface { + Delete(token string) bool + Add(token string, session *Session) *Connection + Renew(token string) (string, error) + Do(token string, fn func(*sql.DB) error) error + Session(token string) *Session +} + +var ConnPool ConnectionPool = NewInMemoryConnectionPool() const ( maxOpen = 16 @@ -53,197 +61,3 @@ c.db = nil } } - -type ConnectionPool struct { - conns map[string]*Connection - cmds chan func(*ConnectionPool) -} - -func NewConnectionPool() *ConnectionPool { - cp := &ConnectionPool{ - conns: make(map[string]*Connection), - cmds: make(chan func(*ConnectionPool)), - } - go cp.run() - return cp -} - -func (cp *ConnectionPool) run() { - for { - select { - case cmd := <-cp.cmds: - cmd(cp) - case <-time.After(time.Minute): - cp.cleanDB() - case <-time.After(time.Minute * 5): - cp.cleanToken() - } - } -} - -func (cp *ConnectionPool) cleanDB() { - valid := time.Now().Add(-maxDBIdle) - for _, con := range cp.conns { - if con.refCount <= 0 && con.last().Before(valid) { - con.close() - } - } -} - -func (cp *ConnectionPool) cleanToken() { - now := time.Now() - for token, con := range cp.conns { - expires := time.Unix(con.session.ExpiresAt, 0) - if expires.Before(now) { - // TODO: Be more graceful here? - con.close() - delete(cp.conns, token) - } - } -} - -func (cp *ConnectionPool) Delete(token string) bool { - res := make(chan bool) - cp.cmds <- func(cp *ConnectionPool) { - conn, found := cp.conns[token] - if !found { - res <- false - return - } - conn.close() - delete(cp.conns, token) - res <- true - } - return <-res -} - -func (cp *ConnectionPool) Add(token string, session *Session) *Connection { - res := make(chan *Connection) - - cp.cmds <- func(cp *ConnectionPool) { - con := cp.conns[token] - if con == nil { - con = &Connection{} - cp.conns[token] = con - } - con.set(session) - res <- con - } - - con := <-res - return con -} - -func (cp ConnectionPool) Renew(token string) (string, error) { - - type result struct { - newToken string - err error - } - - resCh := make(chan result) - - cp.cmds <- func(cp *ConnectionPool) { - con := cp.conns[token] - if con == nil { - resCh <- result{err: ErrNoSuchToken} - } else { - delete(cp.conns, token) - newToken := GenerateSessionKey() - // TODO: Ensure that this is not racy! - con.session.ExpiresAt = time.Now().Add(maxTokenValid).Unix() - cp.conns[newToken] = con - resCh <- result{newToken: newToken} - } - } - - r := <-resCh - return r.newToken, r.err -} - -func (cp *ConnectionPool) trim(conn *Connection) { - - conn.refCount-- - - for { - least := time.Now() - var count int - var oldest *Connection - - for _, con := range cp.conns { - if con.db != nil && con.refCount <= 0 { - if last := con.last(); last.Before(least) { - least = last - oldest = con - } - count++ - } - } - if count <= maxOpen { - break - } - oldest.close() - } -} - -func (cp *ConnectionPool) Do(token string, fn func(*sql.DB) error) error { - - type result struct { - con *Connection - err error - } - - res := make(chan result) - - cp.cmds <- func(cp *ConnectionPool) { - con := cp.conns[token] - if con == nil { - res <- result{err: ErrNoSuchToken} - return - } - con.touch() - if con.db != nil { - con.refCount++ - res <- result{con: con} - return - } - - session := con.session - db, err := opendb(session.User, session.Password) - if err != nil { - res <- result{err: err} - return - } - con.db = db - con.refCount++ - res <- result{con: con} - } - - r := <-res - - if r.err != nil { - return r.err - } - - defer func() { - cp.cmds <- func(cp *ConnectionPool) { - cp.trim(r.con) - } - }() - - return fn(r.con.db) -} - -func (cp *ConnectionPool) Session(token string) *Session { - res := make(chan *Session) - cp.cmds <- func(cp *ConnectionPool) { - con := cp.conns[token] - if con == nil { - res <- nil - } else { - con.touch() - res <- con.session - } - } - return <-res -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/auth/inmemory.go Mon Jul 02 11:00:19 2018 +0200 @@ -0,0 +1,200 @@ +package auth + +import ( + "database/sql" + "time" +) + +type InMemoryConnectionPool struct { + conns map[string]*Connection + cmds chan func(*InMemoryConnectionPool) +} + +func NewInMemoryConnectionPool() *InMemoryConnectionPool { + cp := &InMemoryConnectionPool{ + conns: make(map[string]*Connection), + cmds: make(chan func(*InMemoryConnectionPool)), + } + go cp.run() + return cp +} + +func (cp *InMemoryConnectionPool) run() { + for { + select { + case cmd := <-cp.cmds: + cmd(cp) + case <-time.After(time.Minute): + cp.cleanDB() + case <-time.After(time.Minute * 5): + cp.cleanToken() + } + } +} + +func (cp *InMemoryConnectionPool) cleanDB() { + valid := time.Now().Add(-maxDBIdle) + for _, con := range cp.conns { + if con.refCount <= 0 && con.last().Before(valid) { + con.close() + } + } +} + +func (cp *InMemoryConnectionPool) cleanToken() { + now := time.Now() + for token, con := range cp.conns { + expires := time.Unix(con.session.ExpiresAt, 0) + if expires.Before(now) { + // TODO: Be more graceful here? + con.close() + delete(cp.conns, token) + } + } +} + +func (cp *InMemoryConnectionPool) Delete(token string) bool { + res := make(chan bool) + cp.cmds <- func(cp *InMemoryConnectionPool) { + conn, found := cp.conns[token] + if !found { + res <- false + return + } + conn.close() + delete(cp.conns, token) + res <- true + } + return <-res +} + +func (cp *InMemoryConnectionPool) Add(token string, session *Session) *Connection { + res := make(chan *Connection) + + cp.cmds <- func(cp *InMemoryConnectionPool) { + con := cp.conns[token] + if con == nil { + con = &Connection{} + cp.conns[token] = con + } + con.set(session) + res <- con + } + + con := <-res + return con +} + +func (cp *InMemoryConnectionPool) Renew(token string) (string, error) { + + type result struct { + newToken string + err error + } + + resCh := make(chan result) + + cp.cmds <- func(cp *InMemoryConnectionPool) { + con := cp.conns[token] + if con == nil { + resCh <- result{err: ErrNoSuchToken} + } else { + delete(cp.conns, token) + newToken := GenerateSessionKey() + // TODO: Ensure that this is not racy! + con.session.ExpiresAt = time.Now().Add(maxTokenValid).Unix() + cp.conns[newToken] = con + resCh <- result{newToken: newToken} + } + } + + r := <-resCh + return r.newToken, r.err +} + +func (cp *InMemoryConnectionPool) trim(conn *Connection) { + + conn.refCount-- + + for { + least := time.Now() + var count int + var oldest *Connection + + for _, con := range cp.conns { + if con.db != nil && con.refCount <= 0 { + if last := con.last(); last.Before(least) { + least = last + oldest = con + } + count++ + } + } + if count <= maxOpen { + break + } + oldest.close() + } +} + +func (cp *InMemoryConnectionPool) Do(token string, fn func(*sql.DB) error) error { + + type result struct { + con *Connection + err error + } + + res := make(chan result) + + cp.cmds <- func(cp *InMemoryConnectionPool) { + con := cp.conns[token] + if con == nil { + res <- result{err: ErrNoSuchToken} + return + } + con.touch() + if con.db != nil { + con.refCount++ + res <- result{con: con} + return + } + + session := con.session + db, err := opendb(session.User, session.Password) + if err != nil { + res <- result{err: err} + return + } + con.db = db + con.refCount++ + res <- result{con: con} + } + + r := <-res + + if r.err != nil { + return r.err + } + + defer func() { + cp.cmds <- func(cp *InMemoryConnectionPool) { + cp.trim(r.con) + } + }() + + return fn(r.con.db) +} + +func (cp *InMemoryConnectionPool) Session(token string) *Session { + res := make(chan *Session) + cp.cmds <- func(cp *InMemoryConnectionPool) { + con := cp.conns[token] + if con == nil { + res <- nil + } else { + con.touch() + res <- con.session + } + } + return <-res +}