diff auth/persistent.go @ 193:1585c334e8a7

More on persisting sessions.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 20 Jul 2018 18:32:11 +0200
parents 3457a60fb12d
children e85413e5befa
line wrap: on
line diff
--- a/auth/persistent.go	Fri Jul 20 17:11:57 2018 +0200
+++ b/auth/persistent.go	Fri Jul 20 18:32:11 2018 +0200
@@ -1,26 +1,105 @@
 package auth
 
 import (
+	"bytes"
 	"database/sql"
 	"log"
+	"time"
+
+	bolt "github.com/coreos/bbolt"
 )
 
 type PersistentConnectionPool struct {
-	filename string
+	db    *bolt.DB
+	conns map[string]*Connection
+	cmds  chan func(*PersistentConnectionPool)
 }
 
+var sessionsBucket = []byte("sessions")
+
 func NewPersistentConnectionPool(filename string) (*PersistentConnectionPool, error) {
 
-	log.Println("NewInMemoryConnectionPool: Not implemented, yet.")
+	db, err := bolt.Open(filename, 0600, nil)
+	if err != nil {
+		return nil, err
+	}
+
+	conns := make(map[string]*Connection)
+	err = db.Update(func(tx *bolt.Tx) error {
+		b, err := tx.CreateBucketIfNotExists(sessionsBucket)
+		if err != nil {
+			return err
+		}
+
+		// pre-load sessions
+		c := b.Cursor()
+
+		for k, v := c.First(); k != nil; k, v = c.Next() {
+			var conn Connection
+			if err := conn.unserialize(bytes.NewReader(v)); err != nil {
+				return err
+			}
+			conns[string(k)] = &conn
+		}
+
+		return nil
+	})
+
+	if err != nil {
+		db.Close()
+		return nil, err
+	}
+
 	pcp := &PersistentConnectionPool{
-		filename: filename,
+		db:    db,
+		conns: conns,
+		cmds:  make(chan func(*PersistentConnectionPool)),
 	}
+	go pcp.run()
 	return pcp, nil
 }
 
+func (pcp *PersistentConnectionPool) run() {
+	for {
+		select {
+		case cmd := <-pcp.cmds:
+			cmd(pcp)
+		case <-time.After(time.Minute):
+			pcp.cleanDB()
+		case <-time.After(time.Minute * 5):
+			pcp.cleanToken()
+		}
+	}
+}
+
+func (pcp *PersistentConnectionPool) cleanDB() {
+	log.Println("cleanDB: Not implemented, yet.")
+}
+
+func (pcp *PersistentConnectionPool) cleanToken() {
+	log.Println("cleanToken: Not implemented, yet.")
+}
+
 func (pcp *PersistentConnectionPool) Delete(token string) bool {
-	log.Println("Delete: Not implemented, yet.")
-	return false
+	res := make(chan bool)
+	pcp.cmds <- func(pcp *PersistentConnectionPool) {
+		conn, found := pcp.conns[token]
+		if !found {
+			res <- false
+			return
+		}
+		conn.close()
+		delete(pcp.conns, token)
+		err := pcp.db.Update(func(tx *bolt.Tx) error {
+			b := tx.Bucket(sessionsBucket)
+			return b.Delete([]byte(token))
+		})
+		if err != nil {
+			log.Printf("error: %v\n", err)
+		}
+		res <- true
+	}
+	return <-res
 }
 
 func (pcp *PersistentConnectionPool) Add(token string, session *Session) *Connection {
@@ -44,6 +123,10 @@
 }
 
 func (pcp *PersistentConnectionPool) Shutdown() error {
-	log.Println("Shutdown: Not implemented, yet.")
+	log.Println("info: shutdown persistent connection pool.")
+	if db := pcp.db; db != nil {
+		pcp.db = nil
+		return db.Close()
+	}
 	return nil
 }