Mercurial > gemma
view pkg/imports/agm.go @ 4685:7a9388943840
morphology: Clip class breaks again Z min and max of the height model.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 16 Oct 2019 11:07:59 +0200 |
parents | 51e90370eced |
children | 59a99655f34d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Sascha Wilde <wilde@intevation.de> // * Tom Gottfried <tom.gottfried@intevation.de> package imports import ( "bufio" "context" "database/sql" "encoding/csv" "encoding/json" "fmt" "io" "math" "os" "path/filepath" "sort" "strconv" "strings" "time" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" ) // ApprovedGaugeMeasurements is a Job to import // approved gauge measurements from a CVS file // into the database. type ApprovedGaugeMeasurements struct { Dir string `json:"dir"` Originator string `json:"originator"` } // AGMJobKind is the unique name of an approved gauge measurements import job. 
const AGMJobKind JobKind = "agm" type agmJobCreator struct{} func init() { RegisterJobCreator(AGMJobKind, agmJobCreator{}) } func (agmJobCreator) AutoAccept() bool { return false } func (agmJobCreator) Description() string { return "approved gauge measurements" } func (agmJobCreator) Create() Job { return new(ApprovedGaugeMeasurements) } func (agmJobCreator) Depends() [2][]string { return [2][]string{ {"gauge_measurements"}, {"gauges"}, } } const ( agmStageDoneDeleteSQL = ` DELETE FROM waterway.gauge_measurements WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass AND deletion )` agmStageDoneSQL = ` UPDATE waterway.gauge_measurements SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass AND NOT deletion )` ) func (agmJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id) if err == nil { _, err = tx.ExecContext(ctx, agmStageDoneSQL, id) } return err } // CleanUp removes the folder containing the CSV file with the // the approved gauge measurements. 
func (agm *ApprovedGaugeMeasurements) CleanUp() error { return os.RemoveAll(agm.Dir) } var guessDate = common.TimeParser([]string{ "02.01.2006 15:04", "2006-01-02T15:04:05-07:00", }).Parse type timetz struct{ time.Time } func (ttz *timetz) MarshalJSON() ([]byte, error) { return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00")) } type agmLine struct { id int64 Location models.Isrs `json:"fk-gauge-id"` CountryCode string `json:"country-code"` Sender string `json:"sender"` LanguageCode string `json:"language-code"` DateIssue timetz `json:"date-issue"` ReferenceCode string `json:"reference-code"` MeasureDate timetz `json:"measure-date"` WaterLevel float64 `json:"water-level"` DateInfo timetz `json:"date-info"` SourceOrganization string `json:"source-organization"` } func (a *agmLine) hasDiff(b *agmLine) bool { const eps = 0.00001 return a.CountryCode != b.CountryCode || a.Sender != b.Sender || a.LanguageCode != b.LanguageCode || a.ReferenceCode != b.ReferenceCode || math.Abs(a.WaterLevel-b.WaterLevel) > eps || a.SourceOrganization != b.SourceOrganization } type agmSummaryEntry struct { FKGaugeID models.Isrs `json:"fk-gauge-id"` MeasureDate timetz `json:"measure-date"` Versions []*agmLine `json:"versions"` } const ( agmSelectSQL = ` SELECT id, country_code, sender, language_code, date_issue, reference_code, measure_date, water_level, date_info, source_organization FROM waterway.gauge_measurements WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND measure_date BETWEEN $6 AND $7 AND staging_done ` agmInsertSQL = ` INSERT INTO waterway.gauge_measurements ( location, measure_date, country_code, sender, language_code, date_issue, reference_code, water_level, date_info, source_organization, staging_done ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7, $8, $9, $10, $11, $12, $13, $14, false ) RETURNING id` agmGaugeCheckSQL = ` SELECT EXISTS( SELECT 1 FROM waterway.gauges WHERE location = ($1::char(2), 
$2::char(3), $3::char(5), $4::char(5), $5::int)) ` ) func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error { headerFields := []struct { idx *int name string }{ {fkGaugeIDIdx, "fk_gauge_id"}, {measureDateIdx, "measure_date"}, {valueIdx, "value"}, // "water_level", } nextHeader: for i, f := range headers { h := strings.Replace(strings.ToLower( strings.TrimSpace(f)), " ", "_", -1) for j := range headerFields { if headerFields[j].name == h { if *headerFields[j].idx != -1 { return fmt.Errorf( "there is more than one column namend '%s'", h) } *headerFields[j].idx = i continue nextHeader } } } var missing []string for i := range headerFields { if headerFields[i].name != "unit" && *headerFields[i].idx == -1 { missing = append(missing, headerFields[i].name) } } if len(missing) > 0 { return fmt.Errorf("missing columns: %s", strings.Join(missing, ", ")) } return nil } // Do executes the actual approved gauge measurements import. func (agm *ApprovedGaugeMeasurements) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() f, err := os.Open(filepath.Join(agm.Dir, "agm.csv")) if err != nil { return nil, err } defer f.Close() r := csv.NewReader(bufio.NewReader(f)) r.Comma = ';' r.ReuseRecord = true headers, err := r.Read() if err != nil { return nil, err } var ( fkGaugeIDIdx = -1 measureDateIdx = -1 valueIdx = -1 ) if err := parseAGMHeaders( headers, &fkGaugeIDIdx, &measureDateIdx, &valueIdx, ); err != nil { return nil, err } gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL) if err != nil { return nil, err } defer gaugeCheckStmt.Close() selectStmt, err := conn.PrepareContext(ctx, agmSelectSQL) if err != nil { return nil, err } defer selectStmt.Close() insertStmt, err := conn.PrepareContext(ctx, agmInsertSQL) if err != nil { return nil, err } defer insertStmt.Close() trackStmt, err := conn.PrepareContext(ctx, trackImportDeletionSQL) if err != nil { return nil, err } 
defer trackStmt.Close() entries := []*agmSummaryEntry{} checkedGauges := map[models.Isrs]bool{} warnLimiter := common.WarningLimiter{Log: feedback.Warn, MaxWarnings: 100} warn := warnLimiter.Warn defer warnLimiter.Close() agmLines := []*agmLine{} ignored := 0 mdMinMax := map[models.Isrs]*[2]time.Time{} lines: for line := 1; ; line++ { row, err := r.Read() switch { case err == io.EOF || len(row) == 0: feedback.Info("Read %d entries in CSV file", line-1) if ignored > 0 { feedback.Info("%d entries ignored", ignored) } if ignored == line-1 { return nil, UnchangedError("No entries imported") } break lines case err != nil: return nil, fmt.Errorf("CSV parsing failed: %v", err) } gids := row[fkGaugeIDIdx] gid, err := models.IsrsFromString(gids) if err != nil { return nil, fmt.Errorf("invalid ISRS code line %d: %v", line, err) } if exists, found := checkedGauges[*gid]; found { if !exists { // Just ignore the line since we have already warned ignored++ continue lines } } else { // not found in gauge cache if err := gaugeCheckStmt.QueryRowContext( ctx, gid.CountryCode, gid.LoCode, gid.FairwaySection, gid.Orc, gid.Hectometre, ).Scan(&exists); err != nil { return nil, err } checkedGauges[*gid] = exists if !exists { warn("Ignoring data for unknown gauge %s", gid.String()) ignored++ continue lines } } md, err := guessDate(row[measureDateIdx]) if err != nil { return nil, fmt.Errorf("invalid 'measure_date' line %d: %v", line, err) } if v := mdMinMax[*gid]; v != nil { if md.Before(v[0]) { v[0] = md } if md.After(v[1]) { v[1] = md } } else { mdMinMax[*gid] = &[2]time.Time{md, md} } newSender := agm.Originator newCountryCode := gid.CountryCode newLanguageCode := common.CCtoLang[gid.CountryCode] newDateIssue := time.Now() newReferenceCode := "ZPG" value, err := strconv.ParseFloat(row[valueIdx], 32) if err != nil { return nil, fmt.Errorf("invalid 'value' line %d: %v", line, err) } newValue := value newDateInfo := newDateIssue newSourceOrganization := newSender agmLines = 
append(agmLines, newAGMLine( *gid, newCountryCode, newSender, newLanguageCode, newDateIssue, newReferenceCode, md, newValue, newDateInfo, newSourceOrganization, )) } oldGMLines := map[models.Isrs]map[int64]*agmLine{} for gid, minMax := range mdMinMax { oldGMLines[gid], err = getOldGMLines( ctx, selectStmt, gid, minMax[0], minMax[1]) if err != nil { return nil, err } } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() txInsertStmt := tx.StmtContext(ctx, insertStmt) txTrackStmt := tx.StmtContext(ctx, trackStmt) agmLines: for _, line := range agmLines { var ase *agmSummaryEntry if old := oldGMLines[line.Location]; old != nil { ut := line.MeasureDate.Unix() if o, ok := old[ut]; ok { if !o.hasDiff(line) { // identical // don't delete delete(old, ut) continue agmLines } ase = &agmSummaryEntry{ FKGaugeID: line.Location, MeasureDate: line.MeasureDate, Versions: []*agmLine{o, line}, } } } if ase == nil { ase = &agmSummaryEntry{ FKGaugeID: line.Location, MeasureDate: line.MeasureDate, Versions: []*agmLine{line}, } } var newID int64 if err := txInsertStmt.QueryRowContext( ctx, line.Location.CountryCode, line.Location.LoCode, line.Location.FairwaySection, line.Location.Orc, line.Location.Hectometre, line.MeasureDate.Time, line.CountryCode, line.Sender, line.LanguageCode, line.DateIssue.Time, line.ReferenceCode, line.WaterLevel, line.DateInfo.Time, line.SourceOrganization, ).Scan(&newID); err != nil { return nil, err } if _, err := txTrackStmt.ExecContext( ctx, importID, "waterway.gauge_measurements", newID, false, ); err != nil { return nil, err } entries = append(entries, ase) } var removed int // Issue deletes for _, old := range oldGMLines { removed += len(old) for _, line := range old { if _, err := txTrackStmt.ExecContext( ctx, importID, "waterway.gauge_measurements", line.id, true, ); err != nil { return nil, err } entries = append(entries, &agmSummaryEntry{ FKGaugeID: line.Location, MeasureDate: line.MeasureDate, Versions: 
[]*agmLine{line, nil}, }) } } feedback.Info("Measurements to update/insert: %d", len(entries)) feedback.Info("Measurements to delete: %d", removed) if len(entries) == 0 && removed == 0 { return nil, UnchangedError("No changes from AGM import") } if err = tx.Commit(); err != nil { return nil, fmt.Errorf("commit failed: %v", err) } // Sort here to mix the deletes right beside the matching inserts/updates. // This also makes the output deterministic. sort.Slice(entries, func(i, j int) bool { return entries[i].FKGaugeID.Less(&entries[j].FKGaugeID) }) feedback.Info("Imported %d entries with changes", len(entries)) feedback.Info("Importing approved gauge measurements took %s", time.Since(start)) return entries, nil } func getOldGMLines( ctx context.Context, stmt *sql.Stmt, location models.Isrs, from time.Time, to time.Time, ) (map[int64]*agmLine, error) { var ( oldID int64 oldCountryCode string oldSender string oldLanguageCode string oldDateIssue time.Time oldReferenceCode string oldMeasureDate time.Time oldValue float64 oldDateInfo time.Time oldSourceOrganization string ) gmLines := map[int64]*agmLine{} gms, err := stmt.QueryContext( ctx, location.CountryCode, location.LoCode, location.FairwaySection, location.Orc, location.Hectometre, from, to, ) if err != nil { return nil, err } defer gms.Close() for gms.Next() { if err = gms.Scan( &oldID, &oldCountryCode, &oldSender, &oldLanguageCode, &oldDateIssue, &oldReferenceCode, &oldMeasureDate, &oldValue, &oldDateInfo, &oldSourceOrganization, ); err != nil { return nil, err } line := newAGMLine( location, oldCountryCode, oldSender, oldLanguageCode, oldDateIssue, oldReferenceCode, oldMeasureDate, oldValue, oldDateInfo, oldSourceOrganization, ) line.id = oldID gmLines[oldMeasureDate.Unix()] = line } if err = gms.Err(); err != nil { return nil, err } return gmLines, nil } func newAGMLine( location models.Isrs, countryCode string, sender string, languageCode string, dateIssue time.Time, referenceCode string, measureDate time.Time, 
waterLevel float64, dateInfo time.Time, sourceOrganization string, ) *agmLine { return &agmLine{ Location: location, CountryCode: countryCode, Sender: sender, LanguageCode: languageCode, DateIssue: timetz{dateIssue}, ReferenceCode: referenceCode, MeasureDate: timetz{measureDate}, WaterLevel: waterLevel, DateInfo: timetz{dateInfo}, SourceOrganization: sourceOrganization, } }