Mercurial > gemma
view pkg/imports/wg.go @ 5591:0011f50cf216 surveysperbottleneckid
Removed no longer used alternative api for surveys/ endpoint.
As bottlenecks in the summary for SR imports are now identified by
their id and no longer by the (not guarantied to be unique!) name,
there is no longer the need to request survey data by the name+date
tuple (which isn't reliable anyway). So the workaround was now
reversed.
author | Sascha Wilde <wilde@sha-bang.de> |
---|---|
date | Wed, 06 Apr 2022 13:30:29 +0200 |
parents | ade07a3f2cfd |
children | e1936db6db8e |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom.gottfried@intevation.de> package imports import ( "context" "database/sql" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/pgxutils" "gemma.intevation.de/gemma/pkg/soap/erdms" ) // WaterwayGauge is a Job to load gauge data from // a specified NTS service and stores them into the database. type WaterwayGauge struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Username is the username used to authenticate. Username string `json:"username"` // Passwort is the password to authenticate. Password string `json:"password"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } // Description gives a short info about relevant facts of this import. func (wg *WaterwayGauge) Description([]string) (string, error) { return wg.URL, nil } // WGJobKind is the unique name of this import job type. const WGJobKind JobKind = "wg" type wgJobCreator struct{} func init() { RegisterJobCreator(WGJobKind, wgJobCreator{}) } func (wgJobCreator) Description() string { return "waterway gauges" } func (wgJobCreator) AutoAccept() bool { return true } func (wgJobCreator) Create() Job { return new(WaterwayGauge) } func (wgJobCreator) Depends() [2][]string { return [2][]string{ {"gauges_reference_water_levels", "gauges"}, {"depth_references"}, } } // StageDone does nothing as there is no staging for gauges. func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { return nil } // CleanUp does nothing as there is nothing to cleanup with gauges. func (*WaterwayGauge) CleanUp() error { return nil } const ( eraseObsoleteGaugesSQL = ` UPDATE waterway.gauges SET erased = true, validity = validity - '[now,)' WHERE NOT erased AND (location).country_code = ANY($1) AND isrs_astext(location) <> ALL($2) RETURNING isrs_astext(location) ` eraseGaugeSQL = ` WITH upd AS ( UPDATE waterway.gauges SET erased = true WHERE isrs_astext(location) = $1 AND NOT erased -- Don't touch old entry if new validity contains old: will be updated AND NOT validity <@ $2 RETURNING 1 ) -- Decide whether a new version will be INSERTed SELECT EXISTS(SELECT 1 FROM upd) OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1) ` insertGaugeSQL = ` INSERT INTO waterway.gauges ( location, objname, geom, applicability_from_km, applicability_to_km, validity, zero_point, geodref, date_info, source_organization, lastupdate ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, $9, $10, $11, $12, $13, $14, $15, $16 ) ` fixValiditySQL = ` UPDATE waterway.gauges SET -- Set enddate of old entry to new startdate in case of overlap: validity = validity - $2 WHERE isrs_astext(location) = $1 AND validity && $2 AND erased ` updateGaugeSQL = ` UPDATE waterway.gauges SET objname = $6, geom = ST_SetSRID(ST_MakePoint($7, $8), 4326), applicability_from_km = $9, applicability_to_km = $10, zero_point = $11, geodref = $12, date_info = $13, source_organization = $14, lastupdate = $15, validity = $16 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND NOT erased AND $15 > lastupdate RETURNING 1 ` deleteReferenceWaterLevelsSQL = ` DELETE FROM waterway.gauges_reference_water_levels WHERE isrs_astext(location) = $1 AND validity = $2 AND depth_reference <> ALL($3) RETURNING depth_reference ` isNtSDepthRefSQL = ` SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` insertReferenceWaterLevelsSQL = ` INSERT INTO waterway.gauges_reference_water_levels ( location, validity, depth_reference, value ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7, $8 ) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET value = EXCLUDED.value ` ) // Do implements the actual import. func (wg *WaterwayGauge) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() responseData, countries, err := getRisData( ctx, conn, feedback, wg.Username, wg.Password, wg.URL, wg.Insecure, "wtwgag") if err != nil { return nil, err } var eraseGaugeStmt, insertStmt, fixValidityStmt, updateStmt, deleteReferenceWaterLevelsStmt, isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ {eraseGaugeSQL, &eraseGaugeStmt}, {insertGaugeSQL, &insertStmt}, {fixValiditySQL, &fixValidityStmt}, {updateGaugeSQL, &updateStmt}, {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, {isNtSDepthRefSQL, &isNtSDepthRefStmt}, {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return nil, err } defer (*x.stmt).Close() } var gauges []string var unchanged int for _, data := range responseData { for _, dr := range data.RisdataReturn { isrs := string(*dr.RisidxCode) code, err := models.IsrsFromString(isrs) if err != nil { feedback.Warn("Invalid ISRS code '%s': %v", isrs, err) continue } gauges = append(gauges, isrs) feedback.Info("Processing %s", code) // We need a valid, non-empty time range to identify gauge versions if dr.Enddate != nil && dr.Startdate != nil && !time.Time(*dr.Enddate).After(time.Time(*dr.Startdate)) { feedback.Error("End date not after start date") unchanged++ continue } var from, to sql.NullInt64 if dr.Applicabilityfromkm != nil { from = sql.NullInt64{ Int64: int64(*dr.Applicabilityfromkm), Valid: true, } } if dr.Applicabilitytokm != nil { to = sql.NullInt64{ Int64: int64(*dr.Applicabilitytokm), Valid: true, } } var tfrom, tto, dateInfo pgtype.Timestamptz if dr.Startdate != nil { tfrom = pgtype.Timestamptz{ Time: time.Time(*dr.Startdate), Status: pgtype.Present, } } else { tfrom = pgtype.Timestamptz{ Status: pgtype.Null, } } if dr.Enddate != nil { tto = pgtype.Timestamptz{ Time: time.Time(*dr.Enddate), Status: pgtype.Present, } } else { tto = pgtype.Timestamptz{ Status: pgtype.Null, } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: pgtype.Exclusive, Status: pgtype.Present, } if dr.Infodate != nil { dateInfo = pgtype.Timestamptz{ Time: time.Time(*dr.Infodate), Status: pgtype.Present, } } else { dateInfo = pgtype.Timestamptz{ Status: pgtype.Null, } } var geodref sql.NullString if dr.Geodref != nil { geodref = sql.NullString{ String: string(*dr.Geodref), Valid: true, } } var source sql.NullString if dr.Source != nil { source = sql.NullString{ String: string(*dr.Source), Valid: true, } } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() // Mark old entry of gauge as erased, if applicable var isNew bool err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, code.String(), validity, ).Scan(&isNew) switch { case err != nil: feedback.Error(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue case isNew: // insert gauge version entry if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, dr.Objname.Loc, dr.Lon, dr.Lat, from, to, &validity, dr.Zeropoint, geodref, &dateInfo, source, time.Time(*dr.Lastupdate), ); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue } feedback.Info("insert new version") case !isNew: // try to update var dummy int err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, dr.Objname.Loc, dr.Lon, dr.Lat, from, to, dr.Zeropoint, geodref, &dateInfo, source, time.Time(*dr.Lastupdate), &validity, ).Scan(&dummy) switch { case err2 == sql.ErrNoRows: feedback.Info("unchanged") if err3 := tx.Rollback(); err3 != nil { return nil, err3 } unchanged++ continue case err2 != nil: feedback.Error(pgxutils.ReadableError{Err: err2}.Error()) if err3 := tx.Rollback(); err3 != nil { return nil, err3 } unchanged++ continue default: feedback.Info("update") } // Remove obsolete reference water levels var currLevels pgtype.VarcharArray currLevels.Set([]string{ string(*dr.Reflevel1code), string(*dr.Reflevel2code), string(*dr.Reflevel3code), }) rwls, err := tx.StmtContext(ctx, deleteReferenceWaterLevelsStmt).QueryContext(ctx, code.String(), &validity, &currLevels, ) if err != nil { return nil, err } defer rwls.Close() for rwls.Next() { var delRef string if err := rwls.Scan(&delRef); err != nil { return nil, err } feedback.Warn("Removed reference water level %s from %s", delRef, code) } if err := rwls.Err(); err != nil { return nil, err } } // Set end of validity of old version to start of new version // in case of overlap if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext( ctx, code.String(), &validity, ); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue } // "Upsert" reference water levels for _, wl := range []struct { level **erdms.RisreflevelcodeType value **erdms.RisreflevelvalueType }{ {&dr.Reflevel1code, &dr.Reflevel1value}, {&dr.Reflevel2code, &dr.Reflevel2value}, {&dr.Reflevel3code, &dr.Reflevel3value}, } { if *wl.level == nil || *wl.value == nil { continue } var isNtSDepthRef bool if err := tx.StmtContext( ctx, isNtSDepthRefStmt).QueryRowContext(ctx, string(**wl.level), ).Scan( &isNtSDepthRef, ); err != nil { return nil, err } if !isNtSDepthRef { feedback.Warn( "Reference level code '%s' is not in line "+ "with the NtS reference_code table", string(**wl.level)) } if _, err := tx.StmtContext( ctx, insertWaterLevelStmt).ExecContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, &validity, string(**wl.level), int64(**wl.value), ); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error()) tx.Rollback() continue } } if err = tx.Commit(); err != nil { return nil, err } } } if len(gauges) == 0 { return nil, UnchangedError("No gauges returned from ERDMS") } var pgCountries, pgGauges pgtype.VarcharArray pgCountries.Set(countries) pgGauges.Set(gauges) obsGauges, err := conn.QueryContext(ctx, eraseObsoleteGaugesSQL, &pgCountries, &pgGauges) if err != nil { return nil, err } defer obsGauges.Close() for obsGauges.Next() { var isrs string if err := obsGauges.Scan(&isrs); err != nil { return nil, err } feedback.Info("Erased %s", isrs) unchanged-- } if err := obsGauges.Err(); err != nil { return nil, err } if unchanged == len(gauges) { return nil, UnchangedError("All gauges unchanged") } feedback.Info("Importing gauges took %s", time.Since(start)) return nil, err }