diff pkg/imports/wg.go @ 1829:b4b9089c2d79

Waterway gauges: Started with deleting old gauges to be overwritten.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 16 Jan 2019 16:34:47 +0100
parents 4910bcfab319
children f7b926440449
line wrap: on
line diff
--- a/pkg/imports/wg.go	Wed Jan 16 15:35:23 2019 +0100
+++ b/pkg/imports/wg.go	Wed Jan 16 16:34:47 2019 +0100
@@ -17,10 +17,12 @@
 	"context"
 	"database/sql"
 	"errors"
+	"fmt"
 	"log"
 	"strings"
 
 	"gemma.intevation.de/gemma/pkg/common"
+	"gemma.intevation.de/gemma/pkg/models"
 	"gemma.intevation.de/gemma/pkg/soap"
 	"gemma.intevation.de/gemma/pkg/soap/erdms"
 )
@@ -47,7 +49,7 @@
 
 func (wgJobCreator) Description() string { return "waterway gauges" }
 
-func (wgJobCreator) AutoAccept() bool { return false }
+func (wgJobCreator) AutoAccept() bool { return true }
 
 func (wgJobCreator) Create(_ JobKind, data string) (Job, error) {
 	wg := new(WaterwayGauge)
@@ -60,22 +62,31 @@
 func (wgJobCreator) Depends() []string {
 	return []string{
 		"gauges",
+		"gauges_reference_water_levels",
 	}
 }
 
-func (wgJobCreator) StageDone(
-	ctx context.Context,
-	tx *sql.Tx,
-	id int64,
-) error {
-	// TODO: Implement me!
-	return nil
-}
+// StageDone does nothing as there is no staging for gauges.
+func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil }
 
+// CleanUp does nothing as there is nothing to cleanup with gauges.
 func (*WaterwayGauge) CleanUp() error { return nil }
 
 const (
 	selectCurrentUserCountrySQL = `SELECT users.current_user_country()`
+
+	hasGaugeSQL = `
+SELECT true
+FROM waterway.gauges
+WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
+
+	deleteReferenceWaterLevelsSQL = `
+DELETE FROM waterway.gauges_reference_water_levels
+WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
+
+	deleteGaugeSQL = `
+DELETE FROM waterway.gauges
+WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
 )
 
 func (wg *WaterwayGauge) Do(
@@ -124,20 +135,116 @@
 
 	if err != nil {
 		log.Printf("error: %v\n", err)
+		return nil, fmt.Errorf("Error requesting ERDMS service: %v", err)
+	}
+
+	hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL)
+	if err != nil {
 		return nil, err
 	}
+	defer hasGaugeStmt.Close()
+
+	var ignored int
+
+	type idxCode struct {
+		idx  int
+		code *models.Isrs
+	}
 
-	for _, dr := range data.RisdataReturn {
+	var news, olds []idxCode
+
+	for i, dr := range data.RisdataReturn {
 		if dr.RisidxCode == nil {
-			log.Printf("warn: RisidxCode == nil")
+			ignored++
+			continue
+		}
+		code, err := models.IsrsFromString(string(*dr.RisidxCode))
+		if err != nil {
+			feedback.Warn("invalid ISRS code %v", err)
+			ignored++
+			continue
+		}
+
+		if dr.Objname.Loc == nil {
+			feedback.Warn("missing objname: %s", code)
+			ignored++
+			continue
+		}
+
+		if dr.Lat == nil || dr.Lon == nil {
+			feedback.Warn("missing lat/lon: %s", code)
+			ignored++
+			continue
+		}
+
+		if dr.Zeropoint == nil {
+			feedback.Warn("missing zeropoint: %s", code)
+			ignored++
 			continue
 		}
-		if dr.Objname.Loc == nil {
-			log.Printf("warn: Objname == nil")
-			continue
+
+		var dummy bool
+		err = hasGaugeStmt.QueryRowContext(ctx,
+			code.CountryCode,
+			code.LoCode,
+			code.FairwaySection,
+			code.Orc,
+			code.Hectometre,
+		).Scan(&dummy)
+		switch {
+		case err == sql.ErrNoRows:
+			olds = append(olds, idxCode{idx: i, code: code})
+		case err != nil:
+			return nil, err
+		case !dummy:
+			return nil, errors.New("Unexpected result")
+		default:
+			news = append(news, idxCode{idx: i, code: code})
 		}
-		log.Printf("RisidxCode: %s\n", *dr.RisidxCode)
-		log.Printf("\tObjname: %s\n", *dr.Objname.Loc)
+	}
+	feedback.Info("ignored gauges: %d", ignored)
+	feedback.Info("new gauges: %d", len(news))
+	feedback.Info("update gauges: %d", len(olds))
+
+	if len(news) == 0 && len(olds) == 0 {
+		return nil, errors.New("nothing to do")
+	}
+
+	// delete the old
+	if len(olds) > 0 {
+		deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(ctx, deleteReferenceWaterLevelsSQL)
+		if err != nil {
+			return nil, err
+		}
+		defer deleteReferenceWaterLevelsStmt.Close()
+		deleteGaugeStmt, err := tx.PrepareContext(ctx, deleteGaugeSQL)
+		if err != nil {
+			return nil, err
+		}
+		for i := range olds {
+			ic := &olds[i]
+
+			if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx,
+				ic.code.CountryCode,
+				ic.code.LoCode,
+				ic.code.FairwaySection,
+				ic.code.Orc,
+				ic.code.Hectometre,
+			); err != nil {
+				return nil, err
+			}
+			if _, err := deleteGaugeStmt.ExecContext(ctx,
+				ic.code.CountryCode,
+				ic.code.LoCode,
+				ic.code.FairwaySection,
+				ic.code.Orc,
+				ic.code.Hectometre,
+			); err != nil {
+				return nil, err
+			}
+		}
+		// treat them as new
+		news = append(news, olds...)
 	}
 
 	// TODO: Implement me!