Mercurial > gemma
view pkg/imports/wg.go @ 2104:c9af355d4a2c
staging: display stretch name
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 04 Feb 2019 14:35:47 +0100 |
parents | d966f03ea819 |
children | b868cb653c4d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap"
	"gemma.intevation.de/gemma/pkg/soap/erdms"
)

// WaterwayGauge is the configuration of a waterway gauge import job.
// It carries the connection settings of the SOAP service that Do queries.
type WaterwayGauge struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Username is the username used to authenticate.
	// If it is empty, Do sends no basic auth credentials.
	Username string `json:"username"`
	// Password is the password to authenticate.
	Password string `json:"password"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// WGJobKind is the job kind under which the waterway gauge
// import is registered.
const WGJobKind JobKind = "wg"

// wgJobCreator is the stateless job creator registered for WGJobKind.
type wgJobCreator struct{}

// init registers the waterway gauge job creator under WGJobKind.
func init() {
	RegisterJobCreator(WGJobKind, wgJobCreator{})
}

// Description returns a short human readable name of this kind of import.
func (wgJobCreator) Description() string { return "waterway gauges" }

// AutoAccept always reports true for waterway gauge imports.
func (wgJobCreator) AutoAccept() bool { return true }

// Create decodes the JSON document in data into a new WaterwayGauge
// and returns it as the job to run. The job kind argument is ignored.
// A decoding error is returned unchanged.
func (wgJobCreator) Create(_ JobKind, data string) (Job, error) {
	wg := new(WaterwayGauge)
	if err := common.FromJSONString(data, wg); err != nil {
		return nil, err
	}
	return wg, nil
}

// Depends returns the names this import depends on.
// NOTE(review): presumably these are the database tables written
// by the import - confirm against the users of Depends.
func (wgJobCreator) Depends() []string {
	return []string{
		"gauges",
		"gauges_reference_water_levels",
	}
}

// StageDone does nothing as there is no staging for gauges.
func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil }

// CleanUp does nothing as there is nothing to cleanup with gauges.
func (*WaterwayGauge) CleanUp() error { return nil } const ( selectCurrentUserCountrySQL = `SELECT users.current_user_country()` hasGaugeSQL = ` SELECT true FROM waterway.gauges WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` deleteReferenceWaterLevelsSQL = ` DELETE FROM waterway.gauges_reference_water_levels WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` insertGaugeSQL = ` INSERT INTO waterway.gauges ( location, objname, geom, applicability_from_km, applicability_to_km, validity, zero_point, geodref, date_info, source_organization ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, $9, $10, $11, $12, $13, $14, $15 ) ON CONFLICT (location) DO UPDATE SET objname = $6, geom = ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, applicability_from_km = $9, applicability_to_km = $10, validity = $11, zero_point = $12, geodref = $13, date_info = $14, source_organization = $15 ` insertReferenceWaterLevelsSQL = ` INSERT INTO waterway.gauges_reference_water_levels ( gauge_id, reference_water_level, value ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7 ) ` ) func (wg *WaterwayGauge) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() var country string err = tx.QueryRowContext(ctx, selectCurrentUserCountrySQL).Scan(&country) switch { case err == sql.ErrNoRows: return nil, errors.New("Cannot figure out user country") case err != nil: return nil, err } country = strings.ToUpper(country) feedback.Info("Using country '%s'.", country) var auth *soap.BasicAuth if wg.Username != "" { auth = &soap.BasicAuth{ Login: wg.Username, Password: wg.Password, } } client := erdms.NewRefService(wg.URL, wg.Insecure, auth) request := &erdms.GetRisDataXML{ 
GetRisDataXMLType: &erdms.GetRisDataXMLType{ Subcode: erdms.NoNS{Text: country + "%"}, Funcode: erdms.NoNS{Text: "wtwgag"}, }, } data, err := client.GetRisDataXML(request) if err != nil { return nil, fmt.Errorf("Error requesting ERDMS service: %v", err) } hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL) if err != nil { return nil, err } defer hasGaugeStmt.Close() var ignored int type idxCode struct { idx int code *models.Isrs } var news, olds []idxCode for i, dr := range data.RisdataReturn { if dr.RisidxCode == nil { ignored++ continue } code, err := models.IsrsFromString(string(*dr.RisidxCode)) if err != nil { feedback.Warn("invalid ISRS code %v", err) ignored++ continue } if dr.Objname.Loc == nil { feedback.Warn("missing objname: %s", code) ignored++ continue } if dr.Lat == nil || dr.Lon == nil { feedback.Warn("missing lat/lon: %s", code) ignored++ continue } if dr.Zeropoint == nil { feedback.Warn("missing zeropoint: %s", code) ignored++ continue } var dummy bool err = hasGaugeStmt.QueryRowContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, ).Scan(&dummy) switch { case err == sql.ErrNoRows: news = append(news, idxCode{idx: i, code: code}) case err != nil: return nil, err case !dummy: return nil, errors.New("Unexpected result") default: olds = append(olds, idxCode{idx: i, code: code}) } } feedback.Info("ignored gauges: %d", ignored) feedback.Info("new gauges: %d", len(news)) feedback.Info("update gauges: %d", len(olds)) if len(news) == 0 && len(olds) == 0 { return nil, UnchangedError("nothing to do") } // delete reference water leves of the old. 
if len(olds) > 0 { deleteReferenceWaterLevelsStmt, err := tx.PrepareContext( ctx, deleteReferenceWaterLevelsSQL) if err != nil { return nil, err } defer deleteReferenceWaterLevelsStmt.Close() for i := range olds { code := olds[i].code if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, ); err != nil { return nil, err } } // treat them as new news = append(news, olds...) } insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL) if err != nil { return nil, err } defer insertStmt.Close() insertWaterLevelStmt, err := tx.PrepareContext( ctx, insertReferenceWaterLevelsSQL) if err != nil { return nil, err } defer insertWaterLevelStmt.Close() // insert/update the gauges for i := range news { ic := &news[i] dr := data.RisdataReturn[ic.idx] feedback.Info("insert/update %s", ic.code) var from, to sql.NullInt64 if dr.Applicabilityfromkm != nil { from = sql.NullInt64{ Int64: int64(*dr.Applicabilityfromkm), Valid: true, } } if dr.Applicabilitytokm != nil { to = sql.NullInt64{ Int64: int64(*dr.Applicabilitytokm), Valid: true, } } var tfrom, tto, dateInfo pgtype.Timestamptz if dr.Startdate != nil { tfrom = pgtype.Timestamptz{ Time: time.Time(*dr.Startdate), Status: pgtype.Present, } } else { tfrom = pgtype.Timestamptz{ Status: pgtype.Null, } } if dr.Enddate != nil { tto = pgtype.Timestamptz{ Time: time.Time(*dr.Enddate), Status: pgtype.Present, } } else { tto = pgtype.Timestamptz{ Status: pgtype.Null, } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: pgtype.Inclusive, Status: pgtype.Present, } if dr.Infodate != nil { dateInfo = pgtype.Timestamptz{ Time: time.Time(*dr.Infodate), Status: pgtype.Present, } } else { dateInfo = pgtype.Timestamptz{ Status: pgtype.Null, } } var geodref sql.NullString if dr.Geodref != nil { geodref = sql.NullString{ String: string(*dr.Geodref), Valid: true, } } var source sql.NullString if dr.Source != nil { source = 
sql.NullString{ String: string(*dr.Source), Valid: true, } } if _, err := insertStmt.ExecContext(ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, string(*dr.Objname.Loc), int64(*dr.Lon), int64(*dr.Lat), from, to, &validity, float64(*dr.Zeropoint), geodref, &dateInfo, source, ); err != nil { return nil, err } for _, wl := range []struct { level **erdms.RisreflevelcodeType value **erdms.RisreflevelvalueType }{ {&dr.Reflevel1code, &dr.Reflevel1value}, {&dr.Reflevel2code, &dr.Reflevel2value}, {&dr.Reflevel3code, &dr.Reflevel3value}, } { if *wl.level == nil || *wl.value == nil { continue } if _, err := insertWaterLevelStmt.ExecContext( ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, string(**wl.level), int64(**wl.value), ); err != nil { return nil, err } } } if err = tx.Commit(); err == nil { feedback.Info("Refreshing gauges successfully took %s.", time.Since(start)) } else { feedback.Error("Refreshing gauges failed after %s.", time.Since(start)) } return nil, err }