Mercurial > gemma
comparison pkg/imports/fa.go @ 2202:0aee7d4954ae
Fairway availabilty import: Re-factored to be the base for the uploaded fairway availabilty import.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 12 Feb 2019 18:32:48 +0100 |
parents | cae0b597aefc |
children | 8d1a945d0c3b |
comparison
equal
deleted
inserted
replaced
2201:cae0b597aefc | 2202:0aee7d4954ae |
---|---|
257 return pgtype.Timestamp{}, err | 257 return pgtype.Timestamp{}, err |
258 } | 258 } |
259 return date, nil | 259 return date, nil |
260 } | 260 } |
261 | 261 |
262 // Do executes the actual fairway availability import. | 262 func storeFairwayAvailability( |
263 func (fa *FairwayAvailability) Do( | |
264 ctx context.Context, | 263 ctx context.Context, |
265 importID int64, | |
266 conn *sql.Conn, | 264 conn *sql.Conn, |
267 feedback Feedback, | 265 feedback Feedback, |
266 fetch func(context.Context, *sql.Tx, bottlenecks) ([]*ifaf.FairwayAvailability, error), | |
268 ) (interface{}, error) { | 267 ) (interface{}, error) { |
269 | 268 |
270 start := time.Now() | 269 start := time.Now() |
271 | 270 |
272 tx, err := conn.BeginTx(ctx, nil) | 271 tx, err := conn.BeginTx(ctx, nil) |
278 bns, err := loadBottleneckCountries(ctx, tx) | 277 bns, err := loadBottleneckCountries(ctx, tx) |
279 if err != nil { | 278 if err != nil { |
280 return nil, err | 279 return nil, err |
281 } | 280 } |
282 | 281 |
282 fas, err := fetch(ctx, tx, bns) | |
283 if err != nil { | |
284 return nil, err | |
285 } | |
286 | |
283 fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, tx) | 287 fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, tx) |
284 if err != nil { | 288 if err != nil { |
285 return nil, err | 289 return nil, err |
286 } | 290 } |
287 | 291 |
288 latest, err := latestDate(ctx, tx) | 292 faids, err := doForFAs(ctx, bns, fairwayAvailabilities, fas, tx, feedback) |
289 if err != nil { | |
290 return nil, err | |
291 } | |
292 | |
293 faids, err := fa.doForFAs(ctx, bns, fairwayAvailabilities, latest, tx, feedback) | |
294 if err != nil { | 293 if err != nil { |
295 return nil, fmt.Errorf("Error processing data: %v", err) | 294 return nil, fmt.Errorf("Error processing data: %v", err) |
296 } | 295 } |
297 if len(faids) == 0 { | 296 if len(faids) == 0 { |
298 feedback.Info("No new fairway availablity data found") | 297 feedback.Info("No new fairway availablity data found") |
299 return nil, UnchangedError("No new fairway availablity data found") | 298 return nil, UnchangedError("No new fairway availablity data found") |
300 } | 299 } |
301 feedback.Info("Processed %d fairway availabilities", len(faids)) | 300 feedback.Info("Processed %d fairway availabilities", len(faids)) |
302 | |
303 feedback.Info("Storing fairway availabilities took %s", time.Since(start)) | |
304 | 301 |
305 if err = tx.Commit(); err == nil { | 302 if err = tx.Commit(); err == nil { |
306 feedback.Info( | 303 feedback.Info( |
307 "Importing fairway availabilities successfully took %s", time.Since(start)) | 304 "Importing fairway availabilities successfully took %s", time.Since(start)) |
308 } else { | 305 } else { |
318 FairwayAvailabilities: faids, | 315 FairwayAvailabilities: faids, |
319 } | 316 } |
320 return &summary, nil | 317 return &summary, nil |
321 } | 318 } |
322 | 319 |
323 func (fa *FairwayAvailability) doForFAs( | 320 func doForFAs( |
324 ctx context.Context, | 321 ctx context.Context, |
325 bnIds bottlenecks, | 322 bnIds bottlenecks, |
326 fairwayAvailabilities map[uniqueFairwayAvailability]int64, | 323 fairwayAvailabilities map[uniqueFairwayAvailability]int64, |
327 latestDate pgtype.Timestamp, | 324 fas []*ifaf.FairwayAvailability, |
328 tx *sql.Tx, | 325 tx *sql.Tx, |
329 feedback Feedback, | 326 feedback Feedback, |
330 ) ([]string, error) { | 327 ) ([]string, error) { |
331 | 328 |
332 client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) | |
333 | |
334 var period ifaf.RequestedPeriod | |
335 period.Date_start = latestDate.Time | |
336 period.Date_end = time.Now() | |
337 | |
338 ids := ifaf.ArrayOfString{ | |
339 String: bnIds, | |
340 } | |
341 | |
342 req := &ifaf.Get_bottleneck_fa{ | |
343 Bottleneck_id: &ids, | |
344 Period: &period, | |
345 } | |
346 resp, err := client.Get_bottleneck_fa(req) | |
347 if err != nil { | |
348 return nil, err | |
349 } | |
350 | |
351 if resp.Get_bottleneck_faResult == nil { | |
352 return nil, errors.New("no fairway availabilities found") | |
353 } | |
354 | |
355 result := resp.Get_bottleneck_faResult | |
356 | |
357 insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) | 329 insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) |
358 if err != nil { | 330 if err != nil { |
359 return nil, err | 331 return nil, err |
360 } | 332 } |
361 defer insertFAStmt.Close() | 333 defer insertFAStmt.Close() |
375 } | 347 } |
376 defer insertFAVStmt.Close() | 348 defer insertFAVStmt.Close() |
377 | 349 |
378 var faIDs []string | 350 var faIDs []string |
379 var faID int64 | 351 var faID int64 |
380 feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability)) | 352 feedback.Info("Found %d fairway availabilities", len(fas)) |
381 for _, faRes := range result.FairwayAvailability { | 353 for _, faRes := range fas { |
382 if !bnIds.contains(faRes.Bottleneck_id) { | 354 if !bnIds.contains(faRes.Bottleneck_id) { |
383 feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id) | 355 feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id) |
384 continue | 356 continue |
385 } | 357 } |
386 uniqueFa := uniqueFairwayAvailability{ | 358 uniqueFa := uniqueFairwayAvailability{ |
490 feedback.Info("Add %d Reference Values", rvCount) | 462 feedback.Info("Add %d Reference Values", rvCount) |
491 } | 463 } |
492 } | 464 } |
493 return faIDs, nil | 465 return faIDs, nil |
494 } | 466 } |
467 | |
468 // Do executes the actual fairway availability import. | |
469 func (fa *FairwayAvailability) Do( | |
470 ctx context.Context, | |
471 importID int64, | |
472 conn *sql.Conn, | |
473 feedback Feedback, | |
474 ) (interface{}, error) { | |
475 | |
476 fetch := func(ctx context.Context, tx *sql.Tx, bns bottlenecks) ([]*ifaf.FairwayAvailability, error) { | |
477 | |
478 latest, err := latestDate(ctx, tx) | |
479 if err != nil { | |
480 return nil, err | |
481 } | |
482 | |
483 client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) | |
484 | |
485 var period ifaf.RequestedPeriod | |
486 period.Date_start = latest.Time | |
487 period.Date_end = time.Now() | |
488 | |
489 ids := ifaf.ArrayOfString{String: bns} | |
490 | |
491 req := &ifaf.Get_bottleneck_fa{ | |
492 Bottleneck_id: &ids, | |
493 Period: &period, | |
494 } | |
495 resp, err := client.Get_bottleneck_fa(req) | |
496 if err != nil { | |
497 return nil, err | |
498 } | |
499 | |
500 if resp.Get_bottleneck_faResult == nil { | |
501 return nil, errors.New("no fairway availabilities found") | |
502 } | |
503 | |
504 result := resp.Get_bottleneck_faResult | |
505 return result.FairwayAvailability, nil | |
506 } | |
507 | |
508 return storeFairwayAvailability(ctx, conn, feedback, fetch) | |
509 } |