Mercurial > gemma
view pkg/imports/wg.go @ 3678:8f58851927c0
client: make layer factory only return new layer config for individual maps
instead of each time it is invoked. The purpose of the factory was to support multiple maps with individual layers.
But returning a new config each time it is invoked leads to bugs that rely on the layer's state. Now this factory
reuses the same objects it created before, per map.
author | Markus Kottlaender <markus@intevation.de> |
---|---|
date | Mon, 17 Jun 2019 17:31:35 +0200 |
parents | 29ef6d41e4af |
children | 5fed2f5bc104 6c5c15b2fb64 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom.gottfried@intevation.de> package imports import ( "context" "database/sql" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap/erdms" ) type WaterwayGauge struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Username is the username used to authenticate. Username string `json:"username"` // Passwort is the password to authenticate. Password string `json:"password"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } const WGJobKind JobKind = "wg" type wgJobCreator struct{} func init() { RegisterJobCreator(WGJobKind, wgJobCreator{}) } func (wgJobCreator) Description() string { return "waterway gauges" } func (wgJobCreator) AutoAccept() bool { return true } func (wgJobCreator) Create() Job { return new(WaterwayGauge) } func (wgJobCreator) Depends() [2][]string { return [2][]string{ {"gauges_reference_water_levels", "gauges"}, {"depth_references"}, } } // StageDone does nothing as there is no staging for gauges. func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp does nothing as there is nothing to cleanup with gauges. func (*WaterwayGauge) CleanUp() error { return nil } const ( eraseObsoleteGaugesSQL = ` UPDATE waterway.gauges SET erased = true WHERE NOT erased AND (location).country_code = ANY($1) AND isrs_astext(location) <> ALL($2) RETURNING isrs_astext(location) ` eraseGaugeSQL = ` WITH upd AS ( UPDATE waterway.gauges SET erased = true WHERE isrs_astext(location) = $1 AND NOT erased -- Don't touch old entry if new validity contains old: will be updated AND NOT validity <@ $2 RETURNING 1 ) -- Decide whether a new version will be INSERTed SELECT EXISTS(SELECT 1 FROM upd) OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1) ` insertGaugeSQL = ` INSERT INTO waterway.gauges ( location, objname, geom, applicability_from_km, applicability_to_km, validity, zero_point, geodref, date_info, source_organization, lastupdate ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, $9, $10, $11, $12, $13, $14, $15, $16 ) ` fixValiditySQL = ` UPDATE waterway.gauges SET -- Set enddate of old entry to new startdate in case of overlap: validity = validity - $2 WHERE isrs_astext(location) = $1 AND validity && $2 AND erased ` updateGaugeSQL = ` UPDATE waterway.gauges SET objname = $6, geom = ST_SetSRID(ST_MakePoint($7, $8), 4326), applicability_from_km = $9, applicability_to_km = $10, zero_point = $11, geodref = $12, date_info = $13, source_organization = $14, lastupdate = $15, validity = $16 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND NOT erased AND $15 > lastupdate RETURNING 1 ` deleteReferenceWaterLevelsSQL = ` DELETE FROM waterway.gauges_reference_water_levels WHERE isrs_astext(location) = $1 AND validity = $2 AND depth_reference <> ALL($3) RETURNING depth_reference ` isNtSDepthRefSQL = ` SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` insertReferenceWaterLevelsSQL = ` INSERT INTO waterway.gauges_reference_water_levels ( location, validity, depth_reference, value ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7, $8 ) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET value = EXCLUDED.value ` ) func (wg *WaterwayGauge) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() responseData, countries, err := getRisData( ctx, conn, feedback, wg.Username, wg.Password, wg.URL, wg.Insecure, "wtwgag") if err != nil { return nil, err } var eraseGaugeStmt, insertStmt, fixValidityStmt, updateStmt, deleteReferenceWaterLevelsStmt, isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ {eraseGaugeSQL, &eraseGaugeStmt}, {insertGaugeSQL, &insertStmt}, {fixValiditySQL, &fixValidityStmt}, {updateGaugeSQL, &updateStmt}, {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, {isNtSDepthRefSQL, &isNtSDepthRefStmt}, {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return nil, err } defer (*x.stmt).Close() } var gauges []string var unchanged int for _, data := range responseData { for _, dr := range data.RisdataReturn { isrs := string(*dr.RisidxCode) code, err := models.IsrsFromString(isrs) if err != nil { feedback.Warn("Invalid ISRS code '%s': %v", isrs, err) continue } gauges = append(gauges, isrs) feedback.Info("Processing %s", code) // We need a valid, non-empty time range to identify gauge versions if dr.Enddate != nil && dr.Startdate != nil && !time.Time(*dr.Enddate).After(time.Time(*dr.Startdate)) { feedback.Warn("End date not after start date") unchanged++ continue } var from, to sql.NullInt64 if dr.Applicabilityfromkm != nil { from = sql.NullInt64{ Int64: int64(*dr.Applicabilityfromkm), Valid: true, } } if dr.Applicabilitytokm != nil { to = sql.NullInt64{ Int64: int64(*dr.Applicabilitytokm), Valid: true, } } var tfrom, tto, dateInfo pgtype.Timestamptz if dr.Startdate != nil { tfrom = pgtype.Timestamptz{ Time: time.Time(*dr.Startdate), Status: pgtype.Present, } } else { tfrom = pgtype.Timestamptz{ Status: pgtype.Null, } } if dr.Enddate != nil { tto = pgtype.Timestamptz{ Time: time.Time(*dr.Enddate), Status: pgtype.Present, } } else { tto = pgtype.Timestamptz{ Status: pgtype.Null, } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: pgtype.Exclusive, Status: pgtype.Present, } if dr.Infodate != nil { dateInfo = pgtype.Timestamptz{ Time: time.Time(*dr.Infodate), Status: pgtype.Present, } } else { dateInfo = pgtype.Timestamptz{ Status: pgtype.Null, } } var geodref sql.NullString if dr.Geodref != nil { geodref = sql.NullString{ String: string(*dr.Geodref), Valid: true, } } var source sql.NullString if dr.Source != nil { source = sql.NullString{ String: string(*dr.Source), Valid: true, } } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() // Mark old entry of gauge as erased, if applicable var isNew bool err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, code.String(), validity, ).Scan(&isNew) switch { case err != nil: feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue case isNew: // insert gauge version entry if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, dr.Objname.Loc, dr.Lon, dr.Lat, from, to, &validity, dr.Zeropoint, geodref, &dateInfo, source, time.Time(*dr.Lastupdate), ); err != nil { feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue } feedback.Info("insert new version") case !isNew: // try to update var dummy int err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, dr.Objname.Loc, dr.Lon, dr.Lat, from, to, dr.Zeropoint, geodref, &dateInfo, source, time.Time(*dr.Lastupdate), &validity, ).Scan(&dummy) switch { case err2 == sql.ErrNoRows: feedback.Info("unchanged") if err3 := tx.Rollback(); err3 != nil { return nil, err3 } unchanged++ continue case err2 != nil: feedback.Warn(handleError(err2).Error()) if err3 := tx.Rollback(); err3 != nil { return nil, err3 } unchanged++ continue default: feedback.Info("update") } // Remove obsolete reference water levels var currLevels pgtype.VarcharArray currLevels.Set([]string{ string(*dr.Reflevel1code), string(*dr.Reflevel2code), string(*dr.Reflevel3code), }) rwls, err := tx.StmtContext(ctx, deleteReferenceWaterLevelsStmt).QueryContext(ctx, code.String(), &validity, &currLevels, ) if err != nil { return nil, err } defer rwls.Close() for rwls.Next() { var delRef string if err := rwls.Scan(&delRef); err != nil { return nil, err } feedback.Warn("Removed reference water level %s from %s", delRef, code) } if err := rwls.Err(); err != nil { return nil, err } } // Set end of validity of old version to start of new version // in case of overlap if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext( ctx, code.String(), &validity, ); err != nil { feedback.Warn(handleError(err).Error()) if err2 := tx.Rollback(); err2 != nil { return nil, err2 } unchanged++ continue } // "Upsert" reference water levels for _, wl := range []struct { level **erdms.RisreflevelcodeType value **erdms.RisreflevelvalueType }{ {&dr.Reflevel1code, &dr.Reflevel1value}, {&dr.Reflevel2code, &dr.Reflevel2value}, {&dr.Reflevel3code, &dr.Reflevel3value}, } { if *wl.level == nil || *wl.value == nil { continue } var isNtSDepthRef bool if err := tx.StmtContext( ctx, isNtSDepthRefStmt).QueryRowContext(ctx, string(**wl.level), ).Scan( &isNtSDepthRef, ); err != nil { return nil, err } if !isNtSDepthRef { feedback.Warn( "Reference level code '%s' is not in line "+ "with the NtS reference_code table", string(**wl.level)) } if _, err := tx.StmtContext( ctx, insertWaterLevelStmt).ExecContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, &validity, string(**wl.level), int64(**wl.value), ); err != nil { feedback.Warn(handleError(err).Error()) tx.Rollback() continue } } if err = tx.Commit(); err != nil { return nil, err } } } if len(gauges) == 0 { return nil, UnchangedError("No gauges returned from ERDMS") } var pgCountries, pgGauges pgtype.VarcharArray pgCountries.Set(countries) pgGauges.Set(gauges) obsGauges, err := conn.QueryContext(ctx, eraseObsoleteGaugesSQL, &pgCountries, &pgGauges) if err != nil { return nil, err } defer obsGauges.Close() for obsGauges.Next() { var isrs string if err := obsGauges.Scan(&isrs); err != nil { return nil, err } feedback.Info("Erased %s", isrs) unchanged-- } if err := obsGauges.Err(); err != nil { return nil, err } if unchanged == len(gauges) { return nil, UnchangedError("All gauges unchanged") } feedback.Info("Importing gauges took %s", time.Since(start)) return nil, err }