changeset 4043:fbd7c012f10c historization_ng

Merged
author Sascha Wilde <wilde@intevation.de>
date Wed, 24 Jul 2019 11:16:38 +0200
parents 9f6a6b8ad965 (current diff) 4f2f34f5d14d (diff)
children f42f7f7eb81f
files schema/gemma.sql schema/version.sql
diffstat 4 files changed, 155 insertions(+), 99 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/agm.go	Tue Jul 23 13:12:35 2019 +0200
+++ b/pkg/imports/agm.go	Wed Jul 24 11:16:38 2019 +0200
@@ -152,12 +152,12 @@
 const (
 	agmSelectSQL = `
 SELECT
-  id,
   country_code,
   sender,
   language_code,
   date_issue,
   reference_code,
+  measure_date,
   water_level,
   date_info,
   source_organization
@@ -165,7 +165,7 @@
 WHERE
   location
     = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
-  AND measure_date = $6
+  AND measure_date BETWEEN $6 AND $7
   AND staging_done
 `
 
@@ -213,6 +213,47 @@
 
 var errContinue = errors.New("continue")
 
+func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error {
+	// parseAGMHeaders maps the required column names to their positions in headers and writes them through the index pointers; duplicate or missing columns yield an error.
+	headerFields := []struct {
+		idx  *int
+		name string
+	}{
+		{fkGaugeIDIdx, "fk_gauge_id"},
+		{measureDateIdx, "measure_date"},
+		{valueIdx, "value"}, // "water_level",
+	}
+	// Header names are compared after trimming, lower-casing and replacing spaces with underscores; headers matching no known field are skipped.
+nextHeader:
+	for i, f := range headers {
+		h := strings.Replace(strings.ToLower(
+			strings.TrimSpace(f)), " ", "_", -1)
+		// Look for the known field this header names, if any.
+		for j := range headerFields {
+			if headerFields[j].name == h {
+				if *headerFields[j].idx != -1 { // -1 means "not seen yet", so callers must pass indices initialised to -1
+					return fmt.Errorf(
+						"There is more than one column namend '%s'", h)
+				}
+				*headerFields[j].idx = i
+				continue nextHeader
+			}
+		}
+	}
+	// Any index still at -1 belongs to a required column that never appeared.
+	var missing []string
+	for i := range headerFields {
+		if headerFields[i].name != "unit" && *headerFields[i].idx == -1 { // no field above is named "unit", so the first test is always true here
+			missing = append(missing, headerFields[i].name)
+		}
+	}
+	if len(missing) > 0 {
+		return fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
+	}
+
+	return nil
+}
+
 // Do executes the actual approved gauge measurements import.
 func (agm *ApprovedGaugeMeasurements) Do(
 	ctx context.Context,
@@ -244,40 +285,11 @@
 		valueIdx       = -1
 	)
 
-	headerFields := []struct {
-		idx  *int
-		name string
-	}{
-		{&fkGaugeIDIdx, "fk_gauge_id"},
-		{&measureDateIdx, "measure_date"},
-		{&valueIdx, "value"}, // "water_level",
-	}
-
-nextHeader:
-	for i, f := range headers {
-		h := strings.Replace(strings.ToLower(
-			strings.TrimSpace(f)), " ", "_", -1)
-
-		for j := range headerFields {
-			if headerFields[j].name == h {
-				if *headerFields[j].idx != -1 {
-					return nil, fmt.Errorf(
-						"There is more than one column namend '%s'", h)
-				}
-				*headerFields[j].idx = i
-				continue nextHeader
-			}
-		}
-	}
-
-	var missing []string
-	for i := range headerFields {
-		if headerFields[i].name != "unit" && *headerFields[i].idx == -1 {
-			missing = append(missing, headerFields[i].name)
-		}
-	}
-	if len(missing) > 0 {
-		return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
+	if err := parseAGMHeaders(
+		headers,
+		&fkGaugeIDIdx, &measureDateIdx, &valueIdx,
+	); err != nil {
+		return nil, err
 	}
 
 	gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL)
@@ -314,6 +326,7 @@
 
 	agmLines := []*agmLine{}
 	ignored := 0
+	mdMinMax := map[models.Isrs][2]time.Time{}
 
 lines:
 	for line := 1; ; line++ {
@@ -368,6 +381,16 @@
 		if err != nil {
 			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
 		}
+		if _, hasGid := mdMinMax[*gid]; hasGid {
+			if md.Before(mdMinMax[*gid][0]) {
+				mdMinMax[*gid] = [2]time.Time{md, mdMinMax[*gid][1]}
+			}
+			if md.After(mdMinMax[*gid][1]) {
+				mdMinMax[*gid] = [2]time.Time{mdMinMax[*gid][0], md}
+			}
+		} else {
+			mdMinMax[*gid] = [2]time.Time{md, md}
+		}
 
 		newSender := agm.Originator
 		newCountryCode := gid.CountryCode
@@ -399,48 +422,17 @@
 		))
 	}
 
+	oldGMLines := map[models.Isrs]map[int64]*agmLine{}
+	for gid, minMax := range mdMinMax {
+		oldGMLines[gid], err = getOldGMLines(
+			ctx, selectStmt, gid, minMax[0], minMax[1])
+		if err != nil {
+			return nil, err
+		}
+	}
+
 agmLines:
 	for _, line := range agmLines {
-		var (
-			oldID                 int64
-			oldCountryCode        string
-			oldSender             string
-			oldLanguageCode       string
-			oldDateIssue          time.Time
-			oldReferenceCode      string
-			oldValue              float64
-			oldDateInfo           time.Time
-			oldSourceOrganization string
-		)
-
-		err = selectStmt.QueryRowContext(
-			ctx,
-			line.Location.CountryCode,
-			line.Location.LoCode,
-			line.Location.FairwaySection,
-			line.Location.Orc,
-			line.Location.Hectometre,
-			line.MeasureDate.Time,
-		).Scan(
-			&oldID,
-			&oldCountryCode,
-			&oldSender,
-			&oldLanguageCode,
-			&oldDateIssue,
-			&oldReferenceCode,
-			&oldValue,
-			&oldDateInfo,
-			&oldSourceOrganization,
-		)
-
-		var newEntry bool
-		switch {
-		case err == sql.ErrNoRows:
-			// Complete new one
-			newEntry = true
-		case err != nil:
-			return nil, err
-		}
 
 		switch err := func() error {
 			tx, err := conn.BeginTx(ctx, nil)
@@ -495,21 +487,9 @@
 			MeasureDate: line.MeasureDate,
 		}
 
-		if newEntry {
+		if o, hasOld := oldGMLines[line.Location][line.MeasureDate.Time.Unix()]; !hasOld {
 			ase.Versions = []*agmLine{line}
 		} else {
-			o := newAGMLine(
-				line.Location,
-				oldCountryCode,
-				oldSender,
-				oldLanguageCode,
-				oldDateIssue,
-				oldReferenceCode,
-				line.MeasureDate.Time,
-				oldValue,
-				oldDateInfo,
-				oldSourceOrganization,
-			)
 			// Ignore if there is no diff.
 			if !line.hasDiff(o) {
 				continue
@@ -526,6 +506,73 @@
 	return entries, nil
 }
 
+func getOldGMLines(
+	ctx context.Context,
+	stmt *sql.Stmt,
+	location models.Isrs,
+	from time.Time,
+	to time.Time,
+) (map[int64]*agmLine, error) { // getOldGMLines runs stmt for location and the from/to range and returns the rows as agmLines keyed by measure date in Unix seconds.
+	var ( // scan targets, reused for every row
+		oldCountryCode        string
+		oldSender             string
+		oldLanguageCode       string
+		oldDateIssue          time.Time
+		oldReferenceCode      string
+		oldMeasureDate        time.Time
+		oldValue              float64
+		oldDateInfo           time.Time
+		oldSourceOrganization string
+	)
+	gmLines := map[int64]*agmLine{}
+	// stmt receives the five location parts followed by from and to (presumably the prepared agmSelectSQL; confirm at the call site).
+	gms, err := stmt.QueryContext(
+		ctx,
+		location.CountryCode,
+		location.LoCode,
+		location.FairwaySection,
+		location.Orc,
+		location.Hectometre,
+		from,
+		to,
+	)
+	if err != nil {
+		return nil, err
+	}
+	defer gms.Close()
+	for gms.Next() {
+		if err = gms.Scan( // targets are listed in the order the query behind stmt is expected to return its columns
+			&oldCountryCode,
+			&oldSender,
+			&oldLanguageCode,
+			&oldDateIssue,
+			&oldReferenceCode,
+			&oldMeasureDate,
+			&oldValue,
+			&oldDateInfo,
+			&oldSourceOrganization,
+		); err != nil {
+			return nil, err
+		}
+		gmLines[oldMeasureDate.Unix()] = newAGMLine( // a later row with the same Unix second replaces an earlier one
+			location,
+			oldCountryCode,
+			oldSender,
+			oldLanguageCode,
+			oldDateIssue,
+			oldReferenceCode,
+			oldMeasureDate,
+			oldValue,
+			oldDateInfo,
+			oldSourceOrganization,
+		)
+	}
+	if err = gms.Err(); err != nil { // report an error that ended the row iteration early
+		return nil, err
+	}
+	return gmLines, nil
+}
+
 func newAGMLine(
 	location models.Isrs,
 	countryCode string,
--- a/schema/gemma.sql	Tue Jul 23 13:12:35 2019 +0200
+++ b/schema/gemma.sql	Wed Jul 24 11:16:38 2019 +0200
@@ -902,6 +902,7 @@
     CREATE TABLE track_imports (
         import_id int      NOT NULL REFERENCES imports(id)
             ON DELETE CASCADE,
+        deletion  bool     NOT NULL DEFAULT false,
         relation  regclass NOT NULL,
         key       int      NOT NULL,
         UNIQUE (relation, key)
@@ -914,7 +915,7 @@
     tmp RECORD;
 BEGIN
     FOR tmp IN
-        SELECT * FROM import.track_imports WHERE import_id = imp_id
+        SELECT * FROM import.track_imports WHERE import_id = imp_id AND NOT deletion
     LOOP
         EXECUTE format('DELETE FROM %s WHERE id = $1', tmp.relation) USING tmp.key;
     END LOOP;
@@ -922,15 +923,6 @@
 $$
 LANGUAGE plpgsql;
 
-CREATE FUNCTION import.del_import() RETURNS trigger AS
-$$
-BEGIN
-    EXECUTE format('DELETE FROM %s WHERE id = $1', OLD.relation) USING OLD.key;
-    RETURN NULL;
-END;
-$$
-LANGUAGE plpgsql;
-
 CREATE SCHEMA caching
 
     CREATE TABLE sounding_differences (
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1007/01.delete-import-tracking.sql	Wed Jul 24 11:16:38 2019 +0200
@@ -0,0 +1,1 @@
+ALTER TABLE import.track_imports ADD COLUMN deletion bool NOT NULL DEFAULT false; -- same column this changeset adds to track_imports in schema/gemma.sql
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1008/01.import-delete.sql	Wed Jul 24 11:16:38 2019 +0200
@@ -0,0 +1,16 @@
+DROP FUNCTION  import.del_import(); -- the zero-argument trigger variant that this changeset also removes from schema/gemma.sql
+-- (Re)define the per-import variant: delete every row tracked for imp_id, except tracking entries flagged as deletions.
+CREATE OR REPLACE FUNCTION import.del_import(imp_id int) RETURNS void AS
+$$
+DECLARE
+    tmp RECORD;
+BEGIN
+    FOR tmp IN
+        SELECT * FROM import.track_imports WHERE import_id = imp_id AND NOT deletion -- entries with deletion = true are skipped
+    LOOP
+        EXECUTE format('DELETE FROM %s WHERE id = $1', tmp.relation) USING tmp.key; -- relation (regclass) names the table; key is bound as $1
+    END LOOP;
+END;
+$$
+LANGUAGE plpgsql;
+