Mercurial > gemma
view pkg/imports/wg.go @ 4685:7a9388943840
morphology: Clip class breaks against Z min and max of the height model.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 16 Oct 2019 11:07:59 +0200 |
parents | ca7f9c56697a |
children | ca6a5f722471 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom.gottfried@intevation.de> package imports import ( "context" "database/sql" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/pgxutils" "gemma.intevation.de/gemma/pkg/soap/erdms" ) // WaterwayGauge is a Job to load gauge data from // a specified NTS service and stores them into the database. type WaterwayGauge struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Username is the username used to authenticate. Username string `json:"username"` // Passwort is the password to authenticate. Password string `json:"password"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } // WGJobKind is the unique name of this import job type. const WGJobKind JobKind = "wg" type wgJobCreator struct{} func init() { RegisterJobCreator(WGJobKind, wgJobCreator{}) } func (wgJobCreator) Description() string { return "waterway gauges" } func (wgJobCreator) AutoAccept() bool { return true } func (wgJobCreator) Create() Job { return new(WaterwayGauge) } func (wgJobCreator) Depends() [2][]string { return [2][]string{ {"gauges_reference_water_levels", "gauges"}, {"depth_references"}, } } // StageDone does nothing as there is no staging for gauges. func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp does nothing as there is nothing to cleanup with gauges. 
func (*WaterwayGauge) CleanUp() error { return nil } const ( eraseObsoleteGaugesSQL = ` UPDATE waterway.gauges SET erased = true, validity = validity - '[now,)' WHERE NOT erased AND (location).country_code = ANY($1) AND isrs_astext(location) <> ALL($2) RETURNING isrs_astext(location) ` eraseGaugeSQL = ` WITH upd AS ( UPDATE waterway.gauges SET erased = true WHERE isrs_astext(location) = $1 AND NOT erased -- Don't touch old entry if new validity contains old: will be updated AND NOT validity <@ $2 RETURNING 1 ) -- Decide whether a new version will be INSERTed SELECT EXISTS(SELECT 1 FROM upd) OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1) ` insertGaugeSQL = ` INSERT INTO waterway.gauges ( location, objname, geom, applicability_from_km, applicability_to_km, validity, zero_point, geodref, date_info, source_organization, lastupdate ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, $9, $10, $11, $12, $13, $14, $15, $16 ) ` fixValiditySQL = ` UPDATE waterway.gauges SET -- Set enddate of old entry to new startdate in case of overlap: validity = validity - $2 WHERE isrs_astext(location) = $1 AND validity && $2 AND erased ` updateGaugeSQL = ` UPDATE waterway.gauges SET objname = $6, geom = ST_SetSRID(ST_MakePoint($7, $8), 4326), applicability_from_km = $9, applicability_to_km = $10, zero_point = $11, geodref = $12, date_info = $13, source_organization = $14, lastupdate = $15, validity = $16 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND NOT erased AND $15 > lastupdate RETURNING 1 ` deleteReferenceWaterLevelsSQL = ` DELETE FROM waterway.gauges_reference_water_levels WHERE isrs_astext(location) = $1 AND validity = $2 AND depth_reference <> ALL($3) RETURNING depth_reference ` isNtSDepthRefSQL = ` SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` insertReferenceWaterLevelsSQL = ` INSERT INTO 
waterway.gauges_reference_water_levels ( location, validity, depth_reference, value ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7, $8 ) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET value = EXCLUDED.value ` ) // Do implements the actual import. func (wg *WaterwayGauge) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() responseData, countries, err := getRisData( ctx, conn, feedback, wg.Username, wg.Password, wg.URL, wg.Insecure, "wtwgag") if err != nil { return nil, err } var eraseGaugeStmt, insertStmt, fixValidityStmt, updateStmt, deleteReferenceWaterLevelsStmt, isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ {eraseGaugeSQL, &eraseGaugeStmt}, {insertGaugeSQL, &insertStmt}, {fixValiditySQL, &fixValidityStmt}, {updateGaugeSQL, &updateStmt}, {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, {isNtSDepthRefSQL, &isNtSDepthRefStmt}, {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return nil, err } defer (*x.stmt).Close() } var gauges []string var unchanged int for _, data := range responseData { for _, dr := range data.RisdataReturn { isrs := string(*dr.RisidxCode) code, err := models.IsrsFromString(isrs) if err != nil { feedback.Warn("Invalid ISRS code '%s': %v", isrs, err) continue } gauges = append(gauges, isrs) feedback.Info("Processing %s", code) // We need a valid, non-empty time range to identify gauge versions if dr.Enddate != nil && dr.Startdate != nil && !time.Time(*dr.Enddate).After(time.Time(*dr.Startdate)) { feedback.Error("End date not after start date") unchanged++ continue } var from, to sql.NullInt64 if dr.Applicabilityfromkm != nil { from = sql.NullInt64{ Int64: int64(*dr.Applicabilityfromkm), Valid: true, } } if dr.Applicabilitytokm != nil { to = sql.NullInt64{ 
Int64: int64(*dr.Applicabilitytokm), Valid: true, } } var tfrom, tto, dateInfo pgtype.Timestamptz if dr.Startdate != nil { tfrom = pgtype.Timestamptz{ Time: time.Time(*dr.Startdate), Status: pgtype.Present, } } else { tfrom = pgtype.Timestamptz{ Status: pgtype.Null, } } if dr.Enddate != nil { tto = pgtype.Timestamptz{ Time: time.Time(*dr.Enddate), Status: pgtype.Present, } } else { tto = pgtype.Timestamptz{ Status: pgtype.Null, } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: pgtype.Exclusive, Status: pgtype.Present, } if dr.Infodate != nil { dateInfo = pgtype.Timestamptz{ Time: time.Time(*dr.Infodate), Status: pgtype.Present, } } else { dateInfo = pgtype.Timestamptz{ Status: pgtype.Null, } } var geodref sql.NullString if dr.Geodref != nil { geodref = sql.NullString{ String: string(*dr.Geodref), Valid: true, } } var source sql.NullString if dr.Source != nil { source = sql.NullString{ String: string(*dr.Source), Valid: true, } } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() // Mark old entry of gauge as erased, if applicable var isNew bool err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, code.String(), validity, ).Scan(&isNew) switch { case err != nil: feedback.Error(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue case isNew: // insert gauge version entry if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, dr.Objname.Loc, dr.Lon, dr.Lat, from, to, &validity, dr.Zeropoint, geodref, &dateInfo, source, time.Time(*dr.Lastupdate), ); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue } feedback.Info("insert new version") case !isNew: // try to update var dummy int err2 := tx.StmtContext(ctx, 
updateStmt).QueryRowContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, dr.Objname.Loc, dr.Lon, dr.Lat, from, to, dr.Zeropoint, geodref, &dateInfo, source, time.Time(*dr.Lastupdate), &validity, ).Scan(&dummy) switch { case err2 == sql.ErrNoRows: feedback.Info("unchanged") if err3 := tx.Rollback(); err3 != nil { return nil, err3 } unchanged++ continue case err2 != nil: feedback.Error(pgxutils.ReadableError{Err: err2}.Error()) if err3 := tx.Rollback(); err3 != nil { return nil, err3 } unchanged++ continue default: feedback.Info("update") } // Remove obsolete reference water levels var currLevels pgtype.VarcharArray currLevels.Set([]string{ string(*dr.Reflevel1code), string(*dr.Reflevel2code), string(*dr.Reflevel3code), }) rwls, err := tx.StmtContext(ctx, deleteReferenceWaterLevelsStmt).QueryContext(ctx, code.String(), &validity, &currLevels, ) if err != nil { return nil, err } defer rwls.Close() for rwls.Next() { var delRef string if err := rwls.Scan(&delRef); err != nil { return nil, err } feedback.Warn("Removed reference water level %s from %s", delRef, code) } if err := rwls.Err(); err != nil { return nil, err } } // Set end of validity of old version to start of new version // in case of overlap if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext( ctx, code.String(), &validity, ); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue } // "Upsert" reference water levels for _, wl := range []struct { level **erdms.RisreflevelcodeType value **erdms.RisreflevelvalueType }{ {&dr.Reflevel1code, &dr.Reflevel1value}, {&dr.Reflevel2code, &dr.Reflevel2value}, {&dr.Reflevel3code, &dr.Reflevel3value}, } { if *wl.level == nil || *wl.value == nil { continue } var isNtSDepthRef bool if err := tx.StmtContext( ctx, isNtSDepthRefStmt).QueryRowContext(ctx, string(**wl.level), ).Scan( &isNtSDepthRef, ); err != nil { return nil, err } if 
!isNtSDepthRef { feedback.Warn( "Reference level code '%s' is not in line "+ "with the NtS reference_code table", string(**wl.level)) } if _, err := tx.StmtContext( ctx, insertWaterLevelStmt).ExecContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, &validity, string(**wl.level), int64(**wl.value), ); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error()) tx.Rollback() continue } } if err = tx.Commit(); err != nil { return nil, err } } } if len(gauges) == 0 { return nil, UnchangedError("No gauges returned from ERDMS") } var pgCountries, pgGauges pgtype.VarcharArray pgCountries.Set(countries) pgGauges.Set(gauges) obsGauges, err := conn.QueryContext(ctx, eraseObsoleteGaugesSQL, &pgCountries, &pgGauges) if err != nil { return nil, err } defer obsGauges.Close() for obsGauges.Next() { var isrs string if err := obsGauges.Scan(&isrs); err != nil { return nil, err } feedback.Info("Erased %s", isrs) unchanged-- } if err := obsGauges.Err(); err != nil { return nil, err } if unchanged == len(gauges) { return nil, UnchangedError("All gauges unchanged") } feedback.Info("Importing gauges took %s", time.Since(start)) return nil, err }