view pkg/auth/store.go @ 498:22e1bf563a04 metamorph-for-all

Throw away the connection level for sessions. It is no longer needed because the db connections are no longer bound to the sessions.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 24 Aug 2018 15:12:22 +0200
parents 5c08afd15ce7
children b6796cd91604
line wrap: on
line source

package auth

import (
	"bytes"
	"errors"
	"log"
	"time"

	bolt "github.com/coreos/bbolt"
)

// ErrNoSuchToken is returned by Renew when no session is registered
// under the given token.
var ErrNoSuchToken = errors.New("No such token")

// Sessions is the global session store.
var Sessions *SessionStore

// SessionStore keeps sessions in memory, indexed by their token, and
// optionally mirrors them into a bolt database file. All access to the
// session map is funneled through the cmds channel and executed by the
// goroutine started in NewSessionStore.
type SessionStore struct {
	// storage is the bolt database used for persistence. It is nil when
	// the store was created without a file name.
	storage  *bolt.DB
	// sessions maps a token to its session.
	sessions map[string]*Session
	// cmds carries the functions which the run loop executes one at a time.
	cmds     chan func(*SessionStore)
}

// sessionsBucket is the name of the bolt bucket in which the serialized
// sessions are stored under their tokens.
var sessionsBucket = []byte("sessions")

// NewSessionStore creates a session store and starts the goroutine which
// executes its commands. If filename is not empty the bolt database with
// that name is opened and the sessions found in it are loaded first; an
// error doing so is returned and no store is created.
func NewSessionStore(filename string) (*SessionStore, error) {
	ss := new(SessionStore)
	ss.sessions = make(map[string]*Session)
	ss.cmds = make(chan func(*SessionStore))

	err := ss.openStorage(filename)
	if err != nil {
		return nil, err
	}

	// From here on the session map is only touched by the run loop.
	go ss.run()
	return ss, nil
}

// openStorage opens the bolt database filename, makes sure the sessions
// bucket exists and loads every session stored in it into the in-memory
// map. An empty filename means there is nothing to restore or persist and
// leaves the store purely in-memory. On any failure the database is closed
// again and the error is returned.
func (pcp *SessionStore) openStorage(filename string) error {
	if filename == "" {
		return nil
	}

	db, err := bolt.Open(filename, 0600, nil)
	if err != nil {
		return err
	}

	// preload creates the bucket on first use and reads back all
	// persisted sessions.
	preload := func(tx *bolt.Tx) error {
		bucket, err := tx.CreateBucketIfNotExists(sessionsBucket)
		if err != nil {
			return err
		}
		return bucket.ForEach(func(token, raw []byte) error {
			session := new(Session)
			if err := session.deserialize(bytes.NewReader(raw)); err != nil {
				return err
			}
			pcp.sessions[string(token)] = session
			return nil
		})
	}

	if err := db.Update(preload); err != nil {
		// The error of Close is dropped in favor of the original one.
		db.Close()
		return err
	}

	pcp.storage = db
	return nil
}

// run is the event loop of the store. It executes the commands received
// on the cmds channel one after another and removes expired sessions
// every five minutes. It never returns.
func (pcp *SessionStore) run() {
	// A ticker fires on a fixed schedule independent of command traffic.
	// A time.After inside the select would be re-armed on every loop
	// iteration, so a store receiving commands more often than every five
	// minutes would never get around to purging expired sessions.
	// The loop never exits, hence the ticker is never stopped.
	cleanup := time.NewTicker(5 * time.Minute)

	for {
		select {
		case cmd := <-pcp.cmds:
			cmd(pcp)
		case <-cleanup.C:
			pcp.cleanToken()
		}
	}
}

// cleanToken drops every session whose expiry time (ExpiresAt, in Unix
// seconds) lies before the current time, both from the in-memory map and
// from the persistent storage. It must only be called from the run loop.
func (pcp *SessionStore) cleanToken() {
	deadline := time.Now()
	for token, session := range pcp.sessions {
		if expiry := time.Unix(session.ExpiresAt, 0); !expiry.Before(deadline) {
			// Still valid.
			continue
		}
		delete(pcp.sessions, token)
		pcp.remove(token)
	}
}

// remove deletes the entry stored under token from the sessions bucket.
// It does nothing if the store has no persistent storage. A failing
// transaction is only logged.
func (pcp *SessionStore) remove(token string) {
	db := pcp.storage
	if db == nil {
		return
	}

	key := []byte(token)
	if err := db.Update(func(tx *bolt.Tx) error {
		return tx.Bucket(sessionsBucket).Delete(key)
	}); err != nil {
		log.Printf("error: %v\n", err)
	}
}

// Delete removes the session registered under token from memory and
// from the persistent storage. It reports whether such a session existed.
// The call blocks until the run loop has processed the request.
func (pcp *SessionStore) Delete(token string) bool {
	reply := make(chan bool)

	pcp.cmds <- func(ss *SessionStore) {
		_, known := ss.sessions[token]
		if known {
			delete(ss.sessions, token)
			ss.remove(token)
		}
		reply <- known
	}

	return <-reply
}

// store serializes session and writes it under token into the sessions
// bucket. It does nothing if the store has no persistent storage. Errors
// from serializing or writing are only logged.
func (pcp *SessionStore) store(token string, session *Session) {
	db := pcp.storage
	if db == nil {
		return
	}

	persist := func(tx *bolt.Tx) error {
		var encoded bytes.Buffer
		if err := session.serialize(&encoded); err != nil {
			return err
		}
		return tx.Bucket(sessionsBucket).Put([]byte(token), encoded.Bytes())
	}

	if err := db.Update(persist); err != nil {
		log.Printf("error: %v\n", err)
	}
}

// Add registers session under token unless a session is already known
// for that token, in which case the existing one is kept. The resulting
// session is touched, persisted and returned. The call blocks until the
// run loop has processed the request.
func (pcp *SessionStore) Add(token string, session *Session) *Session {
	reply := make(chan *Session)

	pcp.cmds <- func(ss *SessionStore) {
		current := ss.sessions[token]
		if current == nil {
			// First time this token is seen: adopt the given session.
			ss.sessions[token] = session
			current = session
		}
		current.touch()
		ss.store(token, current)
		reply <- current
	}

	return <-reply
}

// Renew replaces token by a freshly generated one. The session is removed
// under the old token, gets a new expiry time of now plus maxTokenValid
// and is registered and persisted under the new token, which is returned.
// ErrNoSuchToken is returned if no session is known for token. The call
// blocks until the run loop has processed the request.
func (pcp *SessionStore) Renew(token string) (string, error) {

	type outcome struct {
		token string
		err   error
	}

	done := make(chan outcome)

	pcp.cmds <- func(ss *SessionStore) {
		session := ss.sessions[token]
		if session == nil {
			done <- outcome{err: ErrNoSuchToken}
			return
		}

		// Retire the old token first.
		delete(ss.sessions, token)
		ss.remove(token)

		fresh := GenerateSessionKey()
		// TODO: Ensure that this is not racy!
		session.ExpiresAt = time.Now().Add(maxTokenValid).Unix()
		ss.sessions[fresh] = session
		ss.store(fresh, session)
		done <- outcome{token: fresh}
	}

	out := <-done
	return out.token, out.err
}

// Session looks up the session registered under token. A session that is
// found is touched and persisted before it is returned; nil is returned
// if there is none. The call blocks until the run loop has processed the
// request.
func (pcp *SessionStore) Session(token string) *Session {
	reply := make(chan *Session)

	pcp.cmds <- func(ss *SessionStore) {
		found := ss.sessions[token]
		if found != nil {
			found.touch()
			ss.store(token, found)
		}
		reply <- found
	}

	return <-reply
}

// Logout removes all sessions whose User field equals user from memory
// and from the persistent storage. The request is only handed over to the
// run loop; Logout does not wait for it to be carried out.
func (pcp *SessionStore) Logout(user string) {
	pcp.cmds <- func(ss *SessionStore) {
		for token, session := range ss.sessions {
			if session.User != user {
				continue
			}
			delete(ss.sessions, token)
			ss.remove(token)
		}
	}
}

// Shutdown closes the persistent storage, if there is one, and returns
// the error of closing it. Afterwards the store carries on purely
// in-memory.
//
// The storage field is read by the run loop whenever a session is stored
// or removed. Detaching and closing the database is therefore executed as
// a command inside the run loop, too, instead of in the goroutine of the
// caller; this way it cannot race with a command which is in the middle
// of using the database. The call blocks until the run loop has processed
// the request.
func (pcp *SessionStore) Shutdown() error {
	res := make(chan error)

	pcp.cmds <- func(pcp *SessionStore) {
		db := pcp.storage
		if db == nil {
			log.Println("info: shutdown in-memory connection pool.")
			res <- nil
			return
		}
		log.Println("info: shutdown persistent connection pool.")
		// Detach first so that later commands skip persistence.
		pcp.storage = nil
		res <- db.Close()
	}

	return <-res
}