Mercurial > gemma
view pkg/imports/wg.go @ 1835:f7b926440449
Waterway gauge import: More stuggling with inserting gauges. Not working, yet.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 16 Jan 2019 18:34:43 +0100 |
parents | b4b9089c2d79 |
children | 4dcdd8891770 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "errors" "fmt" "log" "strings" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap" "gemma.intevation.de/gemma/pkg/soap/erdms" ) type WaterwayGauge struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Username is the username used to authenticate. Username string `json:"username"` // Passwort is the password to authenticate. Password string `json:"password"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } const WGJobKind JobKind = "wg" type wgJobCreator struct{} func init() { RegisterJobCreator(WGJobKind, wgJobCreator{}) } func (wgJobCreator) Description() string { return "waterway gauges" } func (wgJobCreator) AutoAccept() bool { return true } func (wgJobCreator) Create(_ JobKind, data string) (Job, error) { wg := new(WaterwayGauge) if err := common.FromJSONString(data, wg); err != nil { return nil, err } return wg, nil } func (wgJobCreator) Depends() []string { return []string{ "gauges", "gauges_reference_water_levels", } } // StageDone does nothing as there is no staging for gauges. func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp does nothing as there is nothing to cleanup with gauges. func (*WaterwayGauge) CleanUp() error { return nil } const ( selectCurrentUserCountrySQL = `SELECT users.current_user_country()` hasGaugeSQL = ` SELECT true FROM waterway.gauges WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` deleteReferenceWaterLevelsSQL = ` DELETE FROM waterway.gauges_reference_water_levels WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` deleteGaugeSQL = ` DELETE FROM waterway.gauges WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` insertGaugeSQL = ` INSERT INTO waterway.gauges ( location, objname, geom, applicability_from_km, applicability_to_km, validity, zero_point, geodref, date_info, source_organization ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, $9, $10, $11, $12, $13, $14, $15 )` ) func (wg *WaterwayGauge) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() var country string err = tx.QueryRowContext(ctx, selectCurrentUserCountrySQL).Scan(&country) switch { case err == sql.ErrNoRows: return nil, errors.New("Cannot figure out user country") case err != nil: return nil, err } country = strings.ToUpper(country) feedback.Info("Using country '%s'.", country) var auth *soap.BasicAuth if wg.Username != "" { auth = &soap.BasicAuth{ Login: wg.Username, Password: wg.Password, } } client := erdms.NewRefService(wg.URL, wg.Insecure, auth) request := &erdms.GetRisDataXML{ GetRisDataXMLType: &erdms.GetRisDataXMLType{ Subcode: erdms.NoNS{Text: country + "%"}, Funcode: erdms.NoNS{Text: "wtwgag"}, }, } data, err := client.GetRisDataXML(request) if err != nil { log.Printf("error: %v\n", err) return nil, fmt.Errorf("Error requesting ERDMS service: %v", err) } hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL) if err != nil { return nil, err } defer hasGaugeStmt.Close() var ignored int type idxCode struct { idx int code *models.Isrs } var news, olds []idxCode const layout = "2006-01-02T15:04:05.999-07:00" for i, dr := range data.RisdataReturn { if dr.RisidxCode == nil { ignored++ continue } code, err := models.IsrsFromString(string(*dr.RisidxCode)) if err != nil { feedback.Warn("invalid ISRS code %v", err) ignored++ continue } if dr.Objname.Loc == nil { feedback.Warn("missing objname: %s", code) ignored++ continue } if dr.Lat == nil || dr.Lon == nil { feedback.Warn("missing lat/lon: %s", code) ignored++ continue } if dr.Zeropoint == nil { feedback.Warn("missing zeropoint: %s", code) ignored++ continue } var dummy bool err = hasGaugeStmt.QueryRowContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, ).Scan(&dummy) switch { case err == sql.ErrNoRows: olds = append(olds, idxCode{idx: i, code: code}) case err != nil: return nil, err case !dummy: return nil, errors.New("Unexpected result") default: news = append(news, idxCode{idx: i, code: code}) } } feedback.Info("ignored gauges: %d", ignored) feedback.Info("new gauges: %d", len(news)) feedback.Info("update gauges: %d", len(olds)) if len(news) == 0 && len(olds) == 0 { return nil, errors.New("nothing to do") } // delete the old if len(olds) > 0 { deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(ctx, deleteReferenceWaterLevelsSQL) if err != nil { return nil, err } defer deleteReferenceWaterLevelsStmt.Close() deleteGaugeStmt, err := tx.PrepareContext(ctx, deleteGaugeSQL) if err != nil { return nil, err } for i := range olds { ic := &olds[i] if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, ); err != nil { return nil, err } if _, err := deleteGaugeStmt.ExecContext(ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, ); err != nil { return nil, err } } // treat them as new news = append(news, olds...) } if len(news) == 0 { return nil, errors.New("nothing to do") } insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL) if err != nil { return nil, err } defer insertStmt.Close() // (re)-insert the gauges for i := range news { ic := &news[i] dr := data.RisdataReturn[ic.idx] var from, to sql.NullInt64 if dr.Applicabilityfromkm != nil { from = sql.NullInt64{ Int64: int64(*dr.Applicabilityfromkm), Valid: true, } } if dr.Applicabilitytokm != nil { to = sql.NullInt64{ Int64: int64(*dr.Applicabilitytokm), Valid: true, } } var tfrom, tto, dateInfo pgtype.Timestamptz if dr.Startdate != nil { tfrom = pgtype.Timestamptz{Time: time.Time(*dr.Startdate)} } else { tfrom = pgtype.Timestamptz{Status: pgtype.Null} } if dr.Enddate != nil { tto = pgtype.Timestamptz{Time: time.Time(*dr.Enddate)} } else { tto = pgtype.Timestamptz{Status: pgtype.Null} } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, } if dr.Infodate != nil { dateInfo = pgtype.Timestamptz{Time: time.Time(*dr.Infodate)} } else { dateInfo = pgtype.Timestamptz{Status: pgtype.Null} } var geodref sql.NullString if dr.Geodref != nil { geodref = sql.NullString{String: string(*dr.Geodref), Valid: true} } var source sql.NullString if dr.Source != nil { source = sql.NullString{String: string(*dr.Source), Valid: true} } if _, err := insertStmt.ExecContext(ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, string(*dr.Objname.Loc), int64(*dr.Lat), int64(*dr.Lon), from, to, validity, float64(*dr.Zeropoint), geodref, dateInfo, source, ); err != nil { return nil, err } // TODO: Reference water levels. } // TODO: Implement me! return nil, errors.New("Not implemented, yet!") }