Mercurial > gemma
comparison pkg/imports/agm.go @ 4027:b17453420eff
Avoid doing one SELECT per line in import file
When the import is run by a waterway admin (which involves more row-level
security processing than running it as sys_admin), this approximately halves
the overall duration of the import.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Mon, 22 Jul 2019 19:19:00 +0200 |
parents | 82037bbd2c7c |
children | 040a5dc95eb9 |
comparison
equal
deleted
inserted
replaced
4026:82037bbd2c7c | 4027:b17453420eff |
---|---|
150 } | 150 } |
151 | 151 |
152 const ( | 152 const ( |
153 agmSelectSQL = ` | 153 agmSelectSQL = ` |
154 SELECT | 154 SELECT |
155 id, | |
156 country_code, | 155 country_code, |
157 sender, | 156 sender, |
158 language_code, | 157 language_code, |
159 date_issue, | 158 date_issue, |
160 reference_code, | 159 reference_code, |
160 measure_date, | |
161 water_level, | 161 water_level, |
162 date_info, | 162 date_info, |
163 source_organization | 163 source_organization |
164 FROM waterway.gauge_measurements | 164 FROM waterway.gauge_measurements |
165 WHERE | 165 WHERE |
166 location | 166 location |
167 = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) | 167 = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) |
168 AND measure_date = $6 | 168 AND measure_date BETWEEN $6 AND $7 |
169 AND staging_done | 169 AND staging_done |
170 ` | 170 ` |
171 | 171 |
172 agmInsertSQL = ` | 172 agmInsertSQL = ` |
173 INSERT INTO waterway.gauge_measurements ( | 173 INSERT INTO waterway.gauge_measurements ( |
312 warn := warnLimiter.Warn | 312 warn := warnLimiter.Warn |
313 defer warnLimiter.Close() | 313 defer warnLimiter.Close() |
314 | 314 |
315 agmLines := []*agmLine{} | 315 agmLines := []*agmLine{} |
316 ignored := 0 | 316 ignored := 0 |
317 mdMinMax := map[models.Isrs][2]time.Time{} | |
317 | 318 |
318 lines: | 319 lines: |
319 for line := 1; ; line++ { | 320 for line := 1; ; line++ { |
320 | 321 |
321 row, err := r.Read() | 322 row, err := r.Read() |
365 } | 366 } |
366 | 367 |
367 md, err := guessDate(row[measureDateIdx]) | 368 md, err := guessDate(row[measureDateIdx]) |
368 if err != nil { | 369 if err != nil { |
369 return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err) | 370 return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err) |
371 } | |
372 if _, hasGid := mdMinMax[*gid]; hasGid { | |
373 if md.Before(mdMinMax[*gid][0]) { | |
374 mdMinMax[*gid] = [2]time.Time{md, mdMinMax[*gid][1]} | |
375 } | |
376 if md.After(mdMinMax[*gid][1]) { | |
377 mdMinMax[*gid] = [2]time.Time{mdMinMax[*gid][0], md} | |
378 } | |
379 } else { | |
380 mdMinMax[*gid] = [2]time.Time{md, md} | |
370 } | 381 } |
371 | 382 |
372 newSender := agm.Originator | 383 newSender := agm.Originator |
373 newCountryCode := gid.CountryCode | 384 newCountryCode := gid.CountryCode |
374 newLanguageCode := misc.CCtoLang[gid.CountryCode] | 385 newLanguageCode := misc.CCtoLang[gid.CountryCode] |
397 newDateInfo, | 408 newDateInfo, |
398 newSourceOrganization, | 409 newSourceOrganization, |
399 )) | 410 )) |
400 } | 411 } |
401 | 412 |
413 oldGMLines := map[models.Isrs]map[int64]*agmLine{} | |
414 for gid, minMax := range mdMinMax { | |
415 oldGMLines[gid], err = getOldGMLines( | |
416 ctx, selectStmt, gid, minMax[0], minMax[1]) | |
417 if err != nil { | |
418 return nil, err | |
419 } | |
420 } | |
421 | |
402 agmLines: | 422 agmLines: |
403 for _, line := range agmLines { | 423 for _, line := range agmLines { |
404 var ( | |
405 oldID int64 | |
406 oldCountryCode string | |
407 oldSender string | |
408 oldLanguageCode string | |
409 oldDateIssue time.Time | |
410 oldReferenceCode string | |
411 oldValue float64 | |
412 oldDateInfo time.Time | |
413 oldSourceOrganization string | |
414 ) | |
415 | |
416 err = selectStmt.QueryRowContext( | |
417 ctx, | |
418 line.Location.CountryCode, | |
419 line.Location.LoCode, | |
420 line.Location.FairwaySection, | |
421 line.Location.Orc, | |
422 line.Location.Hectometre, | |
423 line.MeasureDate.Time, | |
424 ).Scan( | |
425 &oldID, | |
426 &oldCountryCode, | |
427 &oldSender, | |
428 &oldLanguageCode, | |
429 &oldDateIssue, | |
430 &oldReferenceCode, | |
431 &oldValue, | |
432 &oldDateInfo, | |
433 &oldSourceOrganization, | |
434 ) | |
435 | |
436 var newEntry bool | |
437 switch { | |
438 case err == sql.ErrNoRows: | |
439 // Complete new one | |
440 newEntry = true | |
441 case err != nil: | |
442 return nil, err | |
443 } | |
444 | 424 |
445 switch err := func() error { | 425 switch err := func() error { |
446 tx, err := conn.BeginTx(ctx, nil) | 426 tx, err := conn.BeginTx(ctx, nil) |
447 if err != nil { | 427 if err != nil { |
448 return err | 428 return err |
493 ase := &agmSummaryEntry{ | 473 ase := &agmSummaryEntry{ |
494 FKGaugeID: line.Location, | 474 FKGaugeID: line.Location, |
495 MeasureDate: line.MeasureDate, | 475 MeasureDate: line.MeasureDate, |
496 } | 476 } |
497 | 477 |
498 if newEntry { | 478 if o, hasOld := oldGMLines[line.Location][line.MeasureDate.Time.Unix()]; !hasOld { |
499 ase.Versions = []*agmLine{line} | 479 ase.Versions = []*agmLine{line} |
500 } else { | 480 } else { |
501 o := newAGMLine( | |
502 line.Location, | |
503 oldCountryCode, | |
504 oldSender, | |
505 oldLanguageCode, | |
506 oldDateIssue, | |
507 oldReferenceCode, | |
508 line.MeasureDate.Time, | |
509 oldValue, | |
510 oldDateInfo, | |
511 oldSourceOrganization, | |
512 ) | |
513 // Ignore if there is no diff. | 481 // Ignore if there is no diff. |
514 if !line.hasDiff(o) { | 482 if !line.hasDiff(o) { |
515 continue | 483 continue |
516 } | 484 } |
517 ase.Versions = []*agmLine{o, line} | 485 ase.Versions = []*agmLine{o, line} |
525 feedback.Info("Imported %d entries with changes", len(entries)) | 493 feedback.Info("Imported %d entries with changes", len(entries)) |
526 feedback.Info("Importing approved gauge measurements took %s", | 494 feedback.Info("Importing approved gauge measurements took %s", |
527 time.Since(start)) | 495 time.Since(start)) |
528 | 496 |
529 return entries, nil | 497 return entries, nil |
498 } | |
499 | |
500 func getOldGMLines( | |
501 ctx context.Context, | |
502 stmt *sql.Stmt, | |
503 location models.Isrs, | |
504 from time.Time, | |
505 to time.Time, | |
506 ) (map[int64]*agmLine, error) { | |
507 var ( | |
508 oldCountryCode string | |
509 oldSender string | |
510 oldLanguageCode string | |
511 oldDateIssue time.Time | |
512 oldReferenceCode string | |
513 oldMeasureDate time.Time | |
514 oldValue float64 | |
515 oldDateInfo time.Time | |
516 oldSourceOrganization string | |
517 ) | |
518 gmLines := map[int64]*agmLine{} | |
519 | |
520 gms, err := stmt.QueryContext( | |
521 ctx, | |
522 location.CountryCode, | |
523 location.LoCode, | |
524 location.FairwaySection, | |
525 location.Orc, | |
526 location.Hectometre, | |
527 from, | |
528 to, | |
529 ) | |
530 if err != nil { | |
531 return nil, err | |
532 } | |
533 defer gms.Close() | |
534 for gms.Next() { | |
535 if err = gms.Scan( | |
536 &oldCountryCode, | |
537 &oldSender, | |
538 &oldLanguageCode, | |
539 &oldDateIssue, | |
540 &oldReferenceCode, | |
541 &oldMeasureDate, | |
542 &oldValue, | |
543 &oldDateInfo, | |
544 &oldSourceOrganization, | |
545 ); err != nil { | |
546 return nil, err | |
547 } | |
548 gmLines[oldMeasureDate.Unix()] = newAGMLine( | |
549 location, | |
550 oldCountryCode, | |
551 oldSender, | |
552 oldLanguageCode, | |
553 oldDateIssue, | |
554 oldReferenceCode, | |
555 oldMeasureDate, | |
556 oldValue, | |
557 oldDateInfo, | |
558 oldSourceOrganization, | |
559 ) | |
560 } | |
561 if err = gms.Err(); err != nil { | |
562 return nil, err | |
563 } | |
564 return gmLines, nil | |
530 } | 565 } |
531 | 566 |
532 func newAGMLine( | 567 func newAGMLine( |
533 location models.Isrs, | 568 location models.Isrs, |
534 countryCode string, | 569 countryCode string, |