Mercurial > gemma
comparison pkg/imports/agm.go @ 4036:e45442db19b1 faster-agm
First attempt to make AGM imports faster by avoiding unnecessary inserts. Also delete existing measurements in the imported time ranges that were not updated by the import.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 24 Jul 2019 11:46:41 +0200 |
parents | f2d5bf42ed38 |
children | a18bf6bc7e3c |
comparison
equal
deleted
inserted
replaced
4035:f2d5bf42ed38 | 4036:e45442db19b1 |
---|---|
19 "bufio" | 19 "bufio" |
20 "context" | 20 "context" |
21 "database/sql" | 21 "database/sql" |
22 "encoding/csv" | 22 "encoding/csv" |
23 "encoding/json" | 23 "encoding/json" |
24 "errors" | |
25 "fmt" | 24 "fmt" |
26 "io" | 25 "io" |
27 "math" | 26 "math" |
28 "os" | 27 "os" |
29 "path/filepath" | 28 "path/filepath" |
63 {"gauges"}, | 62 {"gauges"}, |
64 } | 63 } |
65 } | 64 } |
66 | 65 |
67 const ( | 66 const ( |
68 // delete the old and keep the new measures. | |
69 agmStageDoneDeleteSQL = ` | 67 agmStageDoneDeleteSQL = ` |
70 WITH staged AS ( | 68 DELETE FROM waterway.gauge_measurements WHERE id IN ( |
71 SELECT key | 69 SELECT key |
72 FROM import.track_imports | 70 FROM import.track_imports |
73 WHERE import_id = $1 AND | 71 WHERE import_id = $1 AND |
74 relation = 'waterway.gauge_measurements'::regclass | 72 relation = 'waterway.gauge_measurements'::regclass AND |
75 ), | 73 deletion |
76 to_delete AS ( | 74 )` |
77 SELECT o.id AS id | |
78 FROM waterway.gauge_measurements o | |
79 JOIN waterway.gauge_measurements n | |
80 USING (location, measure_date) | |
81 WHERE n.id IN (SELECT key FROM staged) | |
82 AND o.id NOT IN (SELECT key FROM staged) | |
83 ) | |
84 DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)` | |
85 | 75 |
86 agmStageDoneSQL = ` | 76 agmStageDoneSQL = ` |
87 UPDATE waterway.gauge_measurements SET staging_done = true | 77 UPDATE waterway.gauge_measurements SET staging_done = true |
88 WHERE id IN ( | 78 WHERE id IN ( |
89 SELECT key FROM import.track_imports | 79 SELECT key |
80 FROM import.track_imports | |
90 WHERE import_id = $1 AND | 81 WHERE import_id = $1 AND |
91 relation = 'waterway.gauge_measurements'::regclass)` | 82 relation = 'waterway.gauge_measurements'::regclass AND |
83 NOT deletion | |
84 )` | |
92 ) | 85 ) |
93 | 86 |
94 func (agmJobCreator) StageDone( | 87 func (agmJobCreator) StageDone( |
95 ctx context.Context, | 88 ctx context.Context, |
96 tx *sql.Tx, | 89 tx *sql.Tx, |
119 func (ttz *timetz) MarshalJSON() ([]byte, error) { | 112 func (ttz *timetz) MarshalJSON() ([]byte, error) { |
120 return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00")) | 113 return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00")) |
121 } | 114 } |
122 | 115 |
123 type agmLine struct { | 116 type agmLine struct { |
117 id int64 | |
124 Location models.Isrs `json:"fk-gauge-id"` | 118 Location models.Isrs `json:"fk-gauge-id"` |
125 CountryCode string `json:"country-code"` | 119 CountryCode string `json:"country-code"` |
126 Sender string `json:"sender"` | 120 Sender string `json:"sender"` |
127 LanguageCode string `json:"language-code"` | 121 LanguageCode string `json:"language-code"` |
128 DateIssue timetz `json:"date-issue"` | 122 DateIssue timetz `json:"date-issue"` |
150 } | 144 } |
151 | 145 |
152 const ( | 146 const ( |
153 agmSelectSQL = ` | 147 agmSelectSQL = ` |
154 SELECT | 148 SELECT |
149 id, | |
155 country_code, | 150 country_code, |
156 sender, | 151 sender, |
157 language_code, | 152 language_code, |
158 date_issue, | 153 date_issue, |
159 reference_code, | 154 reference_code, |
209 SELECT 1 FROM waterway.gauges | 204 SELECT 1 FROM waterway.gauges |
210 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)) | 205 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)) |
211 ` | 206 ` |
212 ) | 207 ) |
213 | 208 |
214 var errContinue = errors.New("continue") | |
215 | |
216 func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error { | 209 func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error { |
217 | 210 |
218 headerFields := []struct { | 211 headerFields := []struct { |
219 idx *int | 212 idx *int |
220 name string | 213 name string |
308 if err != nil { | 301 if err != nil { |
309 return nil, err | 302 return nil, err |
310 } | 303 } |
311 defer insertStmt.Close() | 304 defer insertStmt.Close() |
312 | 305 |
313 trackStmt, err := conn.PrepareContext(ctx, trackImportSQL) | 306 trackStmt, err := conn.PrepareContext(ctx, trackImportDeletionSQL) |
314 if err != nil { | 307 if err != nil { |
315 return nil, err | 308 return nil, err |
316 } | 309 } |
317 defer trackStmt.Close() | 310 defer trackStmt.Close() |
318 | 311 |
429 if err != nil { | 422 if err != nil { |
430 return nil, err | 423 return nil, err |
431 } | 424 } |
432 } | 425 } |
433 | 426 |
427 tx, err := conn.BeginTx(ctx, nil) | |
428 if err != nil { | |
429 return nil, err | |
430 } | |
431 defer tx.Rollback() | |
432 | |
433 txInsertStmt := tx.StmtContext(ctx, insertStmt) | |
434 txTrackStmt := tx.StmtContext(ctx, trackStmt) | |
435 | |
434 agmLines: | 436 agmLines: |
435 for _, line := range agmLines { | 437 for _, line := range agmLines { |
436 | 438 |
437 switch err := func() error { | 439 var ase *agmSummaryEntry |
438 tx, err := conn.BeginTx(ctx, nil) | 440 |
439 if err != nil { | 441 if old := oldGMLines[line.Location]; old != nil { |
440 return err | 442 ut := line.MeasureDate.Unix() |
441 } | 443 if o, ok := old[ut]; ok { |
442 defer tx.Rollback() | 444 if !o.hasDiff(line) { // identical |
443 | 445 // don't delete |
444 var newID int64 | 446 delete(old, ut) |
445 | 447 continue agmLines |
446 if err := tx.StmtContext(ctx, insertStmt).QueryRowContext( | 448 } |
447 ctx, | 449 ase = &agmSummaryEntry{ |
448 line.Location.CountryCode, | 450 FKGaugeID: line.Location, |
449 line.Location.LoCode, | 451 MeasureDate: line.MeasureDate, |
450 line.Location.FairwaySection, | 452 Versions: []*agmLine{o, line}, |
451 line.Location.Orc, | 453 } |
452 line.Location.Hectometre, | 454 } |
453 line.MeasureDate.Time, | 455 } |
454 line.CountryCode, | 456 if ase == nil { |
455 line.Sender, | 457 ase = &agmSummaryEntry{ |
456 line.LanguageCode, | 458 FKGaugeID: line.Location, |
457 line.DateIssue.Time, | 459 MeasureDate: line.MeasureDate, |
458 line.ReferenceCode, | 460 Versions: []*agmLine{line}, |
459 line.WaterLevel, | 461 } |
460 line.DateInfo.Time, | 462 } |
461 line.SourceOrganization, | 463 |
462 ).Scan(&newID); err != nil { | 464 var newID int64 |
463 warn(handleError(err).Error()) | 465 |
464 ignored++ | 466 if err := txInsertStmt.QueryRowContext( |
465 return errContinue | 467 ctx, |
466 } | 468 line.Location.CountryCode, |
467 | 469 line.Location.LoCode, |
468 if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( | 470 line.Location.FairwaySection, |
469 ctx, importID, "waterway.gauge_measurements", newID, | 471 line.Location.Orc, |
472 line.Location.Hectometre, | |
473 line.MeasureDate.Time, | |
474 line.CountryCode, | |
475 line.Sender, | |
476 line.LanguageCode, | |
477 line.DateIssue.Time, | |
478 line.ReferenceCode, | |
479 line.WaterLevel, | |
480 line.DateInfo.Time, | |
481 line.SourceOrganization, | |
482 ).Scan(&newID); err != nil { | |
483 return nil, err | |
484 } | |
485 | |
486 if _, err := txTrackStmt.ExecContext( | |
487 ctx, importID, "waterway.gauge_measurements", | |
488 newID, | |
489 false, | |
490 ); err != nil { | |
491 return nil, err | |
492 } | |
493 | |
494 entries = append(entries, ase) | |
495 } | |
496 | |
497 // Issue deletes | |
498 for _, old := range oldGMLines { | |
499 for _, line := range old { | |
500 if _, err := txTrackStmt.ExecContext( | |
501 ctx, importID, "waterway.gauge_measurements", | |
502 line.id, | |
503 true, | |
470 ); err != nil { | 504 ); err != nil { |
471 return err | 505 return nil, err |
472 } | 506 } |
473 | 507 } |
474 if err = tx.Commit(); err != nil { | 508 } |
475 err = fmt.Errorf("Commit failed: %v", err) | 509 |
476 } | 510 if err = tx.Commit(); err != nil { |
477 return err | 511 return nil, fmt.Errorf("Commit failed: %v", err) |
478 }(); { | |
479 case err == errContinue: | |
480 continue agmLines | |
481 case err != nil: | |
482 return nil, err | |
483 } | |
484 | |
485 ase := &agmSummaryEntry{ | |
486 FKGaugeID: line.Location, | |
487 MeasureDate: line.MeasureDate, | |
488 } | |
489 | |
490 if o, hasOld := oldGMLines[line.Location][line.MeasureDate.Time.Unix()]; !hasOld { | |
491 ase.Versions = []*agmLine{line} | |
492 } else { | |
493 // Ignore if there is no diff. | |
494 if !line.hasDiff(o) { | |
495 continue | |
496 } | |
497 ase.Versions = []*agmLine{o, line} | |
498 } | |
499 entries = append(entries, ase) | |
500 } | 512 } |
501 | 513 |
502 feedback.Info("Imported %d entries with changes", len(entries)) | 514 feedback.Info("Imported %d entries with changes", len(entries)) |
503 feedback.Info("Importing approved gauge measurements took %s", | 515 feedback.Info("Importing approved gauge measurements took %s", |
504 time.Since(start)) | 516 time.Since(start)) |
512 location models.Isrs, | 524 location models.Isrs, |
513 from time.Time, | 525 from time.Time, |
514 to time.Time, | 526 to time.Time, |
515 ) (map[int64]*agmLine, error) { | 527 ) (map[int64]*agmLine, error) { |
516 var ( | 528 var ( |
529 oldID int64 | |
517 oldCountryCode string | 530 oldCountryCode string |
518 oldSender string | 531 oldSender string |
519 oldLanguageCode string | 532 oldLanguageCode string |
520 oldDateIssue time.Time | 533 oldDateIssue time.Time |
521 oldReferenceCode string | 534 oldReferenceCode string |
540 return nil, err | 553 return nil, err |
541 } | 554 } |
542 defer gms.Close() | 555 defer gms.Close() |
543 for gms.Next() { | 556 for gms.Next() { |
544 if err = gms.Scan( | 557 if err = gms.Scan( |
558 &oldID, | |
545 &oldCountryCode, | 559 &oldCountryCode, |
546 &oldSender, | 560 &oldSender, |
547 &oldLanguageCode, | 561 &oldLanguageCode, |
548 &oldDateIssue, | 562 &oldDateIssue, |
549 &oldReferenceCode, | 563 &oldReferenceCode, |
552 &oldDateInfo, | 566 &oldDateInfo, |
553 &oldSourceOrganization, | 567 &oldSourceOrganization, |
554 ); err != nil { | 568 ); err != nil { |
555 return nil, err | 569 return nil, err |
556 } | 570 } |
557 gmLines[oldMeasureDate.Unix()] = newAGMLine( | 571 line := newAGMLine( |
558 location, | 572 location, |
559 oldCountryCode, | 573 oldCountryCode, |
560 oldSender, | 574 oldSender, |
561 oldLanguageCode, | 575 oldLanguageCode, |
562 oldDateIssue, | 576 oldDateIssue, |
564 oldMeasureDate, | 578 oldMeasureDate, |
565 oldValue, | 579 oldValue, |
566 oldDateInfo, | 580 oldDateInfo, |
567 oldSourceOrganization, | 581 oldSourceOrganization, |
568 ) | 582 ) |
583 line.id = oldID | |
584 gmLines[oldMeasureDate.Unix()] = line | |
569 } | 585 } |
570 if err = gms.Err(); err != nil { | 586 if err = gms.Err(); err != nil { |
571 return nil, err | 587 return nil, err |
572 } | 588 } |
573 return gmLines, nil | 589 return gmLines, nil |