# HG changeset patch # User Sascha L. Teichmann # Date 1563977511 -7200 # Node ID edb1d11e14cd91599bbaba3be08684d8cfa22367 # Parent f2d5bf42ed38b9454d4a64d18e0f1b8fbf813598# Parent a18bf6bc7e3c1de79bb356173cb4937d8a93e6f6 Merged faster-agm branch back into default. diff -r f2d5bf42ed38 -r edb1d11e14cd pkg/imports/agm.go --- a/pkg/imports/agm.go Wed Jul 24 10:54:18 2019 +0200 +++ b/pkg/imports/agm.go Wed Jul 24 16:11:51 2019 +0200 @@ -21,7 +21,6 @@ "database/sql" "encoding/csv" "encoding/json" - "errors" "fmt" "io" "math" @@ -65,30 +64,24 @@ } const ( - // delete the old and keep the new measures. agmStageDoneDeleteSQL = ` -WITH staged AS ( +DELETE FROM waterway.gauge_measurements WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND - relation = 'waterway.gauge_measurements'::regclass -), -to_delete AS ( - SELECT o.id AS id - FROM waterway.gauge_measurements o - JOIN waterway.gauge_measurements n - USING (location, measure_date) - WHERE n.id IN (SELECT key FROM staged) - AND o.id NOT IN (SELECT key FROM staged) -) -DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)` + relation = 'waterway.gauge_measurements'::regclass AND + deletion +)` agmStageDoneSQL = ` UPDATE waterway.gauge_measurements SET staging_done = true WHERE id IN ( - SELECT key FROM import.track_imports + SELECT key + FROM import.track_imports WHERE import_id = $1 AND - relation = 'waterway.gauge_measurements'::regclass)` + relation = 'waterway.gauge_measurements'::regclass AND + NOT deletion +)` ) func (agmJobCreator) StageDone( @@ -121,6 +114,7 @@ } type agmLine struct { + id int64 Location models.Isrs `json:"fk-gauge-id"` CountryCode string `json:"country-code"` Sender string `json:"sender"` @@ -152,6 +146,7 @@ const ( agmSelectSQL = ` SELECT + id, country_code, sender, language_code, @@ -211,8 +206,6 @@ ` ) -var errContinue = errors.New("continue") - func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error { headerFields := []struct { @@ -310,7 +303,7 @@ } defer insertStmt.Close() - trackStmt, err := conn.PrepareContext(ctx, trackImportSQL) + trackStmt, err := conn.PrepareContext(ctx, trackImportDeletionSQL) if err != nil { return nil, err } @@ -431,72 +424,101 @@ } } + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + txInsertStmt := tx.StmtContext(ctx, insertStmt) + txTrackStmt := tx.StmtContext(ctx, trackStmt) + agmLines: for _, line := range agmLines { - switch err := func() error { - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return err - } - defer tx.Rollback() - - var newID int64 + var ase *agmSummaryEntry - if err := tx.StmtContext(ctx, insertStmt).QueryRowContext( - ctx, - line.Location.CountryCode, - line.Location.LoCode, - line.Location.FairwaySection, - line.Location.Orc, - line.Location.Hectometre, - line.MeasureDate.Time, - line.CountryCode, - line.Sender, - line.LanguageCode, - line.DateIssue.Time, - line.ReferenceCode, - line.WaterLevel, - line.DateInfo.Time, - line.SourceOrganization, - ).Scan(&newID); err != nil { - warn(handleError(err).Error()) - ignored++ - return errContinue + if old := oldGMLines[line.Location]; old != nil { + ut := line.MeasureDate.Unix() + if o, ok := old[ut]; ok { + if !o.hasDiff(line) { // identical + // don't delete + delete(old, ut) + continue agmLines + } + ase = &agmSummaryEntry{ + FKGaugeID: line.Location, + MeasureDate: line.MeasureDate, + Versions: []*agmLine{o, line}, + } + } + } + if ase == nil { + ase = &agmSummaryEntry{ + FKGaugeID: line.Location, + MeasureDate: line.MeasureDate, + Versions: []*agmLine{line}, } + } - if _, err := tx.StmtContext(ctx, trackStmt).ExecContext( - ctx, importID, "waterway.gauge_measurements", newID, - ); err != nil { - return err - } + var newID int64 - if err = tx.Commit(); err != nil { - err = fmt.Errorf("Commit failed: %v", err) - } - return err - }(); { - case err == errContinue: - continue agmLines - case err != nil: + if err := txInsertStmt.QueryRowContext( + ctx, + line.Location.CountryCode, + line.Location.LoCode, + line.Location.FairwaySection, + line.Location.Orc, + line.Location.Hectometre, + line.MeasureDate.Time, + line.CountryCode, + line.Sender, + line.LanguageCode, + line.DateIssue.Time, + line.ReferenceCode, + line.WaterLevel, + line.DateInfo.Time, + line.SourceOrganization, + ).Scan(&newID); err != nil { return nil, err } - ase := &agmSummaryEntry{ - FKGaugeID: line.Location, - MeasureDate: line.MeasureDate, + if _, err := txTrackStmt.ExecContext( + ctx, importID, "waterway.gauge_measurements", + newID, + false, + ); err != nil { + return nil, err } - if o, hasOld := oldGMLines[line.Location][line.MeasureDate.Time.Unix()]; !hasOld { - ase.Versions = []*agmLine{line} - } else { - // Ignore if there is no diff. - if !line.hasDiff(o) { - continue + entries = append(entries, ase) + } + + var removed int + + // Issue deletes + for _, old := range oldGMLines { + removed += len(old) + for _, line := range old { + if _, err := txTrackStmt.ExecContext( + ctx, importID, "waterway.gauge_measurements", + line.id, + true, + ); err != nil { + return nil, err } - ase.Versions = []*agmLine{o, line} } - entries = append(entries, ase) + } + + feedback.Info("Measurements to update/insert: %d", len(entries)) + feedback.Info("Measurements to delete: %d", removed) + + if len(entries) == 0 && removed == 0 { + return nil, UnchangedError("No changes from AGM import") + } + + if err = tx.Commit(); err != nil { + return nil, fmt.Errorf("Commit failed: %v", err) } feedback.Info("Imported %d entries with changes", len(entries)) @@ -514,6 +536,7 @@ to time.Time, ) (map[int64]*agmLine, error) { var ( + oldID int64 oldCountryCode string oldSender string oldLanguageCode string @@ -542,6 +565,7 @@ defer gms.Close() for gms.Next() { if err = gms.Scan( + &oldID, &oldCountryCode, &oldSender, &oldLanguageCode, @@ -554,7 +578,7 @@ ); err != nil { return nil, err } - gmLines[oldMeasureDate.Unix()] = newAGMLine( + line := newAGMLine( location, oldCountryCode, oldSender, @@ -566,6 +590,8 @@ oldDateInfo, oldSourceOrganization, ) + line.id = oldID + gmLines[oldMeasureDate.Unix()] = line } if err = gms.Err(); err != nil { return nil, err diff -r f2d5bf42ed38 -r edb1d11e14cd pkg/imports/track.go --- a/pkg/imports/track.go Wed Jul 24 10:54:18 2019 +0200 +++ b/pkg/imports/track.go Wed Jul 24 16:11:51 2019 +0200 @@ -22,6 +22,10 @@ trackImportSQL = ` INSERT INTO import.track_imports (import_id, relation, key) VALUES ($1, $2::regclass, $3)` + + trackImportDeletionSQL = ` + INSERT INTO import.track_imports (import_id, deletion, relation, key) + VALUES ($1, $4, $2::regclass, $3)` ) func track(ctx context.Context, tx *sql.Tx, importID int64, relation string, key int64) error {