comparison pkg/imports/fa.go @ 2198:4db1fa4f049c

Fairway availabilty import: Fixed row query leak.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 12 Feb 2019 14:55:09 +0100
parents e57ba9585aaa
children 64147a137e0a
comparison
equal deleted inserted replaced
2197:3203c53a8d29 2198:4db1fa4f049c
182 type bottleneckCountry struct { 182 type bottleneckCountry struct {
183 ID string 183 ID string
184 ResponsibleCountry string 184 ResponsibleCountry string
185 } 185 }
186 186
187 func loadBottleneckCountries(ctx context.Context, conn *sql.Conn) ([]bottleneckCountry, error) {
188
189 // Get available bottlenecks from database for use as filter in SOAP request
190 rows, err := conn.QueryContext(ctx, listBottlenecksSQL)
191 if err != nil {
192 return nil, err
193 }
194 defer rows.Close()
195
196 var bottlenecks []bottleneckCountry
197
198 for rows.Next() {
199 var bn bottleneckCountry
200 if err = rows.Scan(
201 &bn.ID,
202 &bn.ResponsibleCountry,
203 ); err != nil {
204 return nil, err
205 }
206 bottlenecks = append(bottlenecks, bn)
207 }
208 if err = rows.Err(); err != nil {
209 return nil, err
210 }
211 return bottlenecks, nil
212 }
213
214 func loadFairwayAvailabilities(ctx context.Context, conn *sql.Conn) (map[uniqueFairwayAvailability]int64, error) {
215 rows, err := conn.QueryContext(ctx, listFairwayAvailabilitySQL)
216 if err != nil {
217 return nil, err
218 }
219 defer rows.Close()
220 fairwayAvailabilities := map[uniqueFairwayAvailability]int64{}
221 for rows.Next() {
222 var id int64
223 var bnId string
224 var sd time.Time
225 if err = rows.Scan(
226 &id,
227 &bnId,
228 &sd,
229 ); err != nil {
230 return nil, err
231 }
232 key := uniqueFairwayAvailability{
233 BottleneckId: bnId,
234 Surdat: sd,
235 }
236 fairwayAvailabilities[key] = id
237 }
238 if err = rows.Err(); err != nil {
239 return nil, err
240 }
241 return fairwayAvailabilities, nil
242 }
243
244 func latestDate(ctx context.Context, conn *sql.Conn) (pgtype.Timestamp, error) {
245 var date pgtype.Timestamp
246 err := conn.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date)
247 switch {
248 case err == sql.ErrNoRows:
249 date = pgtype.Timestamp{
250 // Fill Database with data of the last 5 days. Change this to a more useful value.
251 Time: time.Now().AddDate(0, 0, -5),
252 }
253 case err != nil:
254 return pgtype.Timestamp{}, err
255 }
256 return date, nil
257 }
258
187 // Do executes the actual fairway availability import. 259 // Do executes the actual fairway availability import.
188 func (fa *FairwayAvailability) Do( 260 func (fa *FairwayAvailability) Do(
189 ctx context.Context, 261 ctx context.Context,
190 importID int64, 262 importID int64,
191 conn *sql.Conn, 263 conn *sql.Conn,
192 feedback Feedback, 264 feedback Feedback,
193 ) (interface{}, error) { 265 ) (interface{}, error) {
194 266
195 // Get available bottlenecks from database for use as filter in SOAP request 267 bottlenecks, err := loadBottleneckCountries(ctx, conn)
196 var rows *sql.Rows 268 if err != nil {
197 269 return nil, err
198 rows, err := conn.QueryContext(ctx, listBottlenecksSQL) 270 }
199 if err != nil { 271
200 return nil, err 272 fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, conn)
201 } 273 if err != nil {
202 defer rows.Close() 274 return nil, err
203 275 }
204 bottlenecks := []bottleneckCountry{} 276
205 277 latest, err := latestDate(ctx, conn)
206 for rows.Next() { 278 if err != nil {
207 var bn bottleneckCountry 279 return nil, err
208 if err = rows.Scan( 280 }
209 &bn.ID, 281
210 &bn.ResponsibleCountry, 282 faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latest, conn, feedback)
211 ); err != nil {
212 return nil, err
213 }
214 bottlenecks = append(bottlenecks, bn)
215 }
216 if err = rows.Err(); err != nil {
217 return nil, err
218 }
219
220 var faRows *sql.Rows
221 faRows, err = conn.QueryContext(ctx, listFairwayAvailabilitySQL)
222 if err != nil {
223 return nil, err
224 }
225 fairwayAvailabilities := map[uniqueFairwayAvailability]int64{}
226 for faRows.Next() {
227 var id int64
228 var bnId string
229 var sd time.Time
230 if err = faRows.Scan(
231 &id,
232 &bnId,
233 &sd,
234 ); err != nil {
235 return nil, err
236 }
237 key := uniqueFairwayAvailability{
238 BottleneckId: bnId,
239 Surdat: sd,
240 }
241 fairwayAvailabilities[key] = id
242 }
243 if err = faRows.Err(); err != nil {
244 return nil, err
245 }
246
247 var latestDate pgtype.Timestamp
248 err = conn.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&latestDate)
249 switch {
250 case err == sql.ErrNoRows:
251 latestDate = pgtype.Timestamp{
252 // Fill Database with data of the last 5 days. Change this to a more useful value.
253 Time: time.Now().AddDate(0, 0, -5),
254 }
255 case err != nil:
256 return nil, err
257 }
258
259 faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latestDate, conn, feedback)
260 if err != nil { 283 if err != nil {
261 feedback.Error("Error processing data: %s", err) 284 feedback.Error("Error processing data: %s", err)
262 } 285 }
263 if len(faids) == 0 { 286 if len(faids) == 0 {
264 feedback.Info("No new fairway availablity data found") 287 feedback.Info("No new fairway availablity data found")
303 Bottleneck_id: &ids, 326 Bottleneck_id: &ids,
304 Period: &period, 327 Period: &period,
305 } 328 }
306 resp, err := client.Get_bottleneck_fa(req) 329 resp, err := client.Get_bottleneck_fa(req)
307 if err != nil { 330 if err != nil {
308 feedback.Error("%v", err)
309 return nil, err 331 return nil, err
310 } 332 }
311 333
312 if resp.Get_bottleneck_faResult == nil { 334 if resp.Get_bottleneck_faResult == nil {
313 err := errors.New("no fairway availabilities found") 335 return nil, errors.New("no fairway availabilities found")
314 return nil, err
315 } 336 }
316 337
317 result := resp.Get_bottleneck_faResult 338 result := resp.Get_bottleneck_faResult
318 339
319 tx, err := conn.BeginTx(ctx, nil) 340 tx, err := conn.BeginTx(ctx, nil)