Mercurial > gemma
view pkg/imports/wg.go @ 3163:d9903cb34842
Handle failing INSERTs gracefully during gauges import
Using the special table EXCLUDED in INSERT statements makes
functionally no difference, but makes editing of the statements easier.
Since reference water levels are not deleted all at once before
(re-)importing anymore, take the chance to report those that were
deleted.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Mon, 06 May 2019 13:25:49 +0200 |
parents | eb1d119f253f |
children | 4acbee65275d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom.gottfried@intevation.de> package imports import ( "context" "database/sql" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap/erdms" ) type WaterwayGauge struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Username is the username used to authenticate. Username string `json:"username"` // Passwort is the password to authenticate. Password string `json:"password"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } const WGJobKind JobKind = "wg" type wgJobCreator struct{} func init() { RegisterJobCreator(WGJobKind, wgJobCreator{}) } func (wgJobCreator) Description() string { return "waterway gauges" } func (wgJobCreator) AutoAccept() bool { return true } func (wgJobCreator) Create() Job { return new(WaterwayGauge) } func (wgJobCreator) Depends() []string { return []string{ "gauges", "gauges_reference_water_levels", } } // StageDone does nothing as there is no staging for gauges. func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp does nothing as there is nothing to cleanup with gauges. func (*WaterwayGauge) CleanUp() error { return nil } const ( deleteReferenceWaterLevelsSQL = ` DELETE FROM waterway.gauges_reference_water_levels WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND depth_reference <> ALL($6) RETURNING depth_reference ` insertGaugeSQL = ` INSERT INTO waterway.gauges ( location, objname, geom, applicability_from_km, applicability_to_km, validity, zero_point, geodref, date_info, source_organization ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, $9, $10, $11, $12, $13, $14, $15 ) ON CONFLICT (location) DO UPDATE SET objname = EXCLUDED.objname, geom = EXCLUDED.geom, applicability_from_km = EXCLUDED.applicability_from_km, applicability_to_km = EXCLUDED.applicability_to_km, validity = EXCLUDED.validity, zero_point = EXCLUDED.zero_point, geodref = EXCLUDED.geodref, date_info = EXCLUDED.date_info, source_organization = EXCLUDED.source_organization ` isNtSDepthRefSQL = ` SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` insertReferenceWaterLevelsSQL = ` INSERT INTO waterway.gauges_reference_water_levels ( gauge_id, depth_reference, value ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7 ) ON CONFLICT (gauge_id, depth_reference) DO UPDATE SET value = EXCLUDED.value ` ) func (wg *WaterwayGauge) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() responseData, err := getRisData( ctx, conn, feedback, wg.Username, wg.Password, wg.URL, wg.Insecure, "wtwgag") if err != nil { return nil, err } var ignored int type idxCode struct { jdx int idx int code *models.Isrs } var gauges []idxCode for j, data := range responseData { for i, dr := range data.RisdataReturn { if dr.RisidxCode == nil { ignored++ continue } code, err := models.IsrsFromString(string(*dr.RisidxCode)) if err != nil { feedback.Warn("invalid ISRS code %v", err) ignored++ continue } if dr.Objname.Loc == nil { feedback.Warn("missing objname: %s", code) ignored++ continue } if dr.Lat == nil || dr.Lon == nil { feedback.Warn("missing lat/lon: %s", code) ignored++ continue } if dr.Zeropoint == nil { feedback.Warn("missing zeropoint: %s", code) ignored++ continue } gauges = append(gauges, idxCode{jdx: j, idx: i, code: code}) } } feedback.Info("ignored gauges: %d", ignored) feedback.Info("insert/update gauges: %d", len(gauges)) if len(gauges) == 0 { return nil, UnchangedError("nothing to do") } // insert/update the gauges var insertStmt, deleteReferenceWaterLevelsStmt, isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt for _, x := range []struct { sql string stmt **sql.Stmt }{ {insertGaugeSQL, &insertStmt}, {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, {isNtSDepthRefSQL, &isNtSDepthRefStmt}, {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, } { var err error if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { return nil, err } defer (*x.stmt).Close() } for i := range gauges { ic := &gauges[i] dr := responseData[ic.jdx].RisdataReturn[ic.idx] feedback.Info("insert/update %s", ic.code) var from, to sql.NullInt64 if dr.Applicabilityfromkm != nil { from = sql.NullInt64{ Int64: int64(*dr.Applicabilityfromkm), Valid: true, } } if dr.Applicabilitytokm != nil { to = sql.NullInt64{ Int64: int64(*dr.Applicabilitytokm), Valid: true, } } var tfrom, tto, dateInfo pgtype.Timestamptz if dr.Startdate != nil { tfrom = pgtype.Timestamptz{ Time: time.Time(*dr.Startdate), Status: pgtype.Present, } } else { tfrom = pgtype.Timestamptz{ Status: pgtype.Null, } } if dr.Enddate != nil { tto = pgtype.Timestamptz{ Time: time.Time(*dr.Enddate), Status: pgtype.Present, } } else { tto = pgtype.Timestamptz{ Status: pgtype.Null, } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: pgtype.Inclusive, Status: pgtype.Present, } if dr.Infodate != nil { dateInfo = pgtype.Timestamptz{ Time: time.Time(*dr.Infodate), Status: pgtype.Present, } } else { dateInfo = pgtype.Timestamptz{ Status: pgtype.Null, } } var geodref sql.NullString if dr.Geodref != nil { geodref = sql.NullString{ String: string(*dr.Geodref), Valid: true, } } var source sql.NullString if dr.Source != nil { source = sql.NullString{ String: string(*dr.Source), Valid: true, } } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, string(*dr.Objname.Loc), float64(*dr.Lon), float64(*dr.Lat), from, to, &validity, float64(*dr.Zeropoint), geodref, &dateInfo, source, ); err != nil { feedback.Warn(handleError(err).Error()) tx.Rollback() continue } // Remove obsolete reference water levels var currLevels pgtype.VarcharArray currLevels.Set([]string{ string(*dr.Reflevel1code), string(*dr.Reflevel2code), string(*dr.Reflevel3code), }) var delRef string err = tx.StmtContext( ctx, deleteReferenceWaterLevelsStmt).QueryRowContext(ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, &currLevels, ).Scan(&delRef) switch { case err == sql.ErrNoRows: // There was nothing to delete case err != nil: return nil, err default: feedback.Info("Removed reference water level %s from %s", delRef, ic.code) } // Insert/update reference water levels for _, wl := range []struct { level **erdms.RisreflevelcodeType value **erdms.RisreflevelvalueType }{ {&dr.Reflevel1code, &dr.Reflevel1value}, {&dr.Reflevel2code, &dr.Reflevel2value}, {&dr.Reflevel3code, &dr.Reflevel3value}, } { if *wl.level == nil || *wl.value == nil { continue } var isNtSDepthRef bool if err := tx.StmtContext(ctx, isNtSDepthRefStmt).QueryRowContext( ctx, string(**wl.level), ).Scan( &isNtSDepthRef, ); err != nil { return nil, err } if !isNtSDepthRef { feedback.Warn( "Reference level code '%s' is not in line "+ "with the NtS reference_code table", string(**wl.level)) } if _, err := tx.StmtContext(ctx, insertWaterLevelStmt).ExecContext( ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, string(**wl.level), int64(**wl.value), ); err != nil { feedback.Warn(handleError(err).Error()) tx.Rollback() continue } } if err = tx.Commit(); err != nil { return nil, err } } feedback.Info("Refreshing gauges took %s.", time.Since(start)) return nil, err }