diff pkg/imports/wg.go @ 3163:d9903cb34842

Handle failing INSERTs gracefully during gauges import Using the special table EXCLUDED in INSERT statements makes functionally no difference, but makes editing of the statements easier. Since reference water levels are not deleted all at once before (re-)importing anymore, take the chance to report those that were deleted.
author Tom Gottfried <tom@intevation.de>
date Mon, 06 May 2019 13:25:49 +0200
parents eb1d119f253f
children 4acbee65275d
line wrap: on
line diff
--- a/pkg/imports/wg.go	Mon May 06 13:21:53 2019 +0200
+++ b/pkg/imports/wg.go	Mon May 06 13:25:49 2019 +0200
@@ -17,7 +17,6 @@
 import (
 	"context"
 	"database/sql"
-	"errors"
 	"time"
 
 	"github.com/jackc/pgx/pgtype"
@@ -66,14 +65,12 @@
 func (*WaterwayGauge) CleanUp() error { return nil }
 
 const (
-	hasGaugeSQL = `
-SELECT true
-FROM waterway.gauges
-WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
-
 	deleteReferenceWaterLevelsSQL = `
 DELETE FROM waterway.gauges_reference_water_levels
-WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
+WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
+  AND depth_reference <> ALL($6)
+RETURNING depth_reference
+`
 
 	insertGaugeSQL = `
 INSERT INTO waterway.gauges (
@@ -99,16 +96,17 @@
   $14,
   $15
 ) ON CONFLICT (location) DO UPDATE SET
-  objname = $6,
-  geom = ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
-  applicability_from_km = $9,
-  applicability_to_km = $10,
-  validity = $11,
-  zero_point = $12,
-  geodref = $13,
-  date_info = $14,
-  source_organization = $15
+    objname = EXCLUDED.objname,
+    geom = EXCLUDED.geom,
+    applicability_from_km = EXCLUDED.applicability_from_km,
+    applicability_to_km = EXCLUDED.applicability_to_km,
+    validity = EXCLUDED.validity,
+    zero_point = EXCLUDED.zero_point,
+    geodref = EXCLUDED.geodref,
+    date_info = EXCLUDED.date_info,
+    source_organization = EXCLUDED.source_organization
 `
+
 	isNtSDepthRefSQL = `
 SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)`
 
@@ -121,7 +119,8 @@
   ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
   $6,
   $7
-)
+) ON CONFLICT (gauge_id, depth_reference) DO UPDATE SET
+    value = EXCLUDED.value
 `
 )
 
@@ -134,15 +133,9 @@
 
 	start := time.Now()
 
-	tx, err := conn.BeginTx(ctx, nil)
-	if err != nil {
-		return nil, err
-	}
-	defer tx.Rollback()
-
 	responseData, err := getRisData(
-		tx,
 		ctx,
+		conn,
 		feedback,
 		wg.Username,
 		wg.Password,
@@ -153,12 +146,6 @@
 		return nil, err
 	}
 
-	hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL)
-	if err != nil {
-		return nil, err
-	}
-	defer hasGaugeStmt.Close()
-
 	var ignored int
 
 	type idxCode struct {
@@ -167,7 +154,7 @@
 		code *models.Isrs
 	}
 
-	var news, olds []idxCode
+	var gauges []idxCode
 
 	for j, data := range responseData {
 		for i, dr := range data.RisdataReturn {
@@ -200,80 +187,37 @@
 				continue
 			}
 
-			var dummy bool
-			err = hasGaugeStmt.QueryRowContext(ctx,
-				code.CountryCode,
-				code.LoCode,
-				code.FairwaySection,
-				code.Orc,
-				code.Hectometre,
-			).Scan(&dummy)
-			switch {
-			case err == sql.ErrNoRows:
-				news = append(news, idxCode{jdx: j, idx: i, code: code})
-			case err != nil:
-				return nil, err
-			case !dummy:
-				return nil, errors.New("Unexpected result")
-			default:
-				olds = append(olds, idxCode{jdx: j, idx: i, code: code})
-			}
+			gauges = append(gauges, idxCode{jdx: j, idx: i, code: code})
 		}
 	}
 	feedback.Info("ignored gauges: %d", ignored)
-	feedback.Info("new gauges: %d", len(news))
-	feedback.Info("update gauges: %d", len(olds))
+	feedback.Info("insert/update gauges: %d", len(gauges))
 
-	if len(news) == 0 && len(olds) == 0 {
+	if len(gauges) == 0 {
 		return nil, UnchangedError("nothing to do")
 	}
 
-	// delete reference water leves of the old.
-	if len(olds) > 0 {
-		deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(
-			ctx, deleteReferenceWaterLevelsSQL)
-		if err != nil {
+	// insert/update the gauges
+	var insertStmt, deleteReferenceWaterLevelsStmt,
+		isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
+	for _, x := range []struct {
+		sql  string
+		stmt **sql.Stmt
+	}{
+		{insertGaugeSQL, &insertStmt},
+		{deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
+		{isNtSDepthRefSQL, &isNtSDepthRefStmt},
+		{insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
+	} {
+		var err error
+		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
 			return nil, err
 		}
-		defer deleteReferenceWaterLevelsStmt.Close()
-		for i := range olds {
-			code := olds[i].code
-			if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx,
-				code.CountryCode,
-				code.LoCode,
-				code.FairwaySection,
-				code.Orc,
-				code.Hectometre,
-			); err != nil {
-				return nil, err
-			}
-		}
-		// treat them as new
-		news = append(news, olds...)
+		defer (*x.stmt).Close()
 	}
 
-	insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL)
-	if err != nil {
-		return nil, err
-	}
-	defer insertStmt.Close()
-
-	insertWaterLevelStmt, err := tx.PrepareContext(
-		ctx, insertReferenceWaterLevelsSQL)
-	if err != nil {
-		return nil, err
-	}
-	defer insertWaterLevelStmt.Close()
-
-	isNtSDepthRefStmt, err := tx.PrepareContext(ctx, isNtSDepthRefSQL)
-	if err != nil {
-		return nil, err
-	}
-	defer isNtSDepthRefStmt.Close()
-
-	// insert/update the gauges
-	for i := range news {
-		ic := &news[i]
+	for i := range gauges {
+		ic := &gauges[i]
 		dr := responseData[ic.jdx].RisdataReturn[ic.idx]
 
 		feedback.Info("insert/update %s", ic.code)
@@ -352,7 +296,13 @@
 			}
 		}
 
-		if _, err := insertStmt.ExecContext(ctx,
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return nil, err
+		}
+		defer tx.Rollback()
+
+		if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
 			ic.code.CountryCode,
 			ic.code.LoCode,
 			ic.code.FairwaySection,
@@ -368,9 +318,39 @@
 			&dateInfo,
 			source,
 		); err != nil {
-			return nil, err
+			feedback.Warn(handleError(err).Error())
+			tx.Rollback()
+			continue
 		}
 
+		// Remove obsolete reference water levels
+		var currLevels pgtype.VarcharArray
+		currLevels.Set([]string{
+			string(*dr.Reflevel1code),
+			string(*dr.Reflevel2code),
+			string(*dr.Reflevel3code),
+		})
+		var delRef string
+		err = tx.StmtContext(
+			ctx, deleteReferenceWaterLevelsStmt).QueryRowContext(ctx,
+			ic.code.CountryCode,
+			ic.code.LoCode,
+			ic.code.FairwaySection,
+			ic.code.Orc,
+			ic.code.Hectometre,
+			&currLevels,
+		).Scan(&delRef)
+		switch {
+		case err == sql.ErrNoRows:
+			// There was nothing to delete
+		case err != nil:
+			return nil, err
+		default:
+			feedback.Info("Removed reference water level %s from %s",
+				delRef, ic.code)
+		}
+
+		// Insert/update reference water levels
 		for _, wl := range []struct {
 			level **erdms.RisreflevelcodeType
 			value **erdms.RisreflevelvalueType
@@ -384,7 +364,7 @@
 			}
 
 			var isNtSDepthRef bool
-			if err := isNtSDepthRefStmt.QueryRowContext(
+			if err := tx.StmtContext(ctx, isNtSDepthRefStmt).QueryRowContext(
 				ctx,
 				string(**wl.level),
 			).Scan(
@@ -394,11 +374,12 @@
 			}
 			if !isNtSDepthRef {
 				feedback.Warn(
-					"Reference level code '%s' is not in line with the NtS reference_code table",
+					"Reference level code '%s' is not in line "+
+						"with the NtS reference_code table",
 					string(**wl.level))
 			}
 
-			if _, err := insertWaterLevelStmt.ExecContext(
+			if _, err := tx.StmtContext(ctx, insertWaterLevelStmt).ExecContext(
 				ctx,
 				ic.code.CountryCode,
 				ic.code.LoCode,
@@ -408,18 +389,19 @@
 				string(**wl.level),
 				int64(**wl.value),
 			); err != nil {
-				return nil, err
+				feedback.Warn(handleError(err).Error())
+				tx.Rollback()
+				continue
 			}
 		}
+
+		if err = tx.Commit(); err != nil {
+			return nil, err
+		}
 	}
 
-	if err = tx.Commit(); err == nil {
-		feedback.Info("Refreshing gauges successfully took %s.",
-			time.Since(start))
-	} else {
-		feedback.Error("Refreshing gauges failed after %s.",
-			time.Since(start))
-	}
+	feedback.Info("Refreshing gauges took %s.",
+		time.Since(start))
 
 	return nil, err
 }