Mercurial > gemma
view pkg/imports/wg.go @ 5560:f2204f91d286
Join the log lines of imports to the log exports to recover data from them.
Used in SR export to extract information that where in the meta json
but now are only found in the log.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 09 Feb 2022 18:34:40 +0100 |
parents | 59a99655f34d |
children | ade07a3f2cfd |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom.gottfried@intevation.de> package imports import ( "context" "database/sql" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/pgxutils" "gemma.intevation.de/gemma/pkg/soap/erdms" ) // WaterwayGauge is a Job to load gauge data from // a specified NTS service and stores them into the database. type WaterwayGauge struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Username is the username used to authenticate. Username string `json:"username"` // Passwort is the password to authenticate. Password string `json:"password"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } // Description gives a short info about relevant facts of this import. func (wg *WaterwayGauge) Description() (string, error) { return wg.URL, nil } // WGJobKind is the unique name of this import job type. const WGJobKind JobKind = "wg" type wgJobCreator struct{} func init() { RegisterJobCreator(WGJobKind, wgJobCreator{}) } func (wgJobCreator) Description() string { return "waterway gauges" } func (wgJobCreator) AutoAccept() bool { return true } func (wgJobCreator) Create() Job { return new(WaterwayGauge) } func (wgJobCreator) Depends() [2][]string { return [2][]string{ {"gauges_reference_water_levels", "gauges"}, {"depth_references"}, } } // StageDone does nothing as there is no staging for gauges. func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { return nil } // CleanUp does nothing as there is nothing to cleanup with gauges. func (*WaterwayGauge) CleanUp() error { return nil } const ( eraseObsoleteGaugesSQL = ` UPDATE waterway.gauges SET erased = true, validity = validity - '[now,)' WHERE NOT erased AND (location).country_code = ANY($1) AND isrs_astext(location) <> ALL($2) RETURNING isrs_astext(location) ` eraseGaugeSQL = ` WITH upd AS ( UPDATE waterway.gauges SET erased = true WHERE isrs_astext(location) = $1 AND NOT erased -- Don't touch old entry if new validity contains old: will be updated AND NOT validity <@ $2 RETURNING 1 ) -- Decide whether a new version will be INSERTed SELECT EXISTS(SELECT 1 FROM upd) OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1) ` insertGaugeSQL = ` INSERT INTO waterway.gauges ( location, objname, geom, applicability_from_km, applicability_to_km, validity, zero_point, geodref, date_info, source_organization, lastupdate ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, $9, $10, $11, $12, $13, $14, $15, $16 ) ` fixValiditySQL = ` UPDATE waterway.gauges SET -- Set enddate of old entry to new startdate in case of overlap: validity = validity - $2 WHERE isrs_astext(location) = $1 AND validity && $2 AND erased ` updateGaugeSQL = ` UPDATE waterway.gauges SET objname = $6, geom = ST_SetSRID(ST_MakePoint($7, $8), 4326), applicability_from_km = $9, applicability_to_km = $10, zero_point = $11, geodref = $12, date_info = $13, source_organization = $14, lastupdate = $15, validity = $16 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND NOT erased AND $15 > lastupdate RETURNING 1 ` deleteReferenceWaterLevelsSQL = ` DELETE FROM waterway.gauges_reference_water_levels WHERE isrs_astext(location) = $1 AND validity = $2 AND depth_reference <> ALL($3) RETURNING depth_reference ` isNtSDepthRefSQL = ` SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` insertReferenceWaterLevelsSQL = ` INSERT INTO waterway.gauges_reference_water_levels ( location, validity, depth_reference, value ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7, $8 ) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET value = EXCLUDED.value ` ) // Do implements the actual import. func (wg *WaterwayGauge) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() responseData, countries, err := getRisData( ctx, conn, feedback, wg.Username, wg.Password, wg.URL, wg.Insecure, "wtwgag") if err != nil { return nil, err } var eraseGaugeStmt, insertStmt, fixValidityStmt, updateStmt, deleteReferenceWaterLevelsStmt, isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ {eraseGaugeSQL, &eraseGaugeStmt}, {insertGaugeSQL, &insertStmt}, {fixValiditySQL, &fixValidityStmt}, {updateGaugeSQL, &updateStmt}, {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, {isNtSDepthRefSQL, &isNtSDepthRefStmt}, {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return nil, err } defer (*x.stmt).Close() } var gauges []string var unchanged int for _, data := range responseData { for _, dr := range data.RisdataReturn { isrs := string(*dr.RisidxCode) code, err := models.IsrsFromString(isrs) if err != nil { feedback.Warn("Invalid ISRS code '%s': %v", isrs, err) continue } gauges = append(gauges, isrs) feedback.Info("Processing %s", code) // We need a valid, non-empty time range to identify gauge versions if dr.Enddate != nil && dr.Startdate != nil && !time.Time(*dr.Enddate).After(time.Time(*dr.Startdate)) { feedback.Error("End date not after start date") unchanged++ continue } var from, to sql.NullInt64 if dr.Applicabilityfromkm != nil { from = sql.NullInt64{ Int64: int64(*dr.Applicabilityfromkm), Valid: true, } } if dr.Applicabilitytokm != nil { to = sql.NullInt64{ Int64: int64(*dr.Applicabilitytokm), Valid: true, } } var tfrom, tto, dateInfo pgtype.Timestamptz if dr.Startdate != nil { tfrom = pgtype.Timestamptz{ Time: time.Time(*dr.Startdate), Status: pgtype.Present, } } else { tfrom = pgtype.Timestamptz{ Status: pgtype.Null, } } if dr.Enddate != nil { tto = pgtype.Timestamptz{ Time: time.Time(*dr.Enddate), Status: pgtype.Present, } } else { tto = pgtype.Timestamptz{ Status: pgtype.Null, } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: pgtype.Exclusive, Status: pgtype.Present, } if dr.Infodate != nil { dateInfo = pgtype.Timestamptz{ Time: time.Time(*dr.Infodate), Status: pgtype.Present, } } else { dateInfo = pgtype.Timestamptz{ Status: pgtype.Null, } } var geodref sql.NullString if dr.Geodref != nil { geodref = sql.NullString{ String: string(*dr.Geodref), Valid: true, } } var source sql.NullString if dr.Source != nil { source = sql.NullString{ String: string(*dr.Source), Valid: true, } } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() // Mark old entry of gauge as erased, if applicable var isNew bool err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, code.String(), validity, ).Scan(&isNew) switch { case err != nil: feedback.Error(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue case isNew: // insert gauge version entry if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, dr.Objname.Loc, dr.Lon, dr.Lat, from, to, &validity, dr.Zeropoint, geodref, &dateInfo, source, time.Time(*dr.Lastupdate), ); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue } feedback.Info("insert new version") case !isNew: // try to update var dummy int err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, dr.Objname.Loc, dr.Lon, dr.Lat, from, to, dr.Zeropoint, geodref, &dateInfo, source, time.Time(*dr.Lastupdate), &validity, ).Scan(&dummy) switch { case err2 == sql.ErrNoRows: feedback.Info("unchanged") if err3 := tx.Rollback(); err3 != nil { return nil, err3 } unchanged++ continue case err2 != nil: feedback.Error(pgxutils.ReadableError{Err: err2}.Error()) if err3 := tx.Rollback(); err3 != nil { return nil, err3 } unchanged++ continue default: feedback.Info("update") } // Remove obsolete reference water levels var currLevels pgtype.VarcharArray currLevels.Set([]string{ string(*dr.Reflevel1code), string(*dr.Reflevel2code), string(*dr.Reflevel3code), }) rwls, err := tx.StmtContext(ctx, deleteReferenceWaterLevelsStmt).QueryContext(ctx, code.String(), &validity, &currLevels, ) if err != nil { return nil, err } defer rwls.Close() for rwls.Next() { var delRef string if err := rwls.Scan(&delRef); err != nil { return nil, err } feedback.Warn("Removed reference water level %s from %s", delRef, code) } if err := rwls.Err(); err != nil { return nil, err } } // Set end of validity of old version to start of new version // in case of overlap if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext( ctx, code.String(), &validity, ); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue } // "Upsert" reference water levels for _, wl := range []struct { level **erdms.RisreflevelcodeType value **erdms.RisreflevelvalueType }{ {&dr.Reflevel1code, &dr.Reflevel1value}, {&dr.Reflevel2code, &dr.Reflevel2value}, {&dr.Reflevel3code, &dr.Reflevel3value}, } { if *wl.level == nil || *wl.value == nil { continue } var isNtSDepthRef bool if err := tx.StmtContext( ctx, isNtSDepthRefStmt).QueryRowContext(ctx, string(**wl.level), ).Scan( &isNtSDepthRef, ); err != nil { return nil, err } if !isNtSDepthRef { feedback.Warn( "Reference level code '%s' is not in line "+ "with the NtS reference_code table", string(**wl.level)) } if _, err := tx.StmtContext( ctx, insertWaterLevelStmt).ExecContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, &validity, string(**wl.level), int64(**wl.value), ); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error()) tx.Rollback() continue } } if err = tx.Commit(); err != nil { return nil, err } } } if len(gauges) == 0 { return nil, UnchangedError("No gauges returned from ERDMS") } var pgCountries, pgGauges pgtype.VarcharArray pgCountries.Set(countries) pgGauges.Set(gauges) obsGauges, err := conn.QueryContext(ctx, eraseObsoleteGaugesSQL, &pgCountries, &pgGauges) if err != nil { return nil, err } defer obsGauges.Close() for obsGauges.Next() { var isrs string if err := obsGauges.Scan(&isrs); err != nil { return nil, err } feedback.Info("Erased %s", isrs) unchanged-- } if err := obsGauges.Err(); err != nil { return nil, err } if unchanged == len(gauges) { return nil, UnchangedError("All gauges unchanged") } feedback.Info("Importing gauges took %s", time.Since(start)) return nil, err }