Mercurial > gemma
changeset 4043:fbd7c012f10c historization_ng
Merged
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Wed, 24 Jul 2019 11:16:38 +0200 |
parents | 9f6a6b8ad965 (current diff) 4f2f34f5d14d (diff) |
children | f42f7f7eb81f |
files | schema/gemma.sql schema/version.sql |
diffstat | 4 files changed, 155 insertions(+), 99 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/agm.go Tue Jul 23 13:12:35 2019 +0200 +++ b/pkg/imports/agm.go Wed Jul 24 11:16:38 2019 +0200 @@ -152,12 +152,12 @@ const ( agmSelectSQL = ` SELECT - id, country_code, sender, language_code, date_issue, reference_code, + measure_date, water_level, date_info, source_organization @@ -165,7 +165,7 @@ WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) - AND measure_date = $6 + AND measure_date BETWEEN $6 AND $7 AND staging_done ` @@ -213,6 +213,47 @@ var errContinue = errors.New("continue") +func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error { + + headerFields := []struct { + idx *int + name string + }{ + {fkGaugeIDIdx, "fk_gauge_id"}, + {measureDateIdx, "measure_date"}, + {valueIdx, "value"}, // "water_level", + } + +nextHeader: + for i, f := range headers { + h := strings.Replace(strings.ToLower( + strings.TrimSpace(f)), " ", "_", -1) + + for j := range headerFields { + if headerFields[j].name == h { + if *headerFields[j].idx != -1 { + return fmt.Errorf( + "There is more than one column namend '%s'", h) + } + *headerFields[j].idx = i + continue nextHeader + } + } + } + + var missing []string + for i := range headerFields { + if headerFields[i].name != "unit" && *headerFields[i].idx == -1 { + missing = append(missing, headerFields[i].name) + } + } + if len(missing) > 0 { + return fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) + } + + return nil +} + // Do executes the actual approved gauge measurements import. 
func (agm *ApprovedGaugeMeasurements) Do( ctx context.Context, @@ -244,40 +285,11 @@ valueIdx = -1 ) - headerFields := []struct { - idx *int - name string - }{ - {&fkGaugeIDIdx, "fk_gauge_id"}, - {&measureDateIdx, "measure_date"}, - {&valueIdx, "value"}, // "water_level", - } - -nextHeader: - for i, f := range headers { - h := strings.Replace(strings.ToLower( - strings.TrimSpace(f)), " ", "_", -1) - - for j := range headerFields { - if headerFields[j].name == h { - if *headerFields[j].idx != -1 { - return nil, fmt.Errorf( - "There is more than one column namend '%s'", h) - } - *headerFields[j].idx = i - continue nextHeader - } - } - } - - var missing []string - for i := range headerFields { - if headerFields[i].name != "unit" && *headerFields[i].idx == -1 { - missing = append(missing, headerFields[i].name) - } - } - if len(missing) > 0 { - return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) + if err := parseAGMHeaders( + headers, + &fkGaugeIDIdx, &measureDateIdx, &valueIdx, + ); err != nil { + return nil, err } gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL) @@ -314,6 +326,7 @@ agmLines := []*agmLine{} ignored := 0 + mdMinMax := map[models.Isrs][2]time.Time{} lines: for line := 1; ; line++ { @@ -368,6 +381,16 @@ if err != nil { return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err) } + if _, hasGid := mdMinMax[*gid]; hasGid { + if md.Before(mdMinMax[*gid][0]) { + mdMinMax[*gid] = [2]time.Time{md, mdMinMax[*gid][1]} + } + if md.After(mdMinMax[*gid][1]) { + mdMinMax[*gid] = [2]time.Time{mdMinMax[*gid][0], md} + } + } else { + mdMinMax[*gid] = [2]time.Time{md, md} + } newSender := agm.Originator newCountryCode := gid.CountryCode @@ -399,48 +422,17 @@ )) } + oldGMLines := map[models.Isrs]map[int64]*agmLine{} + for gid, minMax := range mdMinMax { + oldGMLines[gid], err = getOldGMLines( + ctx, selectStmt, gid, minMax[0], minMax[1]) + if err != nil { + return nil, err + } + } + agmLines: for _, line := range 
agmLines { - var ( - oldID int64 - oldCountryCode string - oldSender string - oldLanguageCode string - oldDateIssue time.Time - oldReferenceCode string - oldValue float64 - oldDateInfo time.Time - oldSourceOrganization string - ) - - err = selectStmt.QueryRowContext( - ctx, - line.Location.CountryCode, - line.Location.LoCode, - line.Location.FairwaySection, - line.Location.Orc, - line.Location.Hectometre, - line.MeasureDate.Time, - ).Scan( - &oldID, - &oldCountryCode, - &oldSender, - &oldLanguageCode, - &oldDateIssue, - &oldReferenceCode, - &oldValue, - &oldDateInfo, - &oldSourceOrganization, - ) - - var newEntry bool - switch { - case err == sql.ErrNoRows: - // Complete new one - newEntry = true - case err != nil: - return nil, err - } switch err := func() error { tx, err := conn.BeginTx(ctx, nil) @@ -495,21 +487,9 @@ MeasureDate: line.MeasureDate, } - if newEntry { + if o, hasOld := oldGMLines[line.Location][line.MeasureDate.Time.Unix()]; !hasOld { ase.Versions = []*agmLine{line} } else { - o := newAGMLine( - line.Location, - oldCountryCode, - oldSender, - oldLanguageCode, - oldDateIssue, - oldReferenceCode, - line.MeasureDate.Time, - oldValue, - oldDateInfo, - oldSourceOrganization, - ) // Ignore if there is no diff. 
if !line.hasDiff(o) { continue @@ -526,6 +506,73 @@ return entries, nil } +func getOldGMLines( + ctx context.Context, + stmt *sql.Stmt, + location models.Isrs, + from time.Time, + to time.Time, +) (map[int64]*agmLine, error) { + var ( + oldCountryCode string + oldSender string + oldLanguageCode string + oldDateIssue time.Time + oldReferenceCode string + oldMeasureDate time.Time + oldValue float64 + oldDateInfo time.Time + oldSourceOrganization string + ) + gmLines := map[int64]*agmLine{} + + gms, err := stmt.QueryContext( + ctx, + location.CountryCode, + location.LoCode, + location.FairwaySection, + location.Orc, + location.Hectometre, + from, + to, + ) + if err != nil { + return nil, err + } + defer gms.Close() + for gms.Next() { + if err = gms.Scan( + &oldCountryCode, + &oldSender, + &oldLanguageCode, + &oldDateIssue, + &oldReferenceCode, + &oldMeasureDate, + &oldValue, + &oldDateInfo, + &oldSourceOrganization, + ); err != nil { + return nil, err + } + gmLines[oldMeasureDate.Unix()] = newAGMLine( + location, + oldCountryCode, + oldSender, + oldLanguageCode, + oldDateIssue, + oldReferenceCode, + oldMeasureDate, + oldValue, + oldDateInfo, + oldSourceOrganization, + ) + } + if err = gms.Err(); err != nil { + return nil, err + } + return gmLines, nil +} + func newAGMLine( location models.Isrs, countryCode string,
--- a/schema/gemma.sql Tue Jul 23 13:12:35 2019 +0200 +++ b/schema/gemma.sql Wed Jul 24 11:16:38 2019 +0200 @@ -902,6 +902,7 @@ CREATE TABLE track_imports ( import_id int NOT NULL REFERENCES imports(id) ON DELETE CASCADE, + deletion bool NOT NULL DEFAULT false, relation regclass NOT NULL, key int NOT NULL, UNIQUE (relation, key) @@ -914,7 +915,7 @@ tmp RECORD; BEGIN FOR tmp IN - SELECT * FROM import.track_imports WHERE import_id = imp_id + SELECT * FROM import.track_imports WHERE import_id = imp_id AND NOT deletion LOOP EXECUTE format('DELETE FROM %s WHERE id = $1', tmp.relation) USING tmp.key; END LOOP; @@ -922,15 +923,6 @@ $$ LANGUAGE plpgsql; -CREATE FUNCTION import.del_import() RETURNS trigger AS -$$ -BEGIN - EXECUTE format('DELETE FROM %s WHERE id = $1', OLD.relation) USING OLD.key; - RETURN NULL; -END; -$$ -LANGUAGE plpgsql; - CREATE SCHEMA caching CREATE TABLE sounding_differences (
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1007/01.delete-import-tracking.sql Wed Jul 24 11:16:38 2019 +0200
@@ -0,0 +1,1 @@
+ALTER TABLE import.track_imports ADD COLUMN deletion bool NOT NULL DEFAULT false;
--- /dev/null Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1008/01.import-delete.sql Wed Jul 24 11:16:38 2019 +0200
@@ -0,0 +1,16 @@
+DROP FUNCTION import.del_import();
+
+CREATE OR REPLACE FUNCTION import.del_import(imp_id int) RETURNS void AS
+$$
+DECLARE
+    tmp RECORD;
+BEGIN
+    FOR tmp IN
+        SELECT * FROM import.track_imports WHERE import_id = imp_id AND NOT deletion
+    LOOP
+        EXECUTE format('DELETE FROM %s WHERE id = $1', tmp.relation) USING tmp.key;
+    END LOOP;
+END;
+$$
+LANGUAGE plpgsql;
+