diff pkg/imports/wg.go @ 1836:4dcdd8891770

Waterway gauges import: Fixed insert/update of gauges. TODO: Re-insert reference water levels.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 16 Jan 2019 22:39:24 +0100
parents f7b926440449
children 00d63eb9306a
line wrap: on
line diff
--- a/pkg/imports/wg.go	Wed Jan 16 18:34:43 2019 +0100
+++ b/pkg/imports/wg.go	Wed Jan 16 22:39:24 2019 +0100
@@ -87,10 +87,6 @@
 DELETE FROM waterway.gauges_reference_water_levels
 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
 
-	deleteGaugeSQL = `
-DELETE FROM waterway.gauges
-WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
-
 	insertGaugeSQL = `
 INSERT INTO waterway.gauges (
   location,
@@ -114,7 +110,17 @@
   $13,
   $14,
   $15
-)`
+) ON CONFLICT (location) DO UPDATE SET
+  objname = $6,
+  geom = ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
+  applicability_from_km = $9,
+  applicability_to_km = $10,
+  validity = $11,
+  zero_point = $12,
+  geodref = $13,
+  date_info = $14,
+  source_organization = $15
+`
 )
 
 func (wg *WaterwayGauge) Do(
@@ -181,8 +187,6 @@
 
 	var news, olds []idxCode
 
-	const layout = "2006-01-02T15:04:05.999-07:00"
-
 	for i, dr := range data.RisdataReturn {
 		if dr.RisidxCode == nil {
 			ignored++
@@ -223,13 +227,13 @@
 		).Scan(&dummy)
 		switch {
 		case err == sql.ErrNoRows:
-			olds = append(olds, idxCode{idx: i, code: code})
+			news = append(news, idxCode{idx: i, code: code})
 		case err != nil:
 			return nil, err
 		case !dummy:
 			return nil, errors.New("Unexpected result")
 		default:
-			news = append(news, idxCode{idx: i, code: code})
+			olds = append(olds, idxCode{idx: i, code: code})
 		}
 	}
 	feedback.Info("ignored gauges: %d", ignored)
@@ -240,35 +244,22 @@
 		return nil, errors.New("nothing to do")
 	}
 
-	// delete the old
+	// delete reference water leves of the old.
 	if len(olds) > 0 {
-		deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(ctx, deleteReferenceWaterLevelsSQL)
+		deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(
+			ctx, deleteReferenceWaterLevelsSQL)
 		if err != nil {
 			return nil, err
 		}
 		defer deleteReferenceWaterLevelsStmt.Close()
-		deleteGaugeStmt, err := tx.PrepareContext(ctx, deleteGaugeSQL)
-		if err != nil {
-			return nil, err
-		}
 		for i := range olds {
-			ic := &olds[i]
-
+			code := olds[i].code
 			if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx,
-				ic.code.CountryCode,
-				ic.code.LoCode,
-				ic.code.FairwaySection,
-				ic.code.Orc,
-				ic.code.Hectometre,
-			); err != nil {
-				return nil, err
-			}
-			if _, err := deleteGaugeStmt.ExecContext(ctx,
-				ic.code.CountryCode,
-				ic.code.LoCode,
-				ic.code.FairwaySection,
-				ic.code.Orc,
-				ic.code.Hectometre,
+				code.CountryCode,
+				code.LoCode,
+				code.FairwaySection,
+				code.Orc,
+				code.Hectometre,
 			); err != nil {
 				return nil, err
 			}
@@ -277,21 +268,19 @@
 		news = append(news, olds...)
 	}
 
-	if len(news) == 0 {
-		return nil, errors.New("nothing to do")
-	}
-
 	insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL)
 	if err != nil {
 		return nil, err
 	}
 	defer insertStmt.Close()
 
-	// (re)-insert the gauges
+	// insert/update the gauges
 	for i := range news {
 		ic := &news[i]
 		dr := data.RisdataReturn[ic.idx]
 
+		feedback.Info("insert/update %s", ic.code)
+
 		var from, to sql.NullInt64
 
 		if dr.Applicabilityfromkm != nil {
@@ -310,36 +299,60 @@
 		var tfrom, tto, dateInfo pgtype.Timestamptz
 
 		if dr.Startdate != nil {
-			tfrom = pgtype.Timestamptz{Time: time.Time(*dr.Startdate)}
+			tfrom = pgtype.Timestamptz{
+				Time:   time.Time(*dr.Startdate),
+				Status: pgtype.Present,
+			}
 		} else {
-			tfrom = pgtype.Timestamptz{Status: pgtype.Null}
+			tfrom = pgtype.Timestamptz{
+				Status: pgtype.Null,
+			}
 		}
 
 		if dr.Enddate != nil {
-			tto = pgtype.Timestamptz{Time: time.Time(*dr.Enddate)}
+			tto = pgtype.Timestamptz{
+				Time:   time.Time(*dr.Enddate),
+				Status: pgtype.Present,
+			}
 		} else {
-			tto = pgtype.Timestamptz{Status: pgtype.Null}
+			tto = pgtype.Timestamptz{
+				Status: pgtype.Null,
+			}
 		}
 
 		validity := pgtype.Tstzrange{
-			Lower: tfrom,
-			Upper: tto,
+			Lower:     tfrom,
+			Upper:     tto,
+			LowerType: pgtype.Inclusive,
+			UpperType: pgtype.Inclusive,
+			Status:    pgtype.Present,
 		}
 
 		if dr.Infodate != nil {
-			dateInfo = pgtype.Timestamptz{Time: time.Time(*dr.Infodate)}
+			dateInfo = pgtype.Timestamptz{
+				Time:   time.Time(*dr.Infodate),
+				Status: pgtype.Present,
+			}
 		} else {
-			dateInfo = pgtype.Timestamptz{Status: pgtype.Null}
+			dateInfo = pgtype.Timestamptz{
+				Status: pgtype.Null,
+			}
 		}
 
 		var geodref sql.NullString
 		if dr.Geodref != nil {
-			geodref = sql.NullString{String: string(*dr.Geodref), Valid: true}
+			geodref = sql.NullString{
+				String: string(*dr.Geodref),
+				Valid:  true,
+			}
 		}
 
 		var source sql.NullString
 		if dr.Source != nil {
-			source = sql.NullString{String: string(*dr.Source), Valid: true}
+			source = sql.NullString{
+				String: string(*dr.Source),
+				Valid:  true,
+			}
 		}
 
 		if _, err := insertStmt.ExecContext(ctx,
@@ -352,10 +365,10 @@
 			int64(*dr.Lat), int64(*dr.Lon),
 			from,
 			to,
-			validity,
+			&validity,
 			float64(*dr.Zeropoint),
 			geodref,
-			dateInfo,
+			&dateInfo,
 			source,
 		); err != nil {
 			return nil, err