Mercurial > gemma
comparison auth/pool.go @ 205:2a152816fc38
Renamed the file containing the connection pool to a more suitable name.
author | Sascha L. Teichmann <teichmann@intevation.de> |
---|---|
date | Sun, 22 Jul 2018 10:24:28 +0200 |
parents | auth/persistent.go@3d0988d9f867 |
children | 2fad2931a5a6 |
comparison
equal
deleted
inserted
replaced
204:3d0988d9f867 | 205:2a152816fc38 |
---|---|
1 package auth | |
2 | |
3 import ( | |
4 "bytes" | |
5 "database/sql" | |
6 "log" | |
7 "time" | |
8 | |
9 bolt "github.com/coreos/bbolt" | |
10 ) | |
11 | |
12 type ConnectionPool struct { | |
13 storage *bolt.DB | |
14 conns map[string]*Connection | |
15 cmds chan func(*ConnectionPool) | |
16 } | |
17 | |
// sessionsBucket is the name of the bolt bucket in which the
// serialized sessions are stored under their tokens.
var sessionsBucket = []byte("sessions")
19 | |
20 func NewConnectionPool(filename string) (*ConnectionPool, error) { | |
21 | |
22 pcp := &ConnectionPool{ | |
23 cmds: make(chan func(*ConnectionPool)), | |
24 } | |
25 if err := pcp.openStorage(filename); err != nil { | |
26 return nil, err | |
27 } | |
28 go pcp.run() | |
29 return pcp, nil | |
30 } | |
31 | |
32 // openStorage opens a storage file. | |
33 func (pcp *ConnectionPool) openStorage(filename string) error { | |
34 | |
35 // No file, nothing to restore/persist. | |
36 if filename == "" { | |
37 return nil | |
38 } | |
39 | |
40 db, err := bolt.Open(filename, 0600, nil) | |
41 if err != nil { | |
42 return err | |
43 } | |
44 | |
45 conns := make(map[string]*Connection) | |
46 err = db.Update(func(tx *bolt.Tx) error { | |
47 b, err := tx.CreateBucketIfNotExists(sessionsBucket) | |
48 if err != nil { | |
49 return err | |
50 } | |
51 | |
52 // pre-load sessions | |
53 c := b.Cursor() | |
54 | |
55 for k, v := c.First(); k != nil; k, v = c.Next() { | |
56 var conn Connection | |
57 if err := conn.deserialize(bytes.NewReader(v)); err != nil { | |
58 return err | |
59 } | |
60 conns[string(k)] = &conn | |
61 } | |
62 | |
63 return nil | |
64 }) | |
65 | |
66 if err != nil { | |
67 db.Close() | |
68 return err | |
69 } | |
70 | |
71 pcp.storage = db | |
72 pcp.conns = conns | |
73 | |
74 return nil | |
75 } | |
76 | |
77 func (pcp *ConnectionPool) run() { | |
78 for { | |
79 select { | |
80 case cmd := <-pcp.cmds: | |
81 cmd(pcp) | |
82 case <-time.After(time.Minute): | |
83 pcp.cleanDB() | |
84 case <-time.After(time.Minute * 5): | |
85 pcp.cleanToken() | |
86 } | |
87 } | |
88 } | |
89 | |
90 func (pcp *ConnectionPool) cleanDB() { | |
91 valid := time.Now().Add(-maxDBIdle) | |
92 for _, con := range pcp.conns { | |
93 if con.refCount <= 0 && con.last().Before(valid) { | |
94 con.close() | |
95 } | |
96 } | |
97 } | |
98 | |
99 func (pcp *ConnectionPool) cleanToken() { | |
100 now := time.Now() | |
101 for token, con := range pcp.conns { | |
102 expires := time.Unix(con.session.ExpiresAt, 0) | |
103 if expires.Before(now) { | |
104 // TODO: Be more graceful here? | |
105 con.close() | |
106 delete(pcp.conns, token) | |
107 pcp.remove(token) | |
108 } | |
109 } | |
110 } | |
111 | |
112 func (pcp *ConnectionPool) remove(token string) { | |
113 if pcp.storage == nil { | |
114 return | |
115 } | |
116 err := pcp.storage.Update(func(tx *bolt.Tx) error { | |
117 b := tx.Bucket(sessionsBucket) | |
118 return b.Delete([]byte(token)) | |
119 }) | |
120 if err != nil { | |
121 log.Printf("error: %v\n", err) | |
122 } | |
123 } | |
124 | |
125 func (pcp *ConnectionPool) Delete(token string) bool { | |
126 res := make(chan bool) | |
127 pcp.cmds <- func(pcp *ConnectionPool) { | |
128 conn, found := pcp.conns[token] | |
129 if !found { | |
130 res <- false | |
131 return | |
132 } | |
133 conn.close() | |
134 delete(pcp.conns, token) | |
135 pcp.remove(token) | |
136 res <- true | |
137 } | |
138 return <-res | |
139 } | |
140 | |
141 func (pcp *ConnectionPool) store(token string, con *Connection) { | |
142 if pcp.storage == nil { | |
143 return | |
144 } | |
145 err := pcp.storage.Update(func(tx *bolt.Tx) error { | |
146 b := tx.Bucket(sessionsBucket) | |
147 var buf bytes.Buffer | |
148 if err := con.serialize(&buf); err != nil { | |
149 return err | |
150 } | |
151 return b.Put([]byte(token), buf.Bytes()) | |
152 }) | |
153 if err != nil { | |
154 log.Printf("error: %v\n", err) | |
155 } | |
156 } | |
157 | |
158 func (pcp *ConnectionPool) Add(token string, session *Session) *Connection { | |
159 res := make(chan *Connection) | |
160 | |
161 pcp.cmds <- func(cp *ConnectionPool) { | |
162 con := pcp.conns[token] | |
163 if con == nil { | |
164 con = &Connection{} | |
165 pcp.conns[token] = con | |
166 } | |
167 con.set(session) | |
168 pcp.store(token, con) | |
169 res <- con | |
170 } | |
171 | |
172 con := <-res | |
173 return con | |
174 } | |
175 | |
176 func (pcp *ConnectionPool) Renew(token string) (string, error) { | |
177 | |
178 type result struct { | |
179 newToken string | |
180 err error | |
181 } | |
182 | |
183 resCh := make(chan result) | |
184 | |
185 pcp.cmds <- func(cp *ConnectionPool) { | |
186 con := pcp.conns[token] | |
187 if con == nil { | |
188 resCh <- result{err: ErrNoSuchToken} | |
189 } else { | |
190 delete(pcp.conns, token) | |
191 pcp.remove(token) | |
192 newToken := GenerateSessionKey() | |
193 // TODO: Ensure that this is not racy! | |
194 con.session.ExpiresAt = time.Now().Add(maxTokenValid).Unix() | |
195 pcp.conns[newToken] = con | |
196 pcp.store(newToken, con) | |
197 resCh <- result{newToken: newToken} | |
198 } | |
199 } | |
200 | |
201 r := <-resCh | |
202 return r.newToken, r.err | |
203 } | |
204 | |
205 func (pcp *ConnectionPool) trim(conn *Connection) { | |
206 | |
207 conn.refCount-- | |
208 | |
209 for { | |
210 least := time.Now() | |
211 var count int | |
212 var oldest *Connection | |
213 | |
214 for _, con := range pcp.conns { | |
215 if con.db != nil && con.refCount <= 0 { | |
216 if last := con.last(); last.Before(least) { | |
217 least = last | |
218 oldest = con | |
219 } | |
220 count++ | |
221 } | |
222 } | |
223 if count <= maxOpen { | |
224 break | |
225 } | |
226 oldest.close() | |
227 } | |
228 } | |
229 | |
230 func (pcp *ConnectionPool) Do(token string, fn func(*sql.DB) error) error { | |
231 | |
232 type result struct { | |
233 con *Connection | |
234 err error | |
235 } | |
236 | |
237 res := make(chan result) | |
238 | |
239 pcp.cmds <- func(pcp *ConnectionPool) { | |
240 con := pcp.conns[token] | |
241 if con == nil { | |
242 res <- result{err: ErrNoSuchToken} | |
243 return | |
244 } | |
245 con.touch() | |
246 // store the session here. The ref counting for | |
247 // open db connections is irrelevant for persistence | |
248 // as they all come up closed when the system reboots. | |
249 pcp.store(token, con) | |
250 | |
251 if con.db != nil { | |
252 con.refCount++ | |
253 res <- result{con: con} | |
254 return | |
255 } | |
256 | |
257 session := con.session | |
258 db, err := opendb(session.User, session.Password) | |
259 if err != nil { | |
260 res <- result{err: err} | |
261 return | |
262 } | |
263 con.db = db | |
264 con.refCount++ | |
265 res <- result{con: con} | |
266 } | |
267 | |
268 r := <-res | |
269 | |
270 if r.err != nil { | |
271 return r.err | |
272 } | |
273 | |
274 defer func() { | |
275 pcp.cmds <- func(pcp *ConnectionPool) { | |
276 pcp.trim(r.con) | |
277 } | |
278 }() | |
279 | |
280 return fn(r.con.db) | |
281 } | |
282 | |
283 func (pcp *ConnectionPool) Session(token string) *Session { | |
284 res := make(chan *Session) | |
285 pcp.cmds <- func(pcp *ConnectionPool) { | |
286 con := pcp.conns[token] | |
287 if con == nil { | |
288 res <- nil | |
289 } else { | |
290 con.touch() | |
291 pcp.store(token, con) | |
292 res <- con.session | |
293 } | |
294 } | |
295 return <-res | |
296 } | |
297 | |
298 func (pcp *ConnectionPool) Shutdown() error { | |
299 if db := pcp.storage; db != nil { | |
300 log.Println("info: shutdown persistent connection pool.") | |
301 pcp.storage = nil | |
302 return db.Close() | |
303 } | |
304 log.Println("info: shutdown in-memory connection pool.") | |
305 return nil | |
306 } |