Mercurial > gemma
view pkg/imports/agm.go @ 5560:f2204f91d286
Join the log lines of imports to the log exports to recover data from them.
Used in SR export to extract information that was in the meta JSON
but is now only found in the log.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 09 Feb 2022 18:34:40 +0100 |
parents | 56c589f7435d |
children | 6270951dda28 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
)

// ApprovedGaugeMeasurements is a Job to import
// approved gauge measurements from a CSV file
// into the database.
type ApprovedGaugeMeasurements struct {
	// Dir is the folder the import reads "agm.csv" from.
	// The whole folder is removed again by CleanUp.
	Dir string `json:"dir"`
	// Originator is stored as sender and as source organization
	// of every measurement created by the import.
	Originator string `json:"originator"`
}

// AGMJobKind is the unique name of an approved gauge measurements import job.
const AGMJobKind JobKind = "agm" type agmJobCreator struct{} func init() { RegisterJobCreator(AGMJobKind, agmJobCreator{}) } func (agmJobCreator) AutoAccept() bool { return false } func (agmJobCreator) Description() string { return "approved gauge measurements" } func (agmJobCreator) Create() Job { return new(ApprovedGaugeMeasurements) } func (agmJobCreator) Depends() [2][]string { return [2][]string{ {"gauge_measurements"}, {"gauges"}, } } const ( agmStageDoneDeleteSQL = ` DELETE FROM waterway.gauge_measurements WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass AND deletion )` agmStageDoneSQL = ` UPDATE waterway.gauge_measurements SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass AND NOT deletion )` ) // StageDone replaces gauge measurements with those in the staging area func (agmJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, _ Feedback, ) error { _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id) if err == nil { _, err = tx.ExecContext(ctx, agmStageDoneSQL, id) } return err } // CleanUp removes the folder containing the CSV file with the // the approved gauge measurements. 
func (agm *ApprovedGaugeMeasurements) CleanUp() error { return os.RemoveAll(agm.Dir) } var guessDate = common.TimeParser([]string{ "02.01.2006 15:04", "2006-01-02T15:04:05-07:00", }).Parse type timetz struct{ time.Time } func (ttz *timetz) MarshalJSON() ([]byte, error) { return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00")) } type agmLine struct { id int64 Location models.Isrs `json:"fk-gauge-id"` CountryCode string `json:"country-code"` Sender string `json:"sender"` LanguageCode string `json:"language-code"` DateIssue timetz `json:"date-issue"` ReferenceCode string `json:"reference-code"` MeasureDate timetz `json:"measure-date"` WaterLevel float64 `json:"water-level"` DateInfo timetz `json:"date-info"` SourceOrganization string `json:"source-organization"` } func (a *agmLine) hasDiff(b *agmLine) bool { const eps = 0.00001 return a.CountryCode != b.CountryCode || a.Sender != b.Sender || a.LanguageCode != b.LanguageCode || a.ReferenceCode != b.ReferenceCode || math.Abs(a.WaterLevel-b.WaterLevel) > eps || a.SourceOrganization != b.SourceOrganization } type agmSummaryEntry struct { FKGaugeID models.Isrs `json:"fk-gauge-id"` MeasureDate timetz `json:"measure-date"` Versions []*agmLine `json:"versions"` } const ( agmSelectSQL = ` SELECT id, country_code, sender, language_code, date_issue, reference_code, measure_date, water_level, date_info, source_organization FROM waterway.gauge_measurements WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND measure_date BETWEEN $6 AND $7 AND staging_done ` agmInsertSQL = ` INSERT INTO waterway.gauge_measurements ( location, measure_date, country_code, sender, language_code, date_issue, reference_code, water_level, date_info, source_organization, staging_done ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7, $8, $9, $10, $11, $12, $13, $14, false ) RETURNING id` agmGaugeCheckSQL = ` SELECT EXISTS( SELECT 1 FROM waterway.gauges WHERE location = ($1::char(2), 
$2::char(3), $3::char(5), $4::char(5), $5::int)) ` ) func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error { headerFields := []struct { idx *int name string }{ {fkGaugeIDIdx, "fk_gauge_id"}, {measureDateIdx, "measure_date"}, {valueIdx, "value"}, // "water_level", } nextHeader: for i, f := range headers { h := strings.Replace(strings.ToLower( strings.TrimSpace(f)), " ", "_", -1) for j := range headerFields { if headerFields[j].name == h { if *headerFields[j].idx != -1 { return fmt.Errorf( "there is more than one column namend '%s'", h) } *headerFields[j].idx = i continue nextHeader } } } var missing []string for i := range headerFields { if headerFields[i].name != "unit" && *headerFields[i].idx == -1 { missing = append(missing, headerFields[i].name) } } if len(missing) > 0 { return fmt.Errorf("missing columns: %s", strings.Join(missing, ", ")) } return nil } // Do executes the actual approved gauge measurements import. func (agm *ApprovedGaugeMeasurements) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() f, err := os.Open(filepath.Join(agm.Dir, "agm.csv")) if err != nil { return nil, err } defer f.Close() r := csv.NewReader(bufio.NewReader(f)) r.Comma = ';' r.ReuseRecord = true headers, err := r.Read() if err != nil { return nil, err } var ( fkGaugeIDIdx = -1 measureDateIdx = -1 valueIdx = -1 ) if err := parseAGMHeaders( headers, &fkGaugeIDIdx, &measureDateIdx, &valueIdx, ); err != nil { return nil, err } gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL) if err != nil { return nil, err } defer gaugeCheckStmt.Close() selectStmt, err := conn.PrepareContext(ctx, agmSelectSQL) if err != nil { return nil, err } defer selectStmt.Close() insertStmt, err := conn.PrepareContext(ctx, agmInsertSQL) if err != nil { return nil, err } defer insertStmt.Close() trackStmt, err := conn.PrepareContext(ctx, trackImportDeletionSQL) if err != nil { return nil, err } 
defer trackStmt.Close() entries := []*agmSummaryEntry{} checkedGauges := map[models.Isrs]bool{} warnLimiter := common.WarningLimiter{Log: feedback.Warn, MaxWarnings: 100} warn := warnLimiter.Warn defer warnLimiter.Close() agmLines := []*agmLine{} ignored := 0 mdMinMax := map[models.Isrs]*[2]time.Time{} lines: for line := 1; ; line++ { row, err := r.Read() switch { case err == io.EOF || len(row) == 0: feedback.Info("Read %d entries in CSV file", line-1) if ignored > 0 { feedback.Info("%d entries ignored", ignored) } if ignored == line-1 { return nil, UnchangedError("No entries imported") } break lines case err != nil: return nil, fmt.Errorf("CSV parsing failed: %v", err) } gids := row[fkGaugeIDIdx] gid, err := models.IsrsFromString(gids) if err != nil { return nil, fmt.Errorf("invalid ISRS code line %d: %v", line, err) } if exists, found := checkedGauges[*gid]; found { if !exists { // Just ignore the line since we have already warned ignored++ continue lines } } else { // not found in gauge cache if err := gaugeCheckStmt.QueryRowContext( ctx, gid.CountryCode, gid.LoCode, gid.FairwaySection, gid.Orc, gid.Hectometre, ).Scan(&exists); err != nil { return nil, err } checkedGauges[*gid] = exists if !exists { warn("Ignoring data for unknown gauge %s", gid.String()) ignored++ continue lines } } md, err := guessDate(row[measureDateIdx]) if err != nil { return nil, fmt.Errorf("invalid 'measure_date' line %d: %v", line, err) } if v := mdMinMax[*gid]; v != nil { if md.Before(v[0]) { v[0] = md } if md.After(v[1]) { v[1] = md } } else { mdMinMax[*gid] = &[2]time.Time{md, md} } newSender := agm.Originator newCountryCode := gid.CountryCode newLanguageCode := common.CCtoLang[gid.CountryCode] newDateIssue := time.Now() newReferenceCode := "ZPG" value, err := strconv.ParseFloat(row[valueIdx], 32) if err != nil { return nil, fmt.Errorf("invalid 'value' line %d: %v", line, err) } newValue := value newDateInfo := newDateIssue newSourceOrganization := newSender agmLines = 
append(agmLines, newAGMLine( *gid, newCountryCode, newSender, newLanguageCode, newDateIssue, newReferenceCode, md, newValue, newDateInfo, newSourceOrganization, )) } oldGMLines := map[models.Isrs]map[int64]*agmLine{} for gid, minMax := range mdMinMax { oldGMLines[gid], err = getOldGMLines( ctx, selectStmt, gid, minMax[0], minMax[1]) if err != nil { return nil, err } } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() txInsertStmt := tx.StmtContext(ctx, insertStmt) txTrackStmt := tx.StmtContext(ctx, trackStmt) agmLines: for _, line := range agmLines { var ase *agmSummaryEntry if old := oldGMLines[line.Location]; old != nil { ut := line.MeasureDate.Unix() if o, ok := old[ut]; ok { if !o.hasDiff(line) { // identical // don't delete delete(old, ut) continue agmLines } ase = &agmSummaryEntry{ FKGaugeID: line.Location, MeasureDate: line.MeasureDate, Versions: []*agmLine{o, line}, } } } if ase == nil { ase = &agmSummaryEntry{ FKGaugeID: line.Location, MeasureDate: line.MeasureDate, Versions: []*agmLine{line}, } } var newID int64 if err := txInsertStmt.QueryRowContext( ctx, line.Location.CountryCode, line.Location.LoCode, line.Location.FairwaySection, line.Location.Orc, line.Location.Hectometre, line.MeasureDate.Time, line.CountryCode, line.Sender, line.LanguageCode, line.DateIssue.Time, line.ReferenceCode, line.WaterLevel, line.DateInfo.Time, line.SourceOrganization, ).Scan(&newID); err != nil { return nil, err } if _, err := txTrackStmt.ExecContext( ctx, importID, "waterway.gauge_measurements", newID, false, ); err != nil { return nil, err } entries = append(entries, ase) } var removed int // Issue deletes for _, old := range oldGMLines { removed += len(old) for _, line := range old { if _, err := txTrackStmt.ExecContext( ctx, importID, "waterway.gauge_measurements", line.id, true, ); err != nil { return nil, err } entries = append(entries, &agmSummaryEntry{ FKGaugeID: line.Location, MeasureDate: line.MeasureDate, Versions: 
[]*agmLine{line, nil}, }) } } feedback.Info("Measurements to update/insert: %d", len(entries)) feedback.Info("Measurements to delete: %d", removed) if len(entries) == 0 && removed == 0 { return nil, UnchangedError("No changes from AGM import") } if err = tx.Commit(); err != nil { return nil, fmt.Errorf("commit failed: %v", err) } // Sort here to mix the deletes right beside the matching inserts/updates. // This also makes the output deterministic. sort.Slice(entries, func(i, j int) bool { return entries[i].FKGaugeID.Less(&entries[j].FKGaugeID) }) feedback.Info("Imported %d entries with changes", len(entries)) feedback.Info("Importing approved gauge measurements took %s", time.Since(start)) return entries, nil } func getOldGMLines( ctx context.Context, stmt *sql.Stmt, location models.Isrs, from time.Time, to time.Time, ) (map[int64]*agmLine, error) { var ( oldID int64 oldCountryCode string oldSender string oldLanguageCode string oldDateIssue time.Time oldReferenceCode string oldMeasureDate time.Time oldValue float64 oldDateInfo time.Time oldSourceOrganization string ) gmLines := map[int64]*agmLine{} gms, err := stmt.QueryContext( ctx, location.CountryCode, location.LoCode, location.FairwaySection, location.Orc, location.Hectometre, from, to, ) if err != nil { return nil, err } defer gms.Close() for gms.Next() { if err = gms.Scan( &oldID, &oldCountryCode, &oldSender, &oldLanguageCode, &oldDateIssue, &oldReferenceCode, &oldMeasureDate, &oldValue, &oldDateInfo, &oldSourceOrganization, ); err != nil { return nil, err } line := newAGMLine( location, oldCountryCode, oldSender, oldLanguageCode, oldDateIssue, oldReferenceCode, oldMeasureDate, oldValue, oldDateInfo, oldSourceOrganization, ) line.id = oldID gmLines[oldMeasureDate.Unix()] = line } if err = gms.Err(); err != nil { return nil, err } return gmLines, nil } func newAGMLine( location models.Isrs, countryCode string, sender string, languageCode string, dateIssue time.Time, referenceCode string, measureDate time.Time, 
waterLevel float64, dateInfo time.Time, sourceOrganization string, ) *agmLine { return &agmLine{ Location: location, CountryCode: countryCode, Sender: sender, LanguageCode: languageCode, DateIssue: timetz{dateIssue}, ReferenceCode: referenceCode, MeasureDate: timetz{measureDate}, WaterLevel: waterLevel, DateInfo: timetz{dateInfo}, SourceOrganization: sourceOrganization, } }