diff pkg/imports/wg.go @ 3310:e0dabe7b2fcf

Simplify gauges import Instead of checking some NOT NULL constraints in an extra loop, check all of them in the database.
author Tom Gottfried <tom@intevation.de>
date Fri, 17 May 2019 12:49:28 +0200
parents 5932f9574493
children 0f6b156cff55
line wrap: on
line diff
--- a/pkg/imports/wg.go	Fri May 17 12:45:13 2019 +0200
+++ b/pkg/imports/wg.go	Fri May 17 12:49:28 2019 +0200
@@ -172,58 +172,6 @@
 		return nil, err
 	}
 
-	var ignored int
-
-	type idxCode struct {
-		jdx  int
-		idx  int
-		code *models.Isrs
-	}
-
-	var gauges []idxCode
-
-	for j, data := range responseData {
-		for i, dr := range data.RisdataReturn {
-			if dr.RisidxCode == nil {
-				ignored++
-				continue
-			}
-			code, err := models.IsrsFromString(string(*dr.RisidxCode))
-			if err != nil {
-				feedback.Warn("invalid ISRS code %v", err)
-				ignored++
-				continue
-			}
-
-			if dr.Objname.Loc == nil {
-				feedback.Warn("missing objname: %s", code)
-				ignored++
-				continue
-			}
-
-			if dr.Lat == nil || dr.Lon == nil {
-				feedback.Warn("missing lat/lon: %s", code)
-				ignored++
-				continue
-			}
-
-			if dr.Zeropoint == nil {
-				feedback.Warn("missing zeropoint: %s", code)
-				ignored++
-				continue
-			}
-
-			gauges = append(gauges, idxCode{jdx: j, idx: i, code: code})
-		}
-	}
-	feedback.Info("Ignored gauges: %d", ignored)
-	feedback.Info("Further process %d gauges", len(gauges))
-
-	if len(gauges) == 0 {
-		return nil, UnchangedError("Nothing to do")
-	}
-
-	// insert/update the gauges
 	var eraseGaugeStmt, insertStmt, updateStmt,
 		deleteReferenceWaterLevelsStmt,
 		isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
@@ -245,257 +193,269 @@
 		defer (*x.stmt).Close()
 	}
 
+	var gauges []string
 	var unchanged int
 
-	for i := range gauges {
-		ic := &gauges[i]
-		dr := responseData[ic.jdx].RisdataReturn[ic.idx]
-
-		feedback.Info("Processing %s", ic.code)
+	for _, data := range responseData {
+		for _, dr := range data.RisdataReturn {
 
-		var from, to sql.NullInt64
-
-		if dr.Applicabilityfromkm != nil {
-			from = sql.NullInt64{
-				Int64: int64(*dr.Applicabilityfromkm),
-				Valid: true,
+			isrs := string(*dr.RisidxCode)
+			code, err := models.IsrsFromString(isrs)
+			if err != nil {
+				feedback.Warn("Invalid ISRS code '%s': %v", isrs, err)
+				continue
 			}
-		}
-		if dr.Applicabilitytokm != nil {
-			to = sql.NullInt64{
-				Int64: int64(*dr.Applicabilitytokm),
-				Valid: true,
-			}
-		}
+			gauges = append(gauges, isrs)
+			feedback.Info("Processing %s", code)
+
+			var from, to sql.NullInt64
 
-		var tfrom, tto, dateInfo pgtype.Timestamptz
-
-		if dr.Startdate != nil {
-			tfrom = pgtype.Timestamptz{
-				Time:   time.Time(*dr.Startdate),
-				Status: pgtype.Present,
-			}
-		} else {
-			tfrom = pgtype.Timestamptz{
-				Status: pgtype.Null,
+			if dr.Applicabilityfromkm != nil {
+				from = sql.NullInt64{
+					Int64: int64(*dr.Applicabilityfromkm),
+					Valid: true,
+				}
 			}
-		}
+			if dr.Applicabilitytokm != nil {
+				to = sql.NullInt64{
+					Int64: int64(*dr.Applicabilitytokm),
+					Valid: true,
+				}
+			}
 
-		if dr.Enddate != nil {
-			tto = pgtype.Timestamptz{
-				Time:   time.Time(*dr.Enddate),
-				Status: pgtype.Present,
-			}
-		} else {
-			tto = pgtype.Timestamptz{
-				Status: pgtype.Null,
-			}
-		}
+			var tfrom, tto, dateInfo pgtype.Timestamptz
 
-		validity := pgtype.Tstzrange{
-			Lower:     tfrom,
-			Upper:     tto,
-			LowerType: pgtype.Inclusive,
-			UpperType: pgtype.Exclusive,
-			Status:    pgtype.Present,
-		}
+			if dr.Startdate != nil {
+				tfrom = pgtype.Timestamptz{
+					Time:   time.Time(*dr.Startdate),
+					Status: pgtype.Present,
+				}
+			} else {
+				tfrom = pgtype.Timestamptz{
+					Status: pgtype.Null,
+				}
+			}
 
-		if dr.Infodate != nil {
-			dateInfo = pgtype.Timestamptz{
-				Time:   time.Time(*dr.Infodate),
-				Status: pgtype.Present,
+			if dr.Enddate != nil {
+				tto = pgtype.Timestamptz{
+					Time:   time.Time(*dr.Enddate),
+					Status: pgtype.Present,
+				}
+			} else {
+				tto = pgtype.Timestamptz{
+					Status: pgtype.Null,
+				}
 			}
-		} else {
-			dateInfo = pgtype.Timestamptz{
-				Status: pgtype.Null,
-			}
-		}
 
-		var geodref sql.NullString
-		if dr.Geodref != nil {
-			geodref = sql.NullString{
-				String: string(*dr.Geodref),
-				Valid:  true,
-			}
-		}
-
-		var source sql.NullString
-		if dr.Source != nil {
-			source = sql.NullString{
-				String: string(*dr.Source),
-				Valid:  true,
+			validity := pgtype.Tstzrange{
+				Lower:     tfrom,
+				Upper:     tto,
+				LowerType: pgtype.Inclusive,
+				UpperType: pgtype.Exclusive,
+				Status:    pgtype.Present,
 			}
-		}
-
-		tx, err := conn.BeginTx(ctx, nil)
-		if err != nil {
-			return nil, err
-		}
-		defer tx.Rollback()
 
-		// Mark old entries of gauge as erased, if applicable
-		if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx,
-			ic.code.String(),
-			validity,
-		); err != nil {
-			feedback.Warn(handleError(err).Error())
-			if err2 := tx.Rollback(); err2 != nil {
-				return nil, err2
+			if dr.Infodate != nil {
+				dateInfo = pgtype.Timestamptz{
+					Time:   time.Time(*dr.Infodate),
+					Status: pgtype.Present,
+				}
+			} else {
+				dateInfo = pgtype.Timestamptz{
+					Status: pgtype.Null,
+				}
+			}
+
+			var geodref sql.NullString
+			if dr.Geodref != nil {
+				geodref = sql.NullString{
+					String: string(*dr.Geodref),
+					Valid:  true,
+				}
 			}
-			unchanged++
-			continue
-		}
+
+			var source sql.NullString
+			if dr.Source != nil {
+				source = sql.NullString{
+					String: string(*dr.Source),
+					Valid:  true,
+				}
+			}
+
+			tx, err := conn.BeginTx(ctx, nil)
+			if err != nil {
+				return nil, err
+			}
+			defer tx.Rollback()
 
-		// Try to insert gauge entry
-		var dummy int
-		err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx,
-			ic.code.CountryCode,
-			ic.code.LoCode,
-			ic.code.FairwaySection,
-			ic.code.Orc,
-			ic.code.Hectometre,
-			string(*dr.Objname.Loc),
-			float64(*dr.Lon), float64(*dr.Lat),
-			from,
-			to,
-			&validity,
-			float64(*dr.Zeropoint),
-			geodref,
-			&dateInfo,
-			source,
-			time.Time(*dr.Lastupdate),
-		).Scan(&dummy)
-		switch {
-		case err == sql.ErrNoRows:
-			// Assume constraint conflict, try to update
-			err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
-				ic.code.CountryCode,
-				ic.code.LoCode,
-				ic.code.FairwaySection,
-				ic.code.Orc,
-				ic.code.Hectometre,
-				string(*dr.Objname.Loc),
-				float64(*dr.Lon), float64(*dr.Lat),
+			// Mark old entries of gauge as erased, if applicable
+			if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx,
+				code.String(),
+				validity,
+			); err != nil {
+				feedback.Warn(handleError(err).Error())
+				if err2 := tx.Rollback(); err2 != nil {
+					return nil, err2
+				}
+				unchanged++
+				continue
+			}
+
+			// Try to insert gauge entry
+			var dummy int
+			err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx,
+				code.CountryCode,
+				code.LoCode,
+				code.FairwaySection,
+				code.Orc,
+				code.Hectometre,
+				dr.Objname.Loc,
+				dr.Lon, dr.Lat,
 				from,
 				to,
-				float64(*dr.Zeropoint),
+				&validity,
+				dr.Zeropoint,
 				geodref,
 				&dateInfo,
 				source,
 				time.Time(*dr.Lastupdate),
 			).Scan(&dummy)
 			switch {
-			case err2 == sql.ErrNoRows:
-				feedback.Info("unchanged")
-				if err3 := tx.Rollback(); err3 != nil {
-					return nil, err3
+			case err == sql.ErrNoRows:
+				// Assume constraint conflict, try to update
+				err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
+					code.CountryCode,
+					code.LoCode,
+					code.FairwaySection,
+					code.Orc,
+					code.Hectometre,
+					dr.Objname.Loc,
+					dr.Lon, dr.Lat,
+					from,
+					to,
+					dr.Zeropoint,
+					geodref,
+					&dateInfo,
+					source,
+					time.Time(*dr.Lastupdate),
+				).Scan(&dummy)
+				switch {
+				case err2 == sql.ErrNoRows:
+					feedback.Info("unchanged")
+					if err3 := tx.Rollback(); err3 != nil {
+						return nil, err3
+					}
+					unchanged++
+					continue
+				case err2 != nil:
+					feedback.Warn(handleError(err2).Error())
+					if err3 := tx.Rollback(); err3 != nil {
+						return nil, err3
+					}
+					unchanged++
+					continue
+				default:
+					feedback.Info("update")
 				}
-				unchanged++
-				continue
-			case err2 != nil:
-				feedback.Warn(handleError(err2).Error())
-				if err3 := tx.Rollback(); err3 != nil {
-					return nil, err3
+
+				// Remove obsolete reference water levels
+				var currLevels pgtype.VarcharArray
+				currLevels.Set([]string{
+					string(*dr.Reflevel1code),
+					string(*dr.Reflevel2code),
+					string(*dr.Reflevel3code),
+				})
+				rwls, err := tx.StmtContext(ctx,
+					deleteReferenceWaterLevelsStmt).QueryContext(ctx,
+					code.String(),
+					&validity,
+					&currLevels,
+				)
+				if err != nil {
+					return nil, err
+				}
+				defer rwls.Close()
+				for rwls.Next() {
+					var delRef string
+					if err := rwls.Scan(&delRef); err != nil {
+						return nil, err
+					}
+					feedback.Warn("Removed reference water level %s from %s",
+						delRef, code)
+				}
+				if err := rwls.Err(); err != nil {
+					return nil, err
+				}
+			case err != nil:
+				feedback.Warn(handleError(err).Error())
+				if err2 := tx.Rollback(); err2 != nil {
+					return nil, err2
 				}
 				unchanged++
 				continue
 			default:
-				feedback.Info("update")
+				feedback.Info("insert new version")
 			}
 
-			// Remove obsolete reference water levels
-			var currLevels pgtype.VarcharArray
-			currLevels.Set([]string{
-				string(*dr.Reflevel1code),
-				string(*dr.Reflevel2code),
-				string(*dr.Reflevel3code),
-			})
-			rwls, err := tx.StmtContext(ctx,
-				deleteReferenceWaterLevelsStmt).QueryContext(ctx,
-				ic.code.String(),
-				&validity,
-				&currLevels,
-			)
-			if err != nil {
-				return nil, err
-			}
-			defer rwls.Close()
-			for rwls.Next() {
-				var delRef string
-				if err := rwls.Scan(&delRef); err != nil {
+			// "Upsert" reference water levels
+			for _, wl := range []struct {
+				level **erdms.RisreflevelcodeType
+				value **erdms.RisreflevelvalueType
+			}{
+				{&dr.Reflevel1code, &dr.Reflevel1value},
+				{&dr.Reflevel2code, &dr.Reflevel2value},
+				{&dr.Reflevel3code, &dr.Reflevel3value},
+			} {
+				if *wl.level == nil || *wl.value == nil {
+					continue
+				}
+
+				var isNtSDepthRef bool
+				if err := tx.StmtContext(
+					ctx, isNtSDepthRefStmt).QueryRowContext(ctx,
+					string(**wl.level),
+				).Scan(
+					&isNtSDepthRef,
+				); err != nil {
 					return nil, err
 				}
-				feedback.Warn("Removed reference water level %s from %s",
-					delRef, ic.code)
+				if !isNtSDepthRef {
+					feedback.Warn(
+						"Reference level code '%s' is not in line "+
+							"with the NtS reference_code table",
+						string(**wl.level))
+				}
+
+				if _, err := tx.StmtContext(
+					ctx, insertWaterLevelStmt).ExecContext(ctx,
+					code.CountryCode,
+					code.LoCode,
+					code.FairwaySection,
+					code.Orc,
+					code.Hectometre,
+					&validity,
+					string(**wl.level),
+					int64(**wl.value),
+				); err != nil {
+					feedback.Warn(handleError(err).Error())
+					tx.Rollback()
+					continue
+				}
 			}
-			if err := rwls.Err(); err != nil {
+
+			if err = tx.Commit(); err != nil {
 				return nil, err
 			}
-		case err != nil:
-			feedback.Warn(handleError(err).Error())
-			if err2 := tx.Rollback(); err2 != nil {
-				return nil, err2
-			}
-			unchanged++
-			continue
-		default:
-			feedback.Info("insert new version")
-		}
-
-		// "Upsert" reference water levels
-		for _, wl := range []struct {
-			level **erdms.RisreflevelcodeType
-			value **erdms.RisreflevelvalueType
-		}{
-			{&dr.Reflevel1code, &dr.Reflevel1value},
-			{&dr.Reflevel2code, &dr.Reflevel2value},
-			{&dr.Reflevel3code, &dr.Reflevel3value},
-		} {
-			if *wl.level == nil || *wl.value == nil {
-				continue
-			}
-
-			var isNtSDepthRef bool
-			if err := tx.StmtContext(ctx, isNtSDepthRefStmt).QueryRowContext(
-				ctx,
-				string(**wl.level),
-			).Scan(
-				&isNtSDepthRef,
-			); err != nil {
-				return nil, err
-			}
-			if !isNtSDepthRef {
-				feedback.Warn(
-					"Reference level code '%s' is not in line "+
-						"with the NtS reference_code table",
-					string(**wl.level))
-			}
-
-			if _, err := tx.StmtContext(ctx, insertWaterLevelStmt).ExecContext(
-				ctx,
-				ic.code.CountryCode,
-				ic.code.LoCode,
-				ic.code.FairwaySection,
-				ic.code.Orc,
-				ic.code.Hectometre,
-				&validity,
-				string(**wl.level),
-				int64(**wl.value),
-			); err != nil {
-				feedback.Warn(handleError(err).Error())
-				tx.Rollback()
-				continue
-			}
-		}
-
-		if err = tx.Commit(); err != nil {
-			return nil, err
 		}
 	}
 
 	feedback.Info("Importing gauges took %s",
 		time.Since(start))
 
+	if len(gauges) == 0 {
+		return nil, UnchangedError("No gauges returned from ERDMS")
+	}
+
 	if unchanged == len(gauges) {
 		return nil, UnchangedError("All gauges unchanged")
 	}