comparison pkg/imports/fa.go @ 2202:0aee7d4954ae

Fairway availabilty import: Re-factored to be the base for the uploaded fairway availabilty import.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 12 Feb 2019 18:32:48 +0100
parents cae0b597aefc
children 8d1a945d0c3b
comparison
equal deleted inserted replaced
2201:cae0b597aefc 2202:0aee7d4954ae
257 return pgtype.Timestamp{}, err 257 return pgtype.Timestamp{}, err
258 } 258 }
259 return date, nil 259 return date, nil
260 } 260 }
261 261
262 // Do executes the actual fairway availability import. 262 func storeFairwayAvailability(
263 func (fa *FairwayAvailability) Do(
264 ctx context.Context, 263 ctx context.Context,
265 importID int64,
266 conn *sql.Conn, 264 conn *sql.Conn,
267 feedback Feedback, 265 feedback Feedback,
266 fetch func(context.Context, *sql.Tx, bottlenecks) ([]*ifaf.FairwayAvailability, error),
268 ) (interface{}, error) { 267 ) (interface{}, error) {
269 268
270 start := time.Now() 269 start := time.Now()
271 270
272 tx, err := conn.BeginTx(ctx, nil) 271 tx, err := conn.BeginTx(ctx, nil)
278 bns, err := loadBottleneckCountries(ctx, tx) 277 bns, err := loadBottleneckCountries(ctx, tx)
279 if err != nil { 278 if err != nil {
280 return nil, err 279 return nil, err
281 } 280 }
282 281
282 fas, err := fetch(ctx, tx, bns)
283 if err != nil {
284 return nil, err
285 }
286
283 fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, tx) 287 fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, tx)
284 if err != nil { 288 if err != nil {
285 return nil, err 289 return nil, err
286 } 290 }
287 291
288 latest, err := latestDate(ctx, tx) 292 faids, err := doForFAs(ctx, bns, fairwayAvailabilities, fas, tx, feedback)
289 if err != nil {
290 return nil, err
291 }
292
293 faids, err := fa.doForFAs(ctx, bns, fairwayAvailabilities, latest, tx, feedback)
294 if err != nil { 293 if err != nil {
295 return nil, fmt.Errorf("Error processing data: %v", err) 294 return nil, fmt.Errorf("Error processing data: %v", err)
296 } 295 }
297 if len(faids) == 0 { 296 if len(faids) == 0 {
298 feedback.Info("No new fairway availablity data found") 297 feedback.Info("No new fairway availablity data found")
299 return nil, UnchangedError("No new fairway availablity data found") 298 return nil, UnchangedError("No new fairway availablity data found")
300 } 299 }
301 feedback.Info("Processed %d fairway availabilities", len(faids)) 300 feedback.Info("Processed %d fairway availabilities", len(faids))
302
303 feedback.Info("Storing fairway availabilities took %s", time.Since(start))
304 301
305 if err = tx.Commit(); err == nil { 302 if err = tx.Commit(); err == nil {
306 feedback.Info( 303 feedback.Info(
307 "Importing fairway availabilities successfully took %s", time.Since(start)) 304 "Importing fairway availabilities successfully took %s", time.Since(start))
308 } else { 305 } else {
318 FairwayAvailabilities: faids, 315 FairwayAvailabilities: faids,
319 } 316 }
320 return &summary, nil 317 return &summary, nil
321 } 318 }
322 319
323 func (fa *FairwayAvailability) doForFAs( 320 func doForFAs(
324 ctx context.Context, 321 ctx context.Context,
325 bnIds bottlenecks, 322 bnIds bottlenecks,
326 fairwayAvailabilities map[uniqueFairwayAvailability]int64, 323 fairwayAvailabilities map[uniqueFairwayAvailability]int64,
327 latestDate pgtype.Timestamp, 324 fas []*ifaf.FairwayAvailability,
328 tx *sql.Tx, 325 tx *sql.Tx,
329 feedback Feedback, 326 feedback Feedback,
330 ) ([]string, error) { 327 ) ([]string, error) {
331 328
332 client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)
333
334 var period ifaf.RequestedPeriod
335 period.Date_start = latestDate.Time
336 period.Date_end = time.Now()
337
338 ids := ifaf.ArrayOfString{
339 String: bnIds,
340 }
341
342 req := &ifaf.Get_bottleneck_fa{
343 Bottleneck_id: &ids,
344 Period: &period,
345 }
346 resp, err := client.Get_bottleneck_fa(req)
347 if err != nil {
348 return nil, err
349 }
350
351 if resp.Get_bottleneck_faResult == nil {
352 return nil, errors.New("no fairway availabilities found")
353 }
354
355 result := resp.Get_bottleneck_faResult
356
357 insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) 329 insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL)
358 if err != nil { 330 if err != nil {
359 return nil, err 331 return nil, err
360 } 332 }
361 defer insertFAStmt.Close() 333 defer insertFAStmt.Close()
375 } 347 }
376 defer insertFAVStmt.Close() 348 defer insertFAVStmt.Close()
377 349
378 var faIDs []string 350 var faIDs []string
379 var faID int64 351 var faID int64
380 feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability)) 352 feedback.Info("Found %d fairway availabilities", len(fas))
381 for _, faRes := range result.FairwayAvailability { 353 for _, faRes := range fas {
382 if !bnIds.contains(faRes.Bottleneck_id) { 354 if !bnIds.contains(faRes.Bottleneck_id) {
383 feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id) 355 feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id)
384 continue 356 continue
385 } 357 }
386 uniqueFa := uniqueFairwayAvailability{ 358 uniqueFa := uniqueFairwayAvailability{
490 feedback.Info("Add %d Reference Values", rvCount) 462 feedback.Info("Add %d Reference Values", rvCount)
491 } 463 }
492 } 464 }
493 return faIDs, nil 465 return faIDs, nil
494 } 466 }
467
468 // Do executes the actual fairway availability import.
469 func (fa *FairwayAvailability) Do(
470 ctx context.Context,
471 importID int64,
472 conn *sql.Conn,
473 feedback Feedback,
474 ) (interface{}, error) {
475
476 fetch := func(ctx context.Context, tx *sql.Tx, bns bottlenecks) ([]*ifaf.FairwayAvailability, error) {
477
478 latest, err := latestDate(ctx, tx)
479 if err != nil {
480 return nil, err
481 }
482
483 client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)
484
485 var period ifaf.RequestedPeriod
486 period.Date_start = latest.Time
487 period.Date_end = time.Now()
488
489 ids := ifaf.ArrayOfString{String: bns}
490
491 req := &ifaf.Get_bottleneck_fa{
492 Bottleneck_id: &ids,
493 Period: &period,
494 }
495 resp, err := client.Get_bottleneck_fa(req)
496 if err != nil {
497 return nil, err
498 }
499
500 if resp.Get_bottleneck_faResult == nil {
501 return nil, errors.New("no fairway availabilities found")
502 }
503
504 result := resp.Get_bottleneck_faResult
505 return result.FairwayAvailability, nil
506 }
507
508 return storeFairwayAvailability(ctx, conn, feedback, fetch)
509 }