changeset 204:3d0988d9f867

De-virtualize the connection pool implementation. The persistent connection pool implementation is a superset of the in-memory implementation. The only diffence in fact is that the in-memory variant does not store anything to file. So the persistent implementation is refactored in a way that it checks before each persistence operation if storage is given or not.
author Sascha L. Teichmann <teichmann@intevation.de>
date Sun, 22 Jul 2018 10:23:03 +0200
parents 6a802aed7f99
children 2a152816fc38
files auth/connection.go auth/inmemory.go auth/persistent.go
diffstat 3 files changed, 64 insertions(+), 260 deletions(-) [+]
line wrap: on
line diff
--- a/auth/connection.go	Sun Jul 22 09:42:47 2018 +0200
+++ b/auth/connection.go	Sun Jul 22 10:23:03 2018 +0200
@@ -14,24 +14,12 @@
 
 var ErrNoSuchToken = errors.New("No such token")
 
-type ConnectionPool interface {
-	Delete(token string) bool
-	Add(token string, session *Session) *Connection
-	Renew(token string) (string, error)
-	Do(token string, fn func(*sql.DB) error) error
-	Session(token string) *Session
-	Shutdown() error
-}
-
-var ConnPool = func() ConnectionPool {
-	if config.Config.SessionStore != "" {
-		cp, err := NewPersistentConnectionPool(config.Config.SessionStore)
-		if err != nil {
-			log.Panicf("Error with session store: %v\n", err)
-		}
-		return cp
+var ConnPool = func() *ConnectionPool {
+	cp, err := NewConnectionPool(config.Config.SessionStore)
+	if err != nil {
+		log.Panicf("Error with session store: %v\n", err)
 	}
-	return NewInMemoryConnectionPool()
+	return cp
 }()
 
 const (
--- a/auth/inmemory.go	Sun Jul 22 09:42:47 2018 +0200
+++ /dev/null	Thu Jan 01 00:00:00 1970 +0000
@@ -1,206 +0,0 @@
-package auth
-
-import (
-	"database/sql"
-	"log"
-	"time"
-)
-
-type InMemoryConnectionPool struct {
-	conns map[string]*Connection
-	cmds  chan func(*InMemoryConnectionPool)
-}
-
-func NewInMemoryConnectionPool() *InMemoryConnectionPool {
-	cp := &InMemoryConnectionPool{
-		conns: make(map[string]*Connection),
-		cmds:  make(chan func(*InMemoryConnectionPool)),
-	}
-	go cp.run()
-	return cp
-}
-
-func (cp *InMemoryConnectionPool) run() {
-	for {
-		select {
-		case cmd := <-cp.cmds:
-			cmd(cp)
-		case <-time.After(time.Minute):
-			cp.cleanDB()
-		case <-time.After(time.Minute * 5):
-			cp.cleanToken()
-		}
-	}
-}
-
-func (cp *InMemoryConnectionPool) cleanDB() {
-	valid := time.Now().Add(-maxDBIdle)
-	for _, con := range cp.conns {
-		if con.refCount <= 0 && con.last().Before(valid) {
-			con.close()
-		}
-	}
-}
-
-func (cp *InMemoryConnectionPool) cleanToken() {
-	now := time.Now()
-	for token, con := range cp.conns {
-		expires := time.Unix(con.session.ExpiresAt, 0)
-		if expires.Before(now) {
-			// TODO: Be more graceful here?
-			con.close()
-			delete(cp.conns, token)
-		}
-	}
-}
-
-func (cp *InMemoryConnectionPool) Delete(token string) bool {
-	res := make(chan bool)
-	cp.cmds <- func(cp *InMemoryConnectionPool) {
-		conn, found := cp.conns[token]
-		if !found {
-			res <- false
-			return
-		}
-		conn.close()
-		delete(cp.conns, token)
-		res <- true
-	}
-	return <-res
-}
-
-func (cp *InMemoryConnectionPool) Add(token string, session *Session) *Connection {
-	res := make(chan *Connection)
-
-	cp.cmds <- func(cp *InMemoryConnectionPool) {
-		con := cp.conns[token]
-		if con == nil {
-			con = &Connection{}
-			cp.conns[token] = con
-		}
-		con.set(session)
-		res <- con
-	}
-
-	con := <-res
-	return con
-}
-
-func (cp *InMemoryConnectionPool) Renew(token string) (string, error) {
-
-	type result struct {
-		newToken string
-		err      error
-	}
-
-	resCh := make(chan result)
-
-	cp.cmds <- func(cp *InMemoryConnectionPool) {
-		con := cp.conns[token]
-		if con == nil {
-			resCh <- result{err: ErrNoSuchToken}
-		} else {
-			delete(cp.conns, token)
-			newToken := GenerateSessionKey()
-			// TODO: Ensure that this is not racy!
-			con.session.ExpiresAt = time.Now().Add(maxTokenValid).Unix()
-			cp.conns[newToken] = con
-			resCh <- result{newToken: newToken}
-		}
-	}
-
-	r := <-resCh
-	return r.newToken, r.err
-}
-
-func (cp *InMemoryConnectionPool) trim(conn *Connection) {
-
-	conn.refCount--
-
-	for {
-		least := time.Now()
-		var count int
-		var oldest *Connection
-
-		for _, con := range cp.conns {
-			if con.db != nil && con.refCount <= 0 {
-				if last := con.last(); last.Before(least) {
-					least = last
-					oldest = con
-				}
-				count++
-			}
-		}
-		if count <= maxOpen {
-			break
-		}
-		oldest.close()
-	}
-}
-
-func (cp *InMemoryConnectionPool) Do(token string, fn func(*sql.DB) error) error {
-
-	type result struct {
-		con *Connection
-		err error
-	}
-
-	res := make(chan result)
-
-	cp.cmds <- func(cp *InMemoryConnectionPool) {
-		con := cp.conns[token]
-		if con == nil {
-			res <- result{err: ErrNoSuchToken}
-			return
-		}
-		con.touch()
-		if con.db != nil {
-			con.refCount++
-			res <- result{con: con}
-			return
-		}
-
-		session := con.session
-		db, err := opendb(session.User, session.Password)
-		if err != nil {
-			res <- result{err: err}
-			return
-		}
-		con.db = db
-		con.refCount++
-		res <- result{con: con}
-	}
-
-	r := <-res
-
-	if r.err != nil {
-		return r.err
-	}
-
-	defer func() {
-		cp.cmds <- func(cp *InMemoryConnectionPool) {
-			cp.trim(r.con)
-		}
-	}()
-
-	return fn(r.con.db)
-}
-
-func (cp *InMemoryConnectionPool) Session(token string) *Session {
-	res := make(chan *Session)
-	cp.cmds <- func(cp *InMemoryConnectionPool) {
-		con := cp.conns[token]
-		if con == nil {
-			res <- nil
-		} else {
-			con.touch()
-			res <- con.session
-		}
-	}
-	return <-res
-}
-
-func (cp *InMemoryConnectionPool) Shutdown() error {
-	log.Println("info: shutdown in-memory connection pool.")
-	return nil
-}
--- a/auth/persistent.go	Sun Jul 22 09:42:47 2018 +0200
+++ b/auth/persistent.go	Sun Jul 22 10:23:03 2018 +0200
@@ -9,19 +9,37 @@
 	bolt "github.com/coreos/bbolt"
 )
 
-type PersistentConnectionPool struct {
-	db    *bolt.DB
-	conns map[string]*Connection
-	cmds  chan func(*PersistentConnectionPool)
+type ConnectionPool struct {
+	storage *bolt.DB
+	conns   map[string]*Connection
+	cmds    chan func(*ConnectionPool)
 }
 
 var sessionsBucket = []byte("sessions")
 
-func NewPersistentConnectionPool(filename string) (*PersistentConnectionPool, error) {
+func NewConnectionPool(filename string) (*ConnectionPool, error) {
+
+	pcp := &ConnectionPool{
+		cmds: make(chan func(*ConnectionPool)),
+	}
+	if err := pcp.openStorage(filename); err != nil {
+		return nil, err
+	}
+	go pcp.run()
+	return pcp, nil
+}
+
+// openStorage opens a storage file.
+func (pcp *ConnectionPool) openStorage(filename string) error {
+
+	// No file, nothing to restore/persist.
+	if filename == "" {
+		return nil
+	}
 
 	db, err := bolt.Open(filename, 0600, nil)
 	if err != nil {
-		return nil, err
+		return err
 	}
 
 	conns := make(map[string]*Connection)
@@ -47,19 +65,16 @@
 
 	if err != nil {
 		db.Close()
-		return nil, err
+		return err
 	}
 
-	pcp := &PersistentConnectionPool{
-		db:    db,
-		conns: conns,
-		cmds:  make(chan func(*PersistentConnectionPool)),
-	}
-	go pcp.run()
-	return pcp, nil
+	pcp.storage = db
+	pcp.conns = conns
+
+	return nil
 }
 
-func (pcp *PersistentConnectionPool) run() {
+func (pcp *ConnectionPool) run() {
 	for {
 		select {
 		case cmd := <-pcp.cmds:
@@ -72,7 +87,7 @@
 	}
 }
 
-func (pcp *PersistentConnectionPool) cleanDB() {
+func (pcp *ConnectionPool) cleanDB() {
 	valid := time.Now().Add(-maxDBIdle)
 	for _, con := range pcp.conns {
 		if con.refCount <= 0 && con.last().Before(valid) {
@@ -81,7 +96,7 @@
 	}
 }
 
-func (pcp *PersistentConnectionPool) cleanToken() {
+func (pcp *ConnectionPool) cleanToken() {
 	now := time.Now()
 	for token, con := range pcp.conns {
 		expires := time.Unix(con.session.ExpiresAt, 0)
@@ -94,8 +109,11 @@
 	}
 }
 
-func (pcp *PersistentConnectionPool) remove(token string) {
-	err := pcp.db.Update(func(tx *bolt.Tx) error {
+func (pcp *ConnectionPool) remove(token string) {
+	if pcp.storage == nil {
+		return
+	}
+	err := pcp.storage.Update(func(tx *bolt.Tx) error {
 		b := tx.Bucket(sessionsBucket)
 		return b.Delete([]byte(token))
 	})
@@ -104,9 +122,9 @@
 	}
 }
 
-func (pcp *PersistentConnectionPool) Delete(token string) bool {
+func (pcp *ConnectionPool) Delete(token string) bool {
 	res := make(chan bool)
-	pcp.cmds <- func(pcp *PersistentConnectionPool) {
+	pcp.cmds <- func(pcp *ConnectionPool) {
 		conn, found := pcp.conns[token]
 		if !found {
 			res <- false
@@ -120,8 +138,11 @@
 	return <-res
 }
 
-func (pcp *PersistentConnectionPool) store(token string, con *Connection) {
-	err := pcp.db.Update(func(tx *bolt.Tx) error {
+func (pcp *ConnectionPool) store(token string, con *Connection) {
+	if pcp.storage == nil {
+		return
+	}
+	err := pcp.storage.Update(func(tx *bolt.Tx) error {
 		b := tx.Bucket(sessionsBucket)
 		var buf bytes.Buffer
 		if err := con.serialize(&buf); err != nil {
@@ -134,10 +155,10 @@
 	}
 }
 
-func (pcp *PersistentConnectionPool) Add(token string, session *Session) *Connection {
+func (pcp *ConnectionPool) Add(token string, session *Session) *Connection {
 	res := make(chan *Connection)
 
-	pcp.cmds <- func(cp *PersistentConnectionPool) {
+	pcp.cmds <- func(cp *ConnectionPool) {
 		con := pcp.conns[token]
 		if con == nil {
 			con = &Connection{}
@@ -152,7 +173,7 @@
 	return con
 }
 
-func (pcp *PersistentConnectionPool) Renew(token string) (string, error) {
+func (pcp *ConnectionPool) Renew(token string) (string, error) {
 
 	type result struct {
 		newToken string
@@ -161,7 +182,7 @@
 
 	resCh := make(chan result)
 
-	pcp.cmds <- func(cp *PersistentConnectionPool) {
+	pcp.cmds <- func(cp *ConnectionPool) {
 		con := pcp.conns[token]
 		if con == nil {
 			resCh <- result{err: ErrNoSuchToken}
@@ -181,7 +202,7 @@
 	return r.newToken, r.err
 }
 
-func (pcp *PersistentConnectionPool) trim(conn *Connection) {
+func (pcp *ConnectionPool) trim(conn *Connection) {
 
 	conn.refCount--
 
@@ -206,7 +227,7 @@
 	}
 }
 
-func (pcp *PersistentConnectionPool) Do(token string, fn func(*sql.DB) error) error {
+func (pcp *ConnectionPool) Do(token string, fn func(*sql.DB) error) error {
 
 	type result struct {
 		con *Connection
@@ -215,7 +236,7 @@
 
 	res := make(chan result)
 
-	pcp.cmds <- func(pcp *PersistentConnectionPool) {
+	pcp.cmds <- func(pcp *ConnectionPool) {
 		con := pcp.conns[token]
 		if con == nil {
 			res <- result{err: ErrNoSuchToken}
@@ -251,7 +272,7 @@
 	}
 
 	defer func() {
-		pcp.cmds <- func(pcp *PersistentConnectionPool) {
+		pcp.cmds <- func(pcp *ConnectionPool) {
 			pcp.trim(r.con)
 		}
 	}()
@@ -259,9 +280,9 @@
 	return fn(r.con.db)
 }
 
-func (pcp *PersistentConnectionPool) Session(token string) *Session {
+func (pcp *ConnectionPool) Session(token string) *Session {
 	res := make(chan *Session)
-	pcp.cmds <- func(pcp *PersistentConnectionPool) {
+	pcp.cmds <- func(pcp *ConnectionPool) {
 		con := pcp.conns[token]
 		if con == nil {
 			res <- nil
@@ -274,11 +295,12 @@
 	return <-res
 }
 
-func (pcp *PersistentConnectionPool) Shutdown() error {
-	log.Println("info: shutdown persistent connection pool.")
-	if db := pcp.db; db != nil {
-		pcp.db = nil
+func (pcp *ConnectionPool) Shutdown() error {
+	if db := pcp.storage; db != nil {
+		log.Println("info: shutdown persistent connection pool.")
+		pcp.storage = nil
 		return db.Close()
 	}
+	log.Println("info: shutdown in-memory connection pool.")
 	return nil
 }