comparison pkg/imports/agm.go @ 4036:e45442db19b1 faster-agm

First stab at making AGM imports faster by avoiding unnecessary inserts. Also delete measurements in the imported time ranges that were not updated.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 24 Jul 2019 11:46:41 +0200
parents f2d5bf42ed38
children a18bf6bc7e3c
comparison
equal deleted inserted replaced
4035:f2d5bf42ed38 4036:e45442db19b1
19 "bufio" 19 "bufio"
20 "context" 20 "context"
21 "database/sql" 21 "database/sql"
22 "encoding/csv" 22 "encoding/csv"
23 "encoding/json" 23 "encoding/json"
24 "errors"
25 "fmt" 24 "fmt"
26 "io" 25 "io"
27 "math" 26 "math"
28 "os" 27 "os"
29 "path/filepath" 28 "path/filepath"
63 {"gauges"}, 62 {"gauges"},
64 } 63 }
65 } 64 }
66 65
67 const ( 66 const (
68 // delete the old and keep the new measures.
69 agmStageDoneDeleteSQL = ` 67 agmStageDoneDeleteSQL = `
70 WITH staged AS ( 68 DELETE FROM waterway.gauge_measurements WHERE id IN (
71 SELECT key 69 SELECT key
72 FROM import.track_imports 70 FROM import.track_imports
73 WHERE import_id = $1 AND 71 WHERE import_id = $1 AND
74 relation = 'waterway.gauge_measurements'::regclass 72 relation = 'waterway.gauge_measurements'::regclass AND
75 ), 73 deletion
76 to_delete AS ( 74 )`
77 SELECT o.id AS id
78 FROM waterway.gauge_measurements o
79 JOIN waterway.gauge_measurements n
80 USING (location, measure_date)
81 WHERE n.id IN (SELECT key FROM staged)
82 AND o.id NOT IN (SELECT key FROM staged)
83 )
84 DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`
85 75
86 agmStageDoneSQL = ` 76 agmStageDoneSQL = `
87 UPDATE waterway.gauge_measurements SET staging_done = true 77 UPDATE waterway.gauge_measurements SET staging_done = true
88 WHERE id IN ( 78 WHERE id IN (
89 SELECT key FROM import.track_imports 79 SELECT key
80 FROM import.track_imports
90 WHERE import_id = $1 AND 81 WHERE import_id = $1 AND
91 relation = 'waterway.gauge_measurements'::regclass)` 82 relation = 'waterway.gauge_measurements'::regclass AND
83 NOT deletion
84 )`
92 ) 85 )
93 86
94 func (agmJobCreator) StageDone( 87 func (agmJobCreator) StageDone(
95 ctx context.Context, 88 ctx context.Context,
96 tx *sql.Tx, 89 tx *sql.Tx,
119 func (ttz *timetz) MarshalJSON() ([]byte, error) { 112 func (ttz *timetz) MarshalJSON() ([]byte, error) {
120 return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00")) 113 return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00"))
121 } 114 }
122 115
123 type agmLine struct { 116 type agmLine struct {
117 id int64
124 Location models.Isrs `json:"fk-gauge-id"` 118 Location models.Isrs `json:"fk-gauge-id"`
125 CountryCode string `json:"country-code"` 119 CountryCode string `json:"country-code"`
126 Sender string `json:"sender"` 120 Sender string `json:"sender"`
127 LanguageCode string `json:"language-code"` 121 LanguageCode string `json:"language-code"`
128 DateIssue timetz `json:"date-issue"` 122 DateIssue timetz `json:"date-issue"`
150 } 144 }
151 145
152 const ( 146 const (
153 agmSelectSQL = ` 147 agmSelectSQL = `
154 SELECT 148 SELECT
149 id,
155 country_code, 150 country_code,
156 sender, 151 sender,
157 language_code, 152 language_code,
158 date_issue, 153 date_issue,
159 reference_code, 154 reference_code,
209 SELECT 1 FROM waterway.gauges 204 SELECT 1 FROM waterway.gauges
210 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)) 205 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int))
211 ` 206 `
212 ) 207 )
213 208
214 var errContinue = errors.New("continue")
215
216 func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error { 209 func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error {
217 210
218 headerFields := []struct { 211 headerFields := []struct {
219 idx *int 212 idx *int
220 name string 213 name string
308 if err != nil { 301 if err != nil {
309 return nil, err 302 return nil, err
310 } 303 }
311 defer insertStmt.Close() 304 defer insertStmt.Close()
312 305
313 trackStmt, err := conn.PrepareContext(ctx, trackImportSQL) 306 trackStmt, err := conn.PrepareContext(ctx, trackImportDeletionSQL)
314 if err != nil { 307 if err != nil {
315 return nil, err 308 return nil, err
316 } 309 }
317 defer trackStmt.Close() 310 defer trackStmt.Close()
318 311
429 if err != nil { 422 if err != nil {
430 return nil, err 423 return nil, err
431 } 424 }
432 } 425 }
433 426
427 tx, err := conn.BeginTx(ctx, nil)
428 if err != nil {
429 return nil, err
430 }
431 defer tx.Rollback()
432
433 txInsertStmt := tx.StmtContext(ctx, insertStmt)
434 txTrackStmt := tx.StmtContext(ctx, trackStmt)
435
434 agmLines: 436 agmLines:
435 for _, line := range agmLines { 437 for _, line := range agmLines {
436 438
437 switch err := func() error { 439 var ase *agmSummaryEntry
438 tx, err := conn.BeginTx(ctx, nil) 440
439 if err != nil { 441 if old := oldGMLines[line.Location]; old != nil {
440 return err 442 ut := line.MeasureDate.Unix()
441 } 443 if o, ok := old[ut]; ok {
442 defer tx.Rollback() 444 if !o.hasDiff(line) { // identical
443 445 // don't delete
444 var newID int64 446 delete(old, ut)
445 447 continue agmLines
446 if err := tx.StmtContext(ctx, insertStmt).QueryRowContext( 448 }
447 ctx, 449 ase = &agmSummaryEntry{
448 line.Location.CountryCode, 450 FKGaugeID: line.Location,
449 line.Location.LoCode, 451 MeasureDate: line.MeasureDate,
450 line.Location.FairwaySection, 452 Versions: []*agmLine{o, line},
451 line.Location.Orc, 453 }
452 line.Location.Hectometre, 454 }
453 line.MeasureDate.Time, 455 }
454 line.CountryCode, 456 if ase == nil {
455 line.Sender, 457 ase = &agmSummaryEntry{
456 line.LanguageCode, 458 FKGaugeID: line.Location,
457 line.DateIssue.Time, 459 MeasureDate: line.MeasureDate,
458 line.ReferenceCode, 460 Versions: []*agmLine{line},
459 line.WaterLevel, 461 }
460 line.DateInfo.Time, 462 }
461 line.SourceOrganization, 463
462 ).Scan(&newID); err != nil { 464 var newID int64
463 warn(handleError(err).Error()) 465
464 ignored++ 466 if err := txInsertStmt.QueryRowContext(
465 return errContinue 467 ctx,
466 } 468 line.Location.CountryCode,
467 469 line.Location.LoCode,
468 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( 470 line.Location.FairwaySection,
469 ctx, importID, "waterway.gauge_measurements", newID, 471 line.Location.Orc,
472 line.Location.Hectometre,
473 line.MeasureDate.Time,
474 line.CountryCode,
475 line.Sender,
476 line.LanguageCode,
477 line.DateIssue.Time,
478 line.ReferenceCode,
479 line.WaterLevel,
480 line.DateInfo.Time,
481 line.SourceOrganization,
482 ).Scan(&newID); err != nil {
483 return nil, err
484 }
485
486 if _, err := txTrackStmt.ExecContext(
487 ctx, importID, "waterway.gauge_measurements",
488 newID,
489 false,
490 ); err != nil {
491 return nil, err
492 }
493
494 entries = append(entries, ase)
495 }
496
497 // Issue deletes
498 for _, old := range oldGMLines {
499 for _, line := range old {
500 if _, err := txTrackStmt.ExecContext(
501 ctx, importID, "waterway.gauge_measurements",
502 line.id,
503 true,
470 ); err != nil { 504 ); err != nil {
471 return err 505 return nil, err
472 } 506 }
473 507 }
474 if err = tx.Commit(); err != nil { 508 }
475 err = fmt.Errorf("Commit failed: %v", err) 509
476 } 510 if err = tx.Commit(); err != nil {
477 return err 511 return nil, fmt.Errorf("Commit failed: %v", err)
478 }(); {
479 case err == errContinue:
480 continue agmLines
481 case err != nil:
482 return nil, err
483 }
484
485 ase := &agmSummaryEntry{
486 FKGaugeID: line.Location,
487 MeasureDate: line.MeasureDate,
488 }
489
490 if o, hasOld := oldGMLines[line.Location][line.MeasureDate.Time.Unix()]; !hasOld {
491 ase.Versions = []*agmLine{line}
492 } else {
493 // Ignore if there is no diff.
494 if !line.hasDiff(o) {
495 continue
496 }
497 ase.Versions = []*agmLine{o, line}
498 }
499 entries = append(entries, ase)
500 } 512 }
501 513
502 feedback.Info("Imported %d entries with changes", len(entries)) 514 feedback.Info("Imported %d entries with changes", len(entries))
503 feedback.Info("Importing approved gauge measurements took %s", 515 feedback.Info("Importing approved gauge measurements took %s",
504 time.Since(start)) 516 time.Since(start))
512 location models.Isrs, 524 location models.Isrs,
513 from time.Time, 525 from time.Time,
514 to time.Time, 526 to time.Time,
515 ) (map[int64]*agmLine, error) { 527 ) (map[int64]*agmLine, error) {
516 var ( 528 var (
529 oldID int64
517 oldCountryCode string 530 oldCountryCode string
518 oldSender string 531 oldSender string
519 oldLanguageCode string 532 oldLanguageCode string
520 oldDateIssue time.Time 533 oldDateIssue time.Time
521 oldReferenceCode string 534 oldReferenceCode string
540 return nil, err 553 return nil, err
541 } 554 }
542 defer gms.Close() 555 defer gms.Close()
543 for gms.Next() { 556 for gms.Next() {
544 if err = gms.Scan( 557 if err = gms.Scan(
558 &oldID,
545 &oldCountryCode, 559 &oldCountryCode,
546 &oldSender, 560 &oldSender,
547 &oldLanguageCode, 561 &oldLanguageCode,
548 &oldDateIssue, 562 &oldDateIssue,
549 &oldReferenceCode, 563 &oldReferenceCode,
552 &oldDateInfo, 566 &oldDateInfo,
553 &oldSourceOrganization, 567 &oldSourceOrganization,
554 ); err != nil { 568 ); err != nil {
555 return nil, err 569 return nil, err
556 } 570 }
557 gmLines[oldMeasureDate.Unix()] = newAGMLine( 571 line := newAGMLine(
558 location, 572 location,
559 oldCountryCode, 573 oldCountryCode,
560 oldSender, 574 oldSender,
561 oldLanguageCode, 575 oldLanguageCode,
562 oldDateIssue, 576 oldDateIssue,
564 oldMeasureDate, 578 oldMeasureDate,
565 oldValue, 579 oldValue,
566 oldDateInfo, 580 oldDateInfo,
567 oldSourceOrganization, 581 oldSourceOrganization,
568 ) 582 )
583 line.id = oldID
584 gmLines[oldMeasureDate.Unix()] = line
569 } 585 }
570 if err = gms.Err(); err != nil { 586 if err = gms.Err(); err != nil {
571 return nil, err 587 return nil, err
572 } 588 }
573 return gmLines, nil 589 return gmLines, nil