comparison auth/pool.go @ 205:2a152816fc38

Renamed the file containing the connection pool to a more suited one.
author Sascha L. Teichmann <teichmann@intevation.de>
date Sun, 22 Jul 2018 10:24:28 +0200
parents auth/persistent.go@3d0988d9f867
children 2fad2931a5a6
comparison
equal deleted inserted replaced
204:3d0988d9f867 205:2a152816fc38
1 package auth
2
3 import (
4 "bytes"
5 "database/sql"
6 "log"
7 "time"
8
9 bolt "github.com/coreos/bbolt"
10 )
11
12 type ConnectionPool struct {
13 storage *bolt.DB
14 conns map[string]*Connection
15 cmds chan func(*ConnectionPool)
16 }
17
18 var sessionsBucket = []byte("sessions")
19
20 func NewConnectionPool(filename string) (*ConnectionPool, error) {
21
22 pcp := &ConnectionPool{
23 cmds: make(chan func(*ConnectionPool)),
24 }
25 if err := pcp.openStorage(filename); err != nil {
26 return nil, err
27 }
28 go pcp.run()
29 return pcp, nil
30 }
31
32 // openStorage opens a storage file.
33 func (pcp *ConnectionPool) openStorage(filename string) error {
34
35 // No file, nothing to restore/persist.
36 if filename == "" {
37 return nil
38 }
39
40 db, err := bolt.Open(filename, 0600, nil)
41 if err != nil {
42 return err
43 }
44
45 conns := make(map[string]*Connection)
46 err = db.Update(func(tx *bolt.Tx) error {
47 b, err := tx.CreateBucketIfNotExists(sessionsBucket)
48 if err != nil {
49 return err
50 }
51
52 // pre-load sessions
53 c := b.Cursor()
54
55 for k, v := c.First(); k != nil; k, v = c.Next() {
56 var conn Connection
57 if err := conn.deserialize(bytes.NewReader(v)); err != nil {
58 return err
59 }
60 conns[string(k)] = &conn
61 }
62
63 return nil
64 })
65
66 if err != nil {
67 db.Close()
68 return err
69 }
70
71 pcp.storage = db
72 pcp.conns = conns
73
74 return nil
75 }
76
77 func (pcp *ConnectionPool) run() {
78 for {
79 select {
80 case cmd := <-pcp.cmds:
81 cmd(pcp)
82 case <-time.After(time.Minute):
83 pcp.cleanDB()
84 case <-time.After(time.Minute * 5):
85 pcp.cleanToken()
86 }
87 }
88 }
89
90 func (pcp *ConnectionPool) cleanDB() {
91 valid := time.Now().Add(-maxDBIdle)
92 for _, con := range pcp.conns {
93 if con.refCount <= 0 && con.last().Before(valid) {
94 con.close()
95 }
96 }
97 }
98
99 func (pcp *ConnectionPool) cleanToken() {
100 now := time.Now()
101 for token, con := range pcp.conns {
102 expires := time.Unix(con.session.ExpiresAt, 0)
103 if expires.Before(now) {
104 // TODO: Be more graceful here?
105 con.close()
106 delete(pcp.conns, token)
107 pcp.remove(token)
108 }
109 }
110 }
111
112 func (pcp *ConnectionPool) remove(token string) {
113 if pcp.storage == nil {
114 return
115 }
116 err := pcp.storage.Update(func(tx *bolt.Tx) error {
117 b := tx.Bucket(sessionsBucket)
118 return b.Delete([]byte(token))
119 })
120 if err != nil {
121 log.Printf("error: %v\n", err)
122 }
123 }
124
125 func (pcp *ConnectionPool) Delete(token string) bool {
126 res := make(chan bool)
127 pcp.cmds <- func(pcp *ConnectionPool) {
128 conn, found := pcp.conns[token]
129 if !found {
130 res <- false
131 return
132 }
133 conn.close()
134 delete(pcp.conns, token)
135 pcp.remove(token)
136 res <- true
137 }
138 return <-res
139 }
140
141 func (pcp *ConnectionPool) store(token string, con *Connection) {
142 if pcp.storage == nil {
143 return
144 }
145 err := pcp.storage.Update(func(tx *bolt.Tx) error {
146 b := tx.Bucket(sessionsBucket)
147 var buf bytes.Buffer
148 if err := con.serialize(&buf); err != nil {
149 return err
150 }
151 return b.Put([]byte(token), buf.Bytes())
152 })
153 if err != nil {
154 log.Printf("error: %v\n", err)
155 }
156 }
157
158 func (pcp *ConnectionPool) Add(token string, session *Session) *Connection {
159 res := make(chan *Connection)
160
161 pcp.cmds <- func(cp *ConnectionPool) {
162 con := pcp.conns[token]
163 if con == nil {
164 con = &Connection{}
165 pcp.conns[token] = con
166 }
167 con.set(session)
168 pcp.store(token, con)
169 res <- con
170 }
171
172 con := <-res
173 return con
174 }
175
176 func (pcp *ConnectionPool) Renew(token string) (string, error) {
177
178 type result struct {
179 newToken string
180 err error
181 }
182
183 resCh := make(chan result)
184
185 pcp.cmds <- func(cp *ConnectionPool) {
186 con := pcp.conns[token]
187 if con == nil {
188 resCh <- result{err: ErrNoSuchToken}
189 } else {
190 delete(pcp.conns, token)
191 pcp.remove(token)
192 newToken := GenerateSessionKey()
193 // TODO: Ensure that this is not racy!
194 con.session.ExpiresAt = time.Now().Add(maxTokenValid).Unix()
195 pcp.conns[newToken] = con
196 pcp.store(newToken, con)
197 resCh <- result{newToken: newToken}
198 }
199 }
200
201 r := <-resCh
202 return r.newToken, r.err
203 }
204
205 func (pcp *ConnectionPool) trim(conn *Connection) {
206
207 conn.refCount--
208
209 for {
210 least := time.Now()
211 var count int
212 var oldest *Connection
213
214 for _, con := range pcp.conns {
215 if con.db != nil && con.refCount <= 0 {
216 if last := con.last(); last.Before(least) {
217 least = last
218 oldest = con
219 }
220 count++
221 }
222 }
223 if count <= maxOpen {
224 break
225 }
226 oldest.close()
227 }
228 }
229
230 func (pcp *ConnectionPool) Do(token string, fn func(*sql.DB) error) error {
231
232 type result struct {
233 con *Connection
234 err error
235 }
236
237 res := make(chan result)
238
239 pcp.cmds <- func(pcp *ConnectionPool) {
240 con := pcp.conns[token]
241 if con == nil {
242 res <- result{err: ErrNoSuchToken}
243 return
244 }
245 con.touch()
246 // store the session here. The ref counting for
247 // open db connections is irrelevant for persistence
248 // as they all come up closed when the system reboots.
249 pcp.store(token, con)
250
251 if con.db != nil {
252 con.refCount++
253 res <- result{con: con}
254 return
255 }
256
257 session := con.session
258 db, err := opendb(session.User, session.Password)
259 if err != nil {
260 res <- result{err: err}
261 return
262 }
263 con.db = db
264 con.refCount++
265 res <- result{con: con}
266 }
267
268 r := <-res
269
270 if r.err != nil {
271 return r.err
272 }
273
274 defer func() {
275 pcp.cmds <- func(pcp *ConnectionPool) {
276 pcp.trim(r.con)
277 }
278 }()
279
280 return fn(r.con.db)
281 }
282
283 func (pcp *ConnectionPool) Session(token string) *Session {
284 res := make(chan *Session)
285 pcp.cmds <- func(pcp *ConnectionPool) {
286 con := pcp.conns[token]
287 if con == nil {
288 res <- nil
289 } else {
290 con.touch()
291 pcp.store(token, con)
292 res <- con.session
293 }
294 }
295 return <-res
296 }
297
298 func (pcp *ConnectionPool) Shutdown() error {
299 if db := pcp.storage; db != nil {
300 log.Println("info: shutdown persistent connection pool.")
301 pcp.storage = nil
302 return db.Close()
303 }
304 log.Println("info: shutdown in-memory connection pool.")
305 return nil
306 }