changeset 4036:e45442db19b1 faster-agm

First stab at making AGM imports faster by avoiding unnecessary inserts. Also delete un-updated measures in time ranges.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 24 Jul 2019 11:46:41 +0200
parents f2d5bf42ed38
children a18bf6bc7e3c
files pkg/imports/agm.go pkg/imports/track.go
diffstat 2 files changed, 92 insertions(+), 72 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/agm.go	Wed Jul 24 10:54:18 2019 +0200
+++ b/pkg/imports/agm.go	Wed Jul 24 11:46:41 2019 +0200
@@ -21,7 +21,6 @@
 	"database/sql"
 	"encoding/csv"
 	"encoding/json"
-	"errors"
 	"fmt"
 	"io"
 	"math"
@@ -65,30 +64,24 @@
 }
 
 const (
-	// delete the old  and keep the new measures.
 	agmStageDoneDeleteSQL = `
-WITH staged AS (
+DELETE FROM waterway.gauge_measurements WHERE id IN (
   SELECT key
   FROM import.track_imports
   WHERE import_id = $1 AND
-        relation = 'waterway.gauge_measurements'::regclass
-),
-to_delete AS (
-  SELECT o.id AS id
-  FROM waterway.gauge_measurements o
-  JOIN waterway.gauge_measurements n
-    USING (location, measure_date)
-    WHERE n.id IN (SELECT key FROM staged)
-	  AND o.id NOT IN (SELECT key FROM staged)
-)
-DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`
+    relation = 'waterway.gauge_measurements'::regclass AND
+    deletion
+)`
 
 	agmStageDoneSQL = `
 UPDATE waterway.gauge_measurements SET staging_done = true
 WHERE id IN (
-  SELECT key FROM import.track_imports
+  SELECT key
+  FROM import.track_imports
   WHERE import_id = $1 AND
-    relation = 'waterway.gauge_measurements'::regclass)`
+    relation = 'waterway.gauge_measurements'::regclass AND
+  NOT deletion
+)`
 )
 
 func (agmJobCreator) StageDone(
@@ -121,6 +114,7 @@
 }
 
 type agmLine struct {
+	id                 int64
 	Location           models.Isrs `json:"fk-gauge-id"`
 	CountryCode        string      `json:"country-code"`
 	Sender             string      `json:"sender"`
@@ -152,6 +146,7 @@
 const (
 	agmSelectSQL = `
 SELECT
+  id,
   country_code,
   sender,
   language_code,
@@ -211,8 +206,6 @@
 `
 )
 
-var errContinue = errors.New("continue")
-
 func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error {
 
 	headerFields := []struct {
@@ -310,7 +303,7 @@
 	}
 	defer insertStmt.Close()
 
-	trackStmt, err := conn.PrepareContext(ctx, trackImportSQL)
+	trackStmt, err := conn.PrepareContext(ctx, trackImportDeletionSQL)
 	if err != nil {
 		return nil, err
 	}
@@ -431,72 +424,91 @@
 		}
 	}
 
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	txInsertStmt := tx.StmtContext(ctx, insertStmt)
+	txTrackStmt := tx.StmtContext(ctx, trackStmt)
+
 agmLines:
 	for _, line := range agmLines {
 
-		switch err := func() error {
-			tx, err := conn.BeginTx(ctx, nil)
-			if err != nil {
-				return err
-			}
-			defer tx.Rollback()
-
-			var newID int64
+		var ase *agmSummaryEntry
 
-			if err := tx.StmtContext(ctx, insertStmt).QueryRowContext(
-				ctx,
-				line.Location.CountryCode,
-				line.Location.LoCode,
-				line.Location.FairwaySection,
-				line.Location.Orc,
-				line.Location.Hectometre,
-				line.MeasureDate.Time,
-				line.CountryCode,
-				line.Sender,
-				line.LanguageCode,
-				line.DateIssue.Time,
-				line.ReferenceCode,
-				line.WaterLevel,
-				line.DateInfo.Time,
-				line.SourceOrganization,
-			).Scan(&newID); err != nil {
-				warn(handleError(err).Error())
-				ignored++
-				return errContinue
+		if old := oldGMLines[line.Location]; old != nil {
+			ut := line.MeasureDate.Unix()
+			if o, ok := old[ut]; ok {
+				if !o.hasDiff(line) { // identical
+					// don't delete
+					delete(old, ut)
+					continue agmLines
+				}
+				ase = &agmSummaryEntry{
+					FKGaugeID:   line.Location,
+					MeasureDate: line.MeasureDate,
+					Versions:    []*agmLine{o, line},
+				}
+			}
+		}
+		if ase == nil {
+			ase = &agmSummaryEntry{
+				FKGaugeID:   line.Location,
+				MeasureDate: line.MeasureDate,
+				Versions:    []*agmLine{line},
 			}
+		}
 
-			if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
-				ctx, importID, "waterway.gauge_measurements", newID,
-			); err != nil {
-				return err
-			}
+		var newID int64
 
-			if err = tx.Commit(); err != nil {
-				err = fmt.Errorf("Commit failed: %v", err)
-			}
-			return err
-		}(); {
-		case err == errContinue:
-			continue agmLines
-		case err != nil:
+		if err := txInsertStmt.QueryRowContext(
+			ctx,
+			line.Location.CountryCode,
+			line.Location.LoCode,
+			line.Location.FairwaySection,
+			line.Location.Orc,
+			line.Location.Hectometre,
+			line.MeasureDate.Time,
+			line.CountryCode,
+			line.Sender,
+			line.LanguageCode,
+			line.DateIssue.Time,
+			line.ReferenceCode,
+			line.WaterLevel,
+			line.DateInfo.Time,
+			line.SourceOrganization,
+		).Scan(&newID); err != nil {
 			return nil, err
 		}
 
-		ase := &agmSummaryEntry{
-			FKGaugeID:   line.Location,
-			MeasureDate: line.MeasureDate,
+		if _, err := txTrackStmt.ExecContext(
+			ctx, importID, "waterway.gauge_measurements",
+			newID,
+			false,
+		); err != nil {
+			return nil, err
 		}
 
-		if o, hasOld := oldGMLines[line.Location][line.MeasureDate.Time.Unix()]; !hasOld {
-			ase.Versions = []*agmLine{line}
-		} else {
-			// Ignore if there is no diff.
-			if !line.hasDiff(o) {
-				continue
+		entries = append(entries, ase)
+	}
+
+	// Issue deletes
+	for _, old := range oldGMLines {
+		for _, line := range old {
+			if _, err := txTrackStmt.ExecContext(
+				ctx, importID, "waterway.gauge_measurements",
+				line.id,
+				true,
+			); err != nil {
+				return nil, err
 			}
-			ase.Versions = []*agmLine{o, line}
 		}
-		entries = append(entries, ase)
+	}
+
+	if err = tx.Commit(); err != nil {
+		return nil, fmt.Errorf("Commit failed: %v", err)
 	}
 
 	feedback.Info("Imported %d entries with changes", len(entries))
@@ -514,6 +526,7 @@
 	to time.Time,
 ) (map[int64]*agmLine, error) {
 	var (
+		oldID                 int64
 		oldCountryCode        string
 		oldSender             string
 		oldLanguageCode       string
@@ -542,6 +555,7 @@
 	defer gms.Close()
 	for gms.Next() {
 		if err = gms.Scan(
+			&oldID,
 			&oldCountryCode,
 			&oldSender,
 			&oldLanguageCode,
@@ -554,7 +568,7 @@
 		); err != nil {
 			return nil, err
 		}
-		gmLines[oldMeasureDate.Unix()] = newAGMLine(
+		line := newAGMLine(
 			location,
 			oldCountryCode,
 			oldSender,
@@ -566,6 +580,8 @@
 			oldDateInfo,
 			oldSourceOrganization,
 		)
+		line.id = oldID
+		gmLines[oldMeasureDate.Unix()] = line
 	}
 	if err = gms.Err(); err != nil {
 		return nil, err
--- a/pkg/imports/track.go	Wed Jul 24 10:54:18 2019 +0200
+++ b/pkg/imports/track.go	Wed Jul 24 11:46:41 2019 +0200
@@ -22,6 +22,10 @@
 	trackImportSQL = `
     INSERT INTO import.track_imports (import_id, relation, key)
 	VALUES ($1, $2::regclass, $3)`
+
+	trackImportDeletionSQL = `
+    INSERT INTO import.track_imports (import_id, deletion, relation, key)
+	VALUES ($1, $4, $2::regclass, $3)`
 )
 
 func track(ctx context.Context, tx *sql.Tx, importID int64, relation string, key int64) error {