Mercurial > gemma
view pkg/imports/wg.go @ 3219:4acbee65275d
Import queue: Split locked dependencies in exclusively and multiple uses.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 09 May 2019 12:49:53 +0200 |
parents | d9903cb34842 |
children | e640f51b5a4e |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom.gottfried@intevation.de> package imports import ( "context" "database/sql" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap/erdms" ) type WaterwayGauge struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Username is the username used to authenticate. Username string `json:"username"` // Passwort is the password to authenticate. Password string `json:"password"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } const WGJobKind JobKind = "wg" type wgJobCreator struct{} func init() { RegisterJobCreator(WGJobKind, wgJobCreator{}) } func (wgJobCreator) Description() string { return "waterway gauges" } func (wgJobCreator) AutoAccept() bool { return true } func (wgJobCreator) Create() Job { return new(WaterwayGauge) } func (wgJobCreator) Depends() [2][]string { return [2][]string{ {"gauges_reference_water_levels", "gauges"}, {"depth_references"}, } } // StageDone does nothing as there is no staging for gauges. func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp does nothing as there is nothing to cleanup with gauges. func (*WaterwayGauge) CleanUp() error { return nil } const ( deleteReferenceWaterLevelsSQL = ` DELETE FROM waterway.gauges_reference_water_levels WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND depth_reference <> ALL($6) RETURNING depth_reference ` insertGaugeSQL = ` INSERT INTO waterway.gauges ( location, objname, geom, applicability_from_km, applicability_to_km, validity, zero_point, geodref, date_info, source_organization ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, $9, $10, $11, $12, $13, $14, $15 ) ON CONFLICT (location) DO UPDATE SET objname = EXCLUDED.objname, geom = EXCLUDED.geom, applicability_from_km = EXCLUDED.applicability_from_km, applicability_to_km = EXCLUDED.applicability_to_km, validity = EXCLUDED.validity, zero_point = EXCLUDED.zero_point, geodref = EXCLUDED.geodref, date_info = EXCLUDED.date_info, source_organization = EXCLUDED.source_organization ` isNtSDepthRefSQL = ` SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` insertReferenceWaterLevelsSQL = ` INSERT INTO waterway.gauges_reference_water_levels ( gauge_id, depth_reference, value ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7 ) ON CONFLICT (gauge_id, depth_reference) DO UPDATE SET value = EXCLUDED.value ` ) func (wg *WaterwayGauge) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() responseData, err := getRisData( ctx, conn, feedback, wg.Username, wg.Password, wg.URL, wg.Insecure, "wtwgag") if err != nil { return nil, err } var ignored int type idxCode struct { jdx int idx int code *models.Isrs } var gauges []idxCode for j, data := range responseData { for i, dr := range data.RisdataReturn { if dr.RisidxCode == nil { ignored++ continue } code, err := models.IsrsFromString(string(*dr.RisidxCode)) if err != nil { feedback.Warn("invalid ISRS code %v", err) ignored++ continue } if dr.Objname.Loc == nil { feedback.Warn("missing objname: %s", code) ignored++ continue } if dr.Lat == nil || dr.Lon == nil { feedback.Warn("missing lat/lon: %s", code) ignored++ continue } if dr.Zeropoint == nil { feedback.Warn("missing zeropoint: %s", code) ignored++ continue } gauges = append(gauges, idxCode{jdx: j, idx: i, code: code}) } } feedback.Info("ignored gauges: %d", ignored) feedback.Info("insert/update gauges: %d", len(gauges)) if len(gauges) == 0 { return nil, UnchangedError("nothing to do") } // insert/update the gauges var insertStmt, deleteReferenceWaterLevelsStmt, isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ {insertGaugeSQL, &insertStmt}, {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, {isNtSDepthRefSQL, &isNtSDepthRefStmt}, {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return nil, err } defer (*x.stmt).Close() } for i := range gauges { ic := &gauges[i] dr := responseData[ic.jdx].RisdataReturn[ic.idx] feedback.Info("insert/update %s", ic.code) var from, to sql.NullInt64 if dr.Applicabilityfromkm != nil { from = sql.NullInt64{ Int64: int64(*dr.Applicabilityfromkm), Valid: true, } } if dr.Applicabilitytokm != nil { to = sql.NullInt64{ Int64: int64(*dr.Applicabilitytokm), Valid: true, } } var tfrom, tto, dateInfo pgtype.Timestamptz if dr.Startdate != nil { tfrom = pgtype.Timestamptz{ Time: time.Time(*dr.Startdate), Status: pgtype.Present, } } else { tfrom = pgtype.Timestamptz{ Status: pgtype.Null, } } if dr.Enddate != nil { tto = pgtype.Timestamptz{ Time: time.Time(*dr.Enddate), Status: pgtype.Present, } } else { tto = pgtype.Timestamptz{ Status: pgtype.Null, } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: pgtype.Inclusive, Status: pgtype.Present, } if dr.Infodate != nil { dateInfo = pgtype.Timestamptz{ Time: time.Time(*dr.Infodate), Status: pgtype.Present, } } else { dateInfo = pgtype.Timestamptz{ Status: pgtype.Null, } } var geodref sql.NullString if dr.Geodref != nil { geodref = sql.NullString{ String: string(*dr.Geodref), Valid: true, } } var source sql.NullString if dr.Source != nil { source = sql.NullString{ String: string(*dr.Source), Valid: true, } } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, string(*dr.Objname.Loc), float64(*dr.Lon), float64(*dr.Lat), from, to, &validity, float64(*dr.Zeropoint), geodref, &dateInfo, source, ); err != nil { feedback.Warn(handleError(err).Error()) tx.Rollback() continue } // Remove obsolete reference water levels var currLevels pgtype.VarcharArray currLevels.Set([]string{ string(*dr.Reflevel1code), string(*dr.Reflevel2code), string(*dr.Reflevel3code), }) var delRef string err = tx.StmtContext( ctx, deleteReferenceWaterLevelsStmt).QueryRowContext(ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, &currLevels, ).Scan(&delRef) switch { case err == sql.ErrNoRows: // There was nothing to delete case err != nil: return nil, err default: feedback.Info("Removed reference water level %s from %s", delRef, ic.code) } // Insert/update reference water levels for _, wl := range []struct { level **erdms.RisreflevelcodeType value **erdms.RisreflevelvalueType }{ {&dr.Reflevel1code, &dr.Reflevel1value}, {&dr.Reflevel2code, &dr.Reflevel2value}, {&dr.Reflevel3code, &dr.Reflevel3value}, } { if *wl.level == nil || *wl.value == nil { continue } var isNtSDepthRef bool if err := tx.StmtContext(ctx, isNtSDepthRefStmt).QueryRowContext( ctx, string(**wl.level), ).Scan( &isNtSDepthRef, ); err != nil { return nil, err } if !isNtSDepthRef { feedback.Warn( "Reference level code '%s' is not in line "+ "with the NtS reference_code table", string(**wl.level)) } if _, err := tx.StmtContext(ctx, insertWaterLevelStmt).ExecContext( ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, string(**wl.level), int64(**wl.value), ); err != nil { feedback.Warn(handleError(err).Error()) tx.Rollback() continue } } if err = tx.Commit(); err != nil { return nil, err } } feedback.Info("Refreshing gauges took %s.", time.Since(start)) return nil, err }