changeset 5620:165e77c5736a erdms2

Simplified WG import logging.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 28 Nov 2022 18:17:11 +0100
parents f0413b20ad4d
children cf1e8ffe1ed5
files pkg/imports/wg.go
diffstat 1 files changed, 210 insertions(+), 178 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/wg.go	Mon Nov 28 17:40:44 2022 +0100
+++ b/pkg/imports/wg.go	Mon Nov 28 18:17:11 2022 +0100
@@ -17,6 +17,8 @@
 import (
 	"context"
 	"database/sql"
+	"errors"
+	"strings"
 	"time"
 
 	"github.com/jackc/pgx/pgtype"
@@ -177,6 +179,8 @@
 `
 )
 
+var continueErr = errors.New("continue")
+
 // Do implements the actual import.
 func (wg *WaterwayGauge) Do(
 	ctx context.Context,
@@ -226,17 +230,20 @@
 	var gauges []string
 	var unchanged int
 
+	var invalidISRS, startEndOrder, missingObjname, missingZeropoint []string
+
+	databaseErrors := map[string][]string{}
+
 	for _, data := range responseData {
 		for _, dr := range data.RisdataReturn {
 
 			isrs := string(*dr.RisidxCode)
 			code, err := models.IsrsFromString(isrs)
 			if err != nil {
-				feedback.Warn("Invalid ISRS code '%s': %v", isrs, err)
+				invalidISRS = append(invalidISRS, isrs)
 				continue
 			}
 			gauges = append(gauges, isrs)
-			feedback.Info("Processing %s", code)
 
 			// We need a valid, non-empty time range to identify gauge versions
 			if dr.Enddate != nil && dr.Startdate != nil {
@@ -244,12 +251,24 @@
 				sd := dr.Startdate.ToGoTime()
 				// log.Debugf("start date: %v end date: %v\n", sd, ed)
 				if !ed.After(sd) {
-					feedback.Error("End date not after start date")
+					startEndOrder = append(startEndOrder, isrs)
 					unchanged++
 					continue
 				}
 			}
 
+			if dr.Zeropoint == nil {
+				missingZeropoint = append(missingZeropoint, isrs)
+				unchanged++
+				continue
+			}
+
+			if dr.Objname.Loc == nil {
+				missingObjname = append(missingObjname, isrs)
+				unchanged++
+				continue
+			}
+
 			var from, to sql.NullInt64
 
 			if dr.Applicabilityfromkm != nil {
@@ -324,199 +343,212 @@
 				}
 			}
 
-			tx, err := conn.BeginTx(ctx, nil)
-			if err != nil {
-				return nil, err
-			}
-			defer tx.Rollback()
-
-			// Mark old entry of gauge as erased, if applicable
-			var isNew bool
-			err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx,
-				code.String(),
-				validity,
-			).Scan(&isNew)
-			switch {
-			case err != nil:
-				feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				if err2 := tx.Rollback(); err2 != nil {
-					return nil, err2
+			err = func() error {
+				tx, err := conn.BeginTx(ctx, nil)
+				if err != nil {
+					return err
 				}
-				unchanged++
-				continue
-			case isNew:
-				var lu *time.Time
-				if dr.Lastupdate != nil {
-					t := dr.Lastupdate.ToGoTime()
-					lu = &t
-				}
-				// insert gauge version entry
-				if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
-					code.CountryCode,
-					code.LoCode,
-					code.FairwaySection,
-					code.Orc,
-					code.Hectometre,
-					dr.Objname.Loc,
-					dr.Lon, dr.Lat,
-					from,
-					to,
-					&validity,
-					dr.Zeropoint,
-					geodref,
-					&dateInfo,
-					source,
-					lu,
-				); err != nil {
+				defer tx.Rollback()
+
+				// Mark old entry of gauge as erased, if applicable
+				var isNew bool
+				err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx,
+					code.String(),
+					validity,
+				).Scan(&isNew)
+				switch {
+				case err != nil:
 					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-					if err2 := tx.Rollback(); err2 != nil {
-						return nil, err2
+					unchanged++
+					return continueErr
+				case isNew:
+					var lu *time.Time
+					if dr.Lastupdate != nil {
+						t := dr.Lastupdate.ToGoTime()
+						lu = &t
+					}
+					// insert gauge version entry
+					if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
+						code.CountryCode,
+						code.LoCode,
+						code.FairwaySection,
+						code.Orc,
+						code.Hectometre,
+						dr.Objname.Loc,
+						dr.Lon, dr.Lat,
+						from,
+						to,
+						&validity,
+						dr.Zeropoint,
+						geodref,
+						&dateInfo,
+						source,
+						lu,
+					); err != nil {
+						key := pgxutils.ReadableError{Err: err}.Error()
+						databaseErrors[key] = append(databaseErrors[key], isrs)
+						return continueErr
+					}
+					//feedback.Info("insert new version")
+				case !isNew:
+					var lu *time.Time
+					if dr.Lastupdate != nil {
+						t := dr.Lastupdate.ToGoTime()
+						lu = &t
 					}
-					unchanged++
-					continue
-				}
-				feedback.Info("insert new version")
-			case !isNew:
-				var lu *time.Time
-				if dr.Lastupdate != nil {
-					t := dr.Lastupdate.ToGoTime()
-					lu = &t
-				}
-				// try to update
-				var dummy int
-				err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
-					code.CountryCode,
-					code.LoCode,
-					code.FairwaySection,
-					code.Orc,
-					code.Hectometre,
-					dr.Objname.Loc,
-					dr.Lon, dr.Lat,
-					from,
-					to,
-					dr.Zeropoint,
-					geodref,
-					&dateInfo,
-					source,
-					lu,
-					&validity,
-				).Scan(&dummy)
-				switch {
-				case err2 == sql.ErrNoRows:
-					feedback.Info("unchanged")
-					if err3 := tx.Rollback(); err3 != nil {
-						return nil, err3
+					// try to update
+					var dummy int
+					err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
+						code.CountryCode,
+						code.LoCode,
+						code.FairwaySection,
+						code.Orc,
+						code.Hectometre,
+						dr.Objname.Loc,
+						dr.Lon, dr.Lat,
+						from,
+						to,
+						dr.Zeropoint,
+						geodref,
+						&dateInfo,
+						source,
+						lu,
+						&validity,
+					).Scan(&dummy)
+					switch {
+					case err2 == sql.ErrNoRows:
+						//feedback.Info("unchanged")
+						unchanged++
+						return continueErr
+					case err2 != nil:
+						key := pgxutils.ReadableError{Err: err}.Error()
+						databaseErrors[key] = append(databaseErrors[key], isrs)
+						unchanged++
+						return continueErr
+					default:
+						//feedback.Info("update")
 					}
-					unchanged++
-					continue
-				case err2 != nil:
-					feedback.Error(pgxutils.ReadableError{Err: err2}.Error())
-					if err3 := tx.Rollback(); err3 != nil {
-						return nil, err3
+
+					// Remove obsolete reference water levels
+					var currLevels pgtype.VarcharArray
+					currLevels.Set([]string{
+						string(*dr.Reflevel1code),
+						string(*dr.Reflevel2code),
+						string(*dr.Reflevel3code),
+					})
+					rwls, err := tx.StmtContext(ctx,
+						deleteReferenceWaterLevelsStmt).QueryContext(ctx,
+						code.String(),
+						&validity,
+						&currLevels,
+					)
+					if err != nil {
+						return err
 					}
-					unchanged++
-					continue
-				default:
-					feedback.Info("update")
+					defer rwls.Close()
+					for rwls.Next() {
+						var delRef string
+						if err := rwls.Scan(&delRef); err != nil {
+							return err
+						}
+						feedback.Warn("Removed reference water level %s from %s",
+							delRef, code)
+					}
+					if err := rwls.Err(); err != nil {
+						return err
+					}
 				}
 
-				// Remove obsolete reference water levels
-				var currLevels pgtype.VarcharArray
-				currLevels.Set([]string{
-					string(*dr.Reflevel1code),
-					string(*dr.Reflevel2code),
-					string(*dr.Reflevel3code),
-				})
-				rwls, err := tx.StmtContext(ctx,
-					deleteReferenceWaterLevelsStmt).QueryContext(ctx,
+				// Set end of validity of old version to start of new version
+				// in case of overlap
+				if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(
+					ctx,
 					code.String(),
 					&validity,
-					&currLevels,
-				)
-				if err != nil {
-					return nil, err
-				}
-				defer rwls.Close()
-				for rwls.Next() {
-					var delRef string
-					if err := rwls.Scan(&delRef); err != nil {
-						return nil, err
-					}
-					feedback.Warn("Removed reference water level %s from %s",
-						delRef, code)
-				}
-				if err := rwls.Err(); err != nil {
-					return nil, err
-				}
-			}
-
-			// Set end of validity of old version to start of new version
-			// in case of overlap
-			if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(
-				ctx,
-				code.String(),
-				&validity,
-			); err != nil {
-				feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-				if err2 := tx.Rollback(); err2 != nil {
-					return nil, err2
-				}
-				unchanged++
-				continue
-			}
-
-			// "Upsert" reference water levels
-			for _, wl := range []struct {
-				level **erdms.RisreflevelcodeType
-				value **erdms.RisreflevelvalueType
-			}{
-				{&dr.Reflevel1code, &dr.Reflevel1value},
-				{&dr.Reflevel2code, &dr.Reflevel2value},
-				{&dr.Reflevel3code, &dr.Reflevel3value},
-			} {
-				if *wl.level == nil || *wl.value == nil {
-					continue
+				); err != nil {
+					key := pgxutils.ReadableError{Err: err}.Error()
+					databaseErrors[key] = append(databaseErrors[key], isrs)
+					unchanged++
+					return continueErr
 				}
 
-				var isNtSDepthRef bool
-				if err := tx.StmtContext(
-					ctx, isNtSDepthRefStmt).QueryRowContext(ctx,
-					string(**wl.level),
-				).Scan(
-					&isNtSDepthRef,
-				); err != nil {
-					return nil, err
-				}
-				if !isNtSDepthRef {
-					feedback.Warn(
-						"Reference level code '%s' is not in line "+
-							"with the NtS reference_code table",
-						string(**wl.level))
+				// "Upsert" reference water levels
+				for _, wl := range []struct {
+					level **erdms.RisreflevelcodeType
+					value **erdms.RisreflevelvalueType
+				}{
+					{&dr.Reflevel1code, &dr.Reflevel1value},
+					{&dr.Reflevel2code, &dr.Reflevel2value},
+					{&dr.Reflevel3code, &dr.Reflevel3value},
+				} {
+					if *wl.level == nil || *wl.value == nil {
+						continue
+					}
+
+					var isNtSDepthRef bool
+					if err := tx.StmtContext(
+						ctx, isNtSDepthRefStmt).QueryRowContext(ctx,
+						string(**wl.level),
+					).Scan(
+						&isNtSDepthRef,
+					); err != nil {
+						return err
+					}
+					if !isNtSDepthRef {
+						feedback.Warn(
+							"Reference level code '%s' is not in line "+
+								"with the NtS reference_code table",
+							string(**wl.level))
+					}
+
+					if _, err := tx.StmtContext(
+						ctx, insertWaterLevelStmt).ExecContext(ctx,
+						code.CountryCode,
+						code.LoCode,
+						code.FairwaySection,
+						code.Orc,
+						code.Hectometre,
+						&validity,
+						string(**wl.level),
+						int64(**wl.value),
+					); err != nil {
+						key := pgxutils.ReadableError{Err: err}.Error()
+						databaseErrors[key] = append(databaseErrors[key], isrs)
+						continue
+					}
 				}
 
-				if _, err := tx.StmtContext(
-					ctx, insertWaterLevelStmt).ExecContext(ctx,
-					code.CountryCode,
-					code.LoCode,
-					code.FairwaySection,
-					code.Orc,
-					code.Hectometre,
-					&validity,
-					string(**wl.level),
-					int64(**wl.value),
-				); err != nil {
-					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-					tx.Rollback()
-					continue
-				}
-			}
+				return tx.Commit()
+			}()
 
-			if err = tx.Commit(); err != nil {
-				return nil, err
+			if err != nil && err != continueErr {
+				return err, nil
 			}
 		}
 	}
 
+	if len(invalidISRS) > 0 {
+		feedback.Error("Invalid ISRS code: '%s'", strings.Join(invalidISRS, "', '"))
+	}
+
+	if len(startEndOrder) > 0 {
+		feedback.Error("start date not before end date: %s",
+			strings.Join(startEndOrder, ", "))
+	}
+
+	if len(databaseErrors) > 0 {
+		for err, iris := range databaseErrors {
+			feedback.Error("%s: %s", err, strings.Join(iris, ", "))
+		}
+	}
+
+	if len(missingObjname) > 0 {
+		feedback.Error("Missing zeropoint: %s", strings.Join(missingZeropoint, ", "))
+	}
+	if len(missingObjname) > 0 {
+		feedback.Error("Missing objname: %s", strings.Join(missingObjname, ", "))
+	}
+
 	if len(gauges) == 0 {
 		return nil, UnchangedError("No gauges returned from ERDMS")
 	}