view auth/persistent.go @ 199:ddc7ef95c247

Implemented Add of persistent sessions.
author Sascha L. Teichmann <teichmann@intevation.de>
date Fri, 20 Jul 2018 23:06:53 +0200
parents e85413e5befa
children 8426a92fda00
line wrap: on
line source

package auth

import (
	"bytes"
	"database/sql"
	"log"
	"time"

	bolt "github.com/coreos/bbolt"
)

type PersistentConnectionPool struct {
	db    *bolt.DB
	conns map[string]*Connection
	cmds  chan func(*PersistentConnectionPool)
}

var sessionsBucket = []byte("sessions")

func NewPersistentConnectionPool(filename string) (*PersistentConnectionPool, error) {

	db, err := bolt.Open(filename, 0600, nil)
	if err != nil {
		return nil, err
	}

	conns := make(map[string]*Connection)
	err = db.Update(func(tx *bolt.Tx) error {
		b, err := tx.CreateBucketIfNotExists(sessionsBucket)
		if err != nil {
			return err
		}

		// pre-load sessions
		c := b.Cursor()

		for k, v := c.First(); k != nil; k, v = c.Next() {
			var conn Connection
			if err := conn.deserialize(bytes.NewReader(v)); err != nil {
				return err
			}
			conns[string(k)] = &conn
		}

		return nil
	})

	if err != nil {
		db.Close()
		return nil, err
	}

	pcp := &PersistentConnectionPool{
		db:    db,
		conns: conns,
		cmds:  make(chan func(*PersistentConnectionPool)),
	}
	go pcp.run()
	return pcp, nil
}

func (pcp *PersistentConnectionPool) run() {
	for {
		select {
		case cmd := <-pcp.cmds:
			cmd(pcp)
		case <-time.After(time.Minute):
			pcp.cleanDB()
		case <-time.After(time.Minute * 5):
			pcp.cleanToken()
		}
	}
}

func (pcp *PersistentConnectionPool) cleanDB() {
	log.Println("cleanDB: Not implemented, yet.")
}

func (pcp *PersistentConnectionPool) cleanToken() {
	log.Println("cleanToken: Not implemented, yet.")
}

func (pcp *PersistentConnectionPool) Delete(token string) bool {
	res := make(chan bool)
	pcp.cmds <- func(pcp *PersistentConnectionPool) {
		conn, found := pcp.conns[token]
		if !found {
			res <- false
			return
		}
		conn.close()
		delete(pcp.conns, token)
		err := pcp.db.Update(func(tx *bolt.Tx) error {
			b := tx.Bucket(sessionsBucket)
			return b.Delete([]byte(token))
		})
		if err != nil {
			log.Printf("error: %v\n", err)
		}
		res <- true
	}
	return <-res
}

func (pcp *PersistentConnectionPool) Add(token string, session *Session) *Connection {
	res := make(chan *Connection)

	pcp.cmds <- func(cp *PersistentConnectionPool) {
		con := pcp.conns[token]
		if con == nil {
			con = &Connection{}
			pcp.conns[token] = con
		}
		con.set(session)
		err := pcp.db.Update(func(tx *bolt.Tx) error {
			b := tx.Bucket(sessionsBucket)
			var buf bytes.Buffer
			if err := con.serialize(&buf); err != nil {
				return err
			}
			return b.Put([]byte(token), buf.Bytes())
		})
		if err != nil {
			log.Printf("error: %v\n", err)
		}
		res <- con
	}

	con := <-res
	return con
}

func (pcp *PersistentConnectionPool) Renew(token string) (string, error) {
	log.Println("Renew: Not implemented, yet.")
	return "", nil
}

func (pcp *PersistentConnectionPool) Do(token string, fn func(*sql.DB) error) error {
	log.Println("Do: Not implemented, yet.")
	return nil
}

func (pcp *PersistentConnectionPool) Session(token string) *Session {
	log.Println("Session: Not implemented, yet.")
	return nil
}

func (pcp *PersistentConnectionPool) Shutdown() error {
	log.Println("info: shutdown persistent connection pool.")
	if db := pcp.db; db != nil {
		pcp.db = nil
		return db.Close()
	}
	return nil
}