# HG changeset patch # User Sascha L. Teichmann # Date 1547652887 -3600 # Node ID b4b9089c2d79383a7ee0d7dbe7129426bc1b0d94 # Parent 1ecfcf46e4da736b611429516b92c28be1e29ca3 Waterway gauges: Started with deleting old gauges to be overwritten. diff -r 1ecfcf46e4da -r b4b9089c2d79 pkg/imports/wg.go --- a/pkg/imports/wg.go Wed Jan 16 15:35:23 2019 +0100 +++ b/pkg/imports/wg.go Wed Jan 16 16:34:47 2019 +0100 @@ -17,10 +17,12 @@ "context" "database/sql" "errors" + "fmt" "log" "strings" "gemma.intevation.de/gemma/pkg/common" + "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap" "gemma.intevation.de/gemma/pkg/soap/erdms" ) @@ -47,7 +49,7 @@ func (wgJobCreator) Description() string { return "waterway gauges" } -func (wgJobCreator) AutoAccept() bool { return false } +func (wgJobCreator) AutoAccept() bool { return true } func (wgJobCreator) Create(_ JobKind, data string) (Job, error) { wg := new(WaterwayGauge) @@ -60,22 +62,31 @@ func (wgJobCreator) Depends() []string { return []string{ "gauges", + "gauges_reference_water_levels", } } -func (wgJobCreator) StageDone( - ctx context.Context, - tx *sql.Tx, - id int64, -) error { - // TODO: Implement me! - return nil -} +// StageDone does nothing as there is no staging for gauges. +func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } +// CleanUp does nothing as there is nothing to cleanup with gauges. func (*WaterwayGauge) CleanUp() error { return nil } const ( selectCurrentUserCountrySQL = `SELECT users.current_user_country()` + + hasGaugeSQL = ` +SELECT true +FROM waterway.gauges +WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` + + deleteReferenceWaterLevelsSQL = ` +DELETE FROM waterway.gauges_reference_water_levels +WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` + + deleteGaugeSQL = ` +DELETE FROM waterway.gauges +WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` ) func (wg *WaterwayGauge) Do( @@ -124,20 +135,116 @@ if err != nil { log.Printf("error: %v\n", err) + return nil, fmt.Errorf("Error requesting ERDMS service: %v", err) + } + + hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL) + if err != nil { return nil, err } + defer hasGaugeStmt.Close() + + var ignored int + + type idxCode struct { + idx int + code *models.Isrs + } - for _, dr := range data.RisdataReturn { + var news, olds []idxCode + + for i, dr := range data.RisdataReturn { if dr.RisidxCode == nil { - log.Printf("warn: RisidxCode == nil") + ignored++ + continue + } + code, err := models.IsrsFromString(string(*dr.RisidxCode)) + if err != nil { + feedback.Warn("invalid ISRS code %v", err) + ignored++ + continue + } + + if dr.Objname.Loc == nil { + feedback.Warn("missing objname: %s", code) + ignored++ + continue + } + + if dr.Lat == nil || dr.Lon == nil { + feedback.Warn("missing lat/lon: %s", code) + ignored++ + continue + } + + if dr.Zeropoint == nil { + feedback.Warn("missing zeropoint: %s", code) + ignored++ continue } - if dr.Objname.Loc == nil { - log.Printf("warn: Objname == nil") - continue + + var dummy bool + err = hasGaugeStmt.QueryRowContext(ctx, + code.CountryCode, + code.LoCode, + code.FairwaySection, + code.Orc, + code.Hectometre, + ).Scan(&dummy) + switch { + case err == sql.ErrNoRows: + olds = append(olds, idxCode{idx: i, code: code}) + case err != nil: + return nil, err + case !dummy: + return nil, errors.New("Unexpected result") + default: + news = append(news, idxCode{idx: i, code: code}) } - log.Printf("RisidxCode: %s\n", *dr.RisidxCode) - log.Printf("\tObjname: %s\n", *dr.Objname.Loc) + } + feedback.Info("ignored gauges: %d", ignored) + feedback.Info("new gauges: %d", len(news)) + feedback.Info("update gauges: %d", len(olds)) + + if len(news) == 0 && len(olds) == 0 { + return nil, errors.New("nothing to do") + } + + // delete the old + if len(olds) > 0 { + deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(ctx, deleteReferenceWaterLevelsSQL) + if err != nil { + return nil, err + } + defer deleteReferenceWaterLevelsStmt.Close() + deleteGaugeStmt, err := tx.PrepareContext(ctx, deleteGaugeSQL) + if err != nil { + return nil, err + } + for i := range olds { + ic := &olds[i] + + if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx, + ic.code.CountryCode, + ic.code.LoCode, + ic.code.FairwaySection, + ic.code.Orc, + ic.code.Hectometre, + ); err != nil { + return nil, err + } + if _, err := deleteGaugeStmt.ExecContext(ctx, + ic.code.CountryCode, + ic.code.LoCode, + ic.code.FairwaySection, + ic.code.Orc, + ic.code.Hectometre, + ); err != nil { + return nil, err + } + } + // treat them as new + news = append(news, olds...) } // TODO: Implement me!