comparison auth/connection.go @ 148:0116aae1071b

Made ConnectionPool an interface and use current in-memory implementation.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 02 Jul 2018 11:00:19 +0200
parents 0c56c56a1c44
children 3349bfc2a047
comparison
equal deleted inserted replaced
147:5ca6f436d0b7 148:0116aae1071b
8 "time" 8 "time"
9 ) 9 )
10 10
11 var ErrNoSuchToken = errors.New("No such token") 11 var ErrNoSuchToken = errors.New("No such token")
12 12
// ConnectionPool (new revision, right-hand column) is the interface form of
// the token-keyed connection pool. Its method set matches the exported
// methods of the former concrete struct shown on the old side of this
// comparison; per the changeset message the existing in-memory
// implementation is kept behind it.
13 var ConnPool = NewConnectionPool() 13 type ConnectionPool interface {
// Delete removes the connection stored for token. In the in-memory
// implementation the result reports whether the token was known.
14 Delete(token string) bool
// Add registers session under token and returns the pooled connection.
15 Add(token string, session *Session) *Connection
// Renew swaps token for a new token; the in-memory implementation returns
// ErrNoSuchToken when token is unknown.
16 Renew(token string) (string, error)
// Do runs fn with the database handle belonging to token.
17 Do(token string, fn func(*sql.DB) error) error
// Session returns the session stored for token; the in-memory
// implementation returns nil when token is unknown.
18 Session(token string) *Session
19 }
20
// ConnPool is the package-level connection pool. It is typed as the
// ConnectionPool interface and initialised with the in-memory implementation.
21 var ConnPool ConnectionPool = NewInMemoryConnectionPool()
14 22
// Limits used by the pool's housekeeping (trim and cleanDB).
15 const ( 23 const (
// maxOpen is the number of open, unreferenced database handles that trim
// tolerates before it starts closing the oldest ones.
16 maxOpen = 16 24 maxOpen = 16
// maxDBIdle is the age beyond which cleanDB closes an unreferenced
// connection, measured against the connection's last() timestamp.
17 maxDBIdle = time.Minute * 5 25 maxDBIdle = time.Minute * 5
18 ) 26 )
51 log.Printf("warn: %v\n", err) 59 log.Printf("warn: %v\n", err)
52 } 60 }
53 c.db = nil 61 c.db = nil
54 } 62 }
55 } 63 }
56
// ConnectionPool (old revision) is the in-memory pool of connections keyed
// by session token. In the code shown, the map is only touched from the
// goroutine started by NewConnectionPool; other goroutines interact with it
// by sending closures over cmds.
57 type ConnectionPool struct {
// conns maps a session token to its pooled connection.
58 conns map[string]*Connection
// cmds carries closures that the run loop executes one at a time with the
// pool as argument.
59 cmds chan func(*ConnectionPool)
60 }
61
62 func NewConnectionPool() *ConnectionPool {
63 cp := &ConnectionPool{
64 conns: make(map[string]*Connection),
65 cmds: make(chan func(*ConnectionPool)),
66 }
67 go cp.run()
68 return cp
69 }
70
71 func (cp *ConnectionPool) run() {
72 for {
73 select {
74 case cmd := <-cp.cmds:
75 cmd(cp)
76 case <-time.After(time.Minute):
77 cp.cleanDB()
78 case <-time.After(time.Minute * 5):
79 cp.cleanToken()
80 }
81 }
82 }
83
84 func (cp *ConnectionPool) cleanDB() {
85 valid := time.Now().Add(-maxDBIdle)
86 for _, con := range cp.conns {
87 if con.refCount <= 0 && con.last().Before(valid) {
88 con.close()
89 }
90 }
91 }
92
93 func (cp *ConnectionPool) cleanToken() {
94 now := time.Now()
95 for token, con := range cp.conns {
96 expires := time.Unix(con.session.ExpiresAt, 0)
97 if expires.Before(now) {
98 // TODO: Be more graceful here?
99 con.close()
100 delete(cp.conns, token)
101 }
102 }
103 }
104
105 func (cp *ConnectionPool) Delete(token string) bool {
106 res := make(chan bool)
107 cp.cmds <- func(cp *ConnectionPool) {
108 conn, found := cp.conns[token]
109 if !found {
110 res <- false
111 return
112 }
113 conn.close()
114 delete(cp.conns, token)
115 res <- true
116 }
117 return <-res
118 }
119
120 func (cp *ConnectionPool) Add(token string, session *Session) *Connection {
121 res := make(chan *Connection)
122
123 cp.cmds <- func(cp *ConnectionPool) {
124 con := cp.conns[token]
125 if con == nil {
126 con = &Connection{}
127 cp.conns[token] = con
128 }
129 con.set(session)
130 res <- con
131 }
132
133 con := <-res
134 return con
135 }
136
137 func (cp ConnectionPool) Renew(token string) (string, error) {
138
139 type result struct {
140 newToken string
141 err error
142 }
143
144 resCh := make(chan result)
145
146 cp.cmds <- func(cp *ConnectionPool) {
147 con := cp.conns[token]
148 if con == nil {
149 resCh <- result{err: ErrNoSuchToken}
150 } else {
151 delete(cp.conns, token)
152 newToken := GenerateSessionKey()
153 // TODO: Ensure that this is not racy!
154 con.session.ExpiresAt = time.Now().Add(maxTokenValid).Unix()
155 cp.conns[newToken] = con
156 resCh <- result{newToken: newToken}
157 }
158 }
159
160 r := <-resCh
161 return r.newToken, r.err
162 }
163
164 func (cp *ConnectionPool) trim(conn *Connection) {
165
166 conn.refCount--
167
168 for {
169 least := time.Now()
170 var count int
171 var oldest *Connection
172
173 for _, con := range cp.conns {
174 if con.db != nil && con.refCount <= 0 {
175 if last := con.last(); last.Before(least) {
176 least = last
177 oldest = con
178 }
179 count++
180 }
181 }
182 if count <= maxOpen {
183 break
184 }
185 oldest.close()
186 }
187 }
188
189 func (cp *ConnectionPool) Do(token string, fn func(*sql.DB) error) error {
190
191 type result struct {
192 con *Connection
193 err error
194 }
195
196 res := make(chan result)
197
198 cp.cmds <- func(cp *ConnectionPool) {
199 con := cp.conns[token]
200 if con == nil {
201 res <- result{err: ErrNoSuchToken}
202 return
203 }
204 con.touch()
205 if con.db != nil {
206 con.refCount++
207 res <- result{con: con}
208 return
209 }
210
211 session := con.session
212 db, err := opendb(session.User, session.Password)
213 if err != nil {
214 res <- result{err: err}
215 return
216 }
217 con.db = db
218 con.refCount++
219 res <- result{con: con}
220 }
221
222 r := <-res
223
224 if r.err != nil {
225 return r.err
226 }
227
228 defer func() {
229 cp.cmds <- func(cp *ConnectionPool) {
230 cp.trim(r.con)
231 }
232 }()
233
234 return fn(r.con.db)
235 }
236
237 func (cp *ConnectionPool) Session(token string) *Session {
238 res := make(chan *Session)
239 cp.cmds <- func(cp *ConnectionPool) {
240 con := cp.conns[token]
241 if con == nil {
242 res <- nil
243 } else {
244 con.touch()
245 res <- con.session
246 }
247 }
248 return <-res
249 }