Mercurial > gemma
view pkg/imports/wg.go @ 2130:f3aabc05f9b2
Fix constraints on waterway profiles
staging_done in the UNIQUE constraint had no effect, because the
exclusion constraint prevented two rows with equal location and
validity anyhow. Adding staging_done to the exclusion constraint
makes the UNIQUE constraint checking only a corner case of what
the exclusion constraint checks. Thus, remove the UNIQUE constraint.
Casting staging_done to int is needed because there is no appropriate
operator class for booleans. Casting to smallint or even bit would have
been better (i.e. should result in smaller index size), but that would
have required creating such a CAST, in addition.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Wed, 06 Feb 2019 15:42:32 +0100 |
parents | d966f03ea819 |
children | b868cb653c4d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "errors" "fmt" "strings" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap" "gemma.intevation.de/gemma/pkg/soap/erdms" ) type WaterwayGauge struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Username is the username used to authenticate. Username string `json:"username"` // Passwort is the password to authenticate. Password string `json:"password"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } const WGJobKind JobKind = "wg" type wgJobCreator struct{} func init() { RegisterJobCreator(WGJobKind, wgJobCreator{}) } func (wgJobCreator) Description() string { return "waterway gauges" } func (wgJobCreator) AutoAccept() bool { return true } func (wgJobCreator) Create(_ JobKind, data string) (Job, error) { wg := new(WaterwayGauge) if err := common.FromJSONString(data, wg); err != nil { return nil, err } return wg, nil } func (wgJobCreator) Depends() []string { return []string{ "gauges", "gauges_reference_water_levels", } } // StageDone does nothing as there is no staging for gauges. func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp does nothing as there is nothing to cleanup with gauges. func (*WaterwayGauge) CleanUp() error { return nil } const ( selectCurrentUserCountrySQL = `SELECT users.current_user_country()` hasGaugeSQL = ` SELECT true FROM waterway.gauges WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` deleteReferenceWaterLevelsSQL = ` DELETE FROM waterway.gauges_reference_water_levels WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` insertGaugeSQL = ` INSERT INTO waterway.gauges ( location, objname, geom, applicability_from_km, applicability_to_km, validity, zero_point, geodref, date_info, source_organization ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, $9, $10, $11, $12, $13, $14, $15 ) ON CONFLICT (location) DO UPDATE SET objname = $6, geom = ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, applicability_from_km = $9, applicability_to_km = $10, validity = $11, zero_point = $12, geodref = $13, date_info = $14, source_organization = $15 ` insertReferenceWaterLevelsSQL = ` INSERT INTO waterway.gauges_reference_water_levels ( gauge_id, reference_water_level, value ) VALUES ( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7 ) ` ) func (wg *WaterwayGauge) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() var country string err = tx.QueryRowContext(ctx, selectCurrentUserCountrySQL).Scan(&country) switch { case err == sql.ErrNoRows: return nil, errors.New("Cannot figure out user country") case err != nil: return nil, err } country = strings.ToUpper(country) feedback.Info("Using country '%s'.", country) var auth *soap.BasicAuth if wg.Username != "" { auth = &soap.BasicAuth{ Login: wg.Username, Password: wg.Password, } } client := erdms.NewRefService(wg.URL, wg.Insecure, auth) request := &erdms.GetRisDataXML{ GetRisDataXMLType: &erdms.GetRisDataXMLType{ Subcode: erdms.NoNS{Text: country + "%"}, Funcode: erdms.NoNS{Text: "wtwgag"}, }, } data, err := client.GetRisDataXML(request) if err != nil { return nil, fmt.Errorf("Error requesting ERDMS service: %v", err) } hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL) if err != nil { return nil, err } defer hasGaugeStmt.Close() var ignored int type idxCode struct { idx int code *models.Isrs } var news, olds []idxCode for i, dr := range data.RisdataReturn { if dr.RisidxCode == nil { ignored++ continue } code, err := models.IsrsFromString(string(*dr.RisidxCode)) if err != nil { feedback.Warn("invalid ISRS code %v", err) ignored++ continue } if dr.Objname.Loc == nil { feedback.Warn("missing objname: %s", code) ignored++ continue } if dr.Lat == nil || dr.Lon == nil { feedback.Warn("missing lat/lon: %s", code) ignored++ continue } if dr.Zeropoint == nil { feedback.Warn("missing zeropoint: %s", code) ignored++ continue } var dummy bool err = hasGaugeStmt.QueryRowContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, ).Scan(&dummy) switch { case err == sql.ErrNoRows: news = append(news, idxCode{idx: i, code: code}) case err != nil: return nil, err case !dummy: return nil, errors.New("Unexpected result") default: olds = append(olds, idxCode{idx: i, code: code}) } } feedback.Info("ignored gauges: %d", ignored) feedback.Info("new gauges: %d", len(news)) feedback.Info("update gauges: %d", len(olds)) if len(news) == 0 && len(olds) == 0 { return nil, UnchangedError("nothing to do") } // delete reference water leves of the old. if len(olds) > 0 { deleteReferenceWaterLevelsStmt, err := tx.PrepareContext( ctx, deleteReferenceWaterLevelsSQL) if err != nil { return nil, err } defer deleteReferenceWaterLevelsStmt.Close() for i := range olds { code := olds[i].code if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx, code.CountryCode, code.LoCode, code.FairwaySection, code.Orc, code.Hectometre, ); err != nil { return nil, err } } // treat them as new news = append(news, olds...) } insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL) if err != nil { return nil, err } defer insertStmt.Close() insertWaterLevelStmt, err := tx.PrepareContext( ctx, insertReferenceWaterLevelsSQL) if err != nil { return nil, err } defer insertWaterLevelStmt.Close() // insert/update the gauges for i := range news { ic := &news[i] dr := data.RisdataReturn[ic.idx] feedback.Info("insert/update %s", ic.code) var from, to sql.NullInt64 if dr.Applicabilityfromkm != nil { from = sql.NullInt64{ Int64: int64(*dr.Applicabilityfromkm), Valid: true, } } if dr.Applicabilitytokm != nil { to = sql.NullInt64{ Int64: int64(*dr.Applicabilitytokm), Valid: true, } } var tfrom, tto, dateInfo pgtype.Timestamptz if dr.Startdate != nil { tfrom = pgtype.Timestamptz{ Time: time.Time(*dr.Startdate), Status: pgtype.Present, } } else { tfrom = pgtype.Timestamptz{ Status: pgtype.Null, } } if dr.Enddate != nil { tto = pgtype.Timestamptz{ Time: time.Time(*dr.Enddate), Status: pgtype.Present, } } else { tto = pgtype.Timestamptz{ Status: pgtype.Null, } } validity := pgtype.Tstzrange{ Lower: tfrom, Upper: tto, LowerType: pgtype.Inclusive, UpperType: pgtype.Inclusive, Status: pgtype.Present, } if dr.Infodate != nil { dateInfo = pgtype.Timestamptz{ Time: time.Time(*dr.Infodate), Status: pgtype.Present, } } else { dateInfo = pgtype.Timestamptz{ Status: pgtype.Null, } } var geodref sql.NullString if dr.Geodref != nil { geodref = sql.NullString{ String: string(*dr.Geodref), Valid: true, } } var source sql.NullString if dr.Source != nil { source = sql.NullString{ String: string(*dr.Source), Valid: true, } } if _, err := insertStmt.ExecContext(ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, string(*dr.Objname.Loc), int64(*dr.Lon), int64(*dr.Lat), from, to, &validity, float64(*dr.Zeropoint), geodref, &dateInfo, source, ); err != nil { return nil, err } for _, wl := range []struct { level **erdms.RisreflevelcodeType value **erdms.RisreflevelvalueType }{ {&dr.Reflevel1code, &dr.Reflevel1value}, {&dr.Reflevel2code, &dr.Reflevel2value}, {&dr.Reflevel3code, &dr.Reflevel3value}, } { if *wl.level == nil || *wl.value == nil { continue } if _, err := insertWaterLevelStmt.ExecContext( ctx, ic.code.CountryCode, ic.code.LoCode, ic.code.FairwaySection, ic.code.Orc, ic.code.Hectometre, string(**wl.level), int64(**wl.value), ); err != nil { return nil, err } } } if err = tx.Commit(); err == nil { feedback.Info("Refreshing gauges successfully took %s.", time.Since(start)) } else { feedback.Error("Refreshing gauges failed after %s.", time.Since(start)) } return nil, err }