changeset 4958:6d5d6b27c3c3 fairway-marks-import

Added pre commit hooks before commiting fairway marks to invalidate old.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 26 Feb 2020 12:54:33 +0100
parents b0607611bcdf
children 2ab75c48e8e7
files pkg/imports/fm.go pkg/imports/pointwfs.go
diffstat 2 files changed, 39 insertions(+), 3 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/fm.go	Wed Feb 26 12:34:11 2020 +0100
+++ b/pkg/imports/fm.go	Wed Feb 26 12:54:33 2020 +0100
@@ -179,6 +179,7 @@
 					insertBcnlatDirimpSQL,
 				),
 				consumeBCNLAT,
+				createInvalidation("bcnlat"),
 				func() interface{} { return new(bcnlatProperties) },
 			),
 		})
@@ -194,6 +195,7 @@
 						"marsys", "boyshp", "catlam"),
 				),
 				consumeBOYLAT,
+				createInvalidation("boylat"),
 				func() interface{} { return new(boylatProperties) },
 			),
 		})
@@ -209,6 +211,7 @@
 						"marsys", "boyshp", "catcam"),
 				),
 				consumeBOYCAR,
+				createInvalidation("boycar"),
 				func() interface{} { return new(boycarProperties) },
 			),
 		})
@@ -223,6 +226,7 @@
 						"colour", "colpat", "conrad", "marsys", "boyshp"),
 				),
 				consumeBOYSAW,
+				createInvalidation("boysaw"),
 				func() interface{} { return new(boysawProperties) },
 			),
 		})
@@ -238,6 +242,7 @@
 						"marsys", "boyshp", "catspm"),
 				),
 				consumeBOYSPP,
+				createInvalidation("boyspp"),
 				func() interface{} { return new(boysppProperties) },
 			),
 		})
@@ -253,6 +258,7 @@
 					insertDaymarDirimpSQL,
 				),
 				consumeDAYMAR,
+				createInvalidation("daymar"),
 				func() interface{} { return new(daymarProperties) },
 			),
 		})
@@ -271,6 +277,7 @@
 						"sigseq", "status"),
 				),
 				consumeLIGHTS,
+				createInvalidation("lights"),
 				func() interface{} { return new(lightsProperties) },
 			),
 		})
@@ -289,6 +296,7 @@
 					insertNotmrkDirimpSQL,
 				),
 				consumeNOTMRK,
+				createInvalidation("notmark"),
 				func() interface{} { return new(notmrkProperties) },
 			),
 		})
@@ -303,6 +311,7 @@
 						"condtn", "siggrp", "catrtb", "radwal"),
 				),
 				consumeRTPBCN,
+				createInvalidation("rtpbcn"),
 				func() interface{} { return new(rtpbcnProperties) },
 			),
 		})
@@ -317,6 +326,7 @@
 						"colour", "colpat", "condtn", "topshp"),
 				),
 				consumeTOPMAR,
+				createInvalidation("topmar"),
 				func() interface{} { return new(topmarProperties) },
 			),
 		})
@@ -425,6 +435,24 @@
 `
 )
 
+func createInvalidation(fmType string) func(*SQLPointConsumer) error {
+
+	invalidateFairwayMarksSQL := fmt.Sprintf(invalidateFairwayMarksSQLtmpl, fmType)
+
+	return func(spc *SQLPointConsumer) error {
+		res, err := spc.tx.ExecContext(spc.ctx, invalidateFairwayMarksSQL)
+		if err != nil {
+			return err
+		}
+		nOld, err := res.RowsAffected()
+		if err != nil {
+			return err
+		}
+		spc.feedback.Info("Number of features removed from data source: %d", nOld)
+		return nil
+	}
+}
+
 // Create INSERT statement for specific fairway marks type
 func createInsertFMSQL(fmType string, attributes ...string) string {
 	attNums := "$16"
--- a/pkg/imports/pointwfs.go	Wed Feb 26 12:34:11 2020 +0100
+++ b/pkg/imports/pointwfs.go	Wed Feb 26 12:54:33 2020 +0100
@@ -206,7 +206,7 @@
 
 	if dupes > 0 {
 		feedback.Info(
-			"Features outside responsibility area, duplicates or unchanged: %d",
+			"Features outside responsibility area and duplicates: %d",
 			dupes)
 	}
 
@@ -222,13 +222,13 @@
 		feedback.Warn("Unsupported types found: %s", unsupported)
 	}
 
-	// Commit before eventually returning UnchangedError because we might
-	// have updated last_found
 	if err = consumer.Commit(); err == nil {
 		feedback.Info("Storing %d features took %s",
 			features, time.Since(start))
 	}
 
+	// Commit before eventually returning UnchangedError because we might
+	// have updated last_found
 	if features == 0 {
 		return nil, UnchangedError("no valid new features found")
 	}
@@ -243,6 +243,7 @@
 		feedback      Feedback
 		newProperties func() interface{}
 		consume       func(*SQLPointConsumer, pointSlice, interface{}, int) error
+		preCommit     func(*SQLPointConsumer) error
 		savepoint     func(func() error) error
 		stmts         []*sql.Stmt
 	}
@@ -260,6 +261,11 @@
 
 func (spc *SQLPointConsumer) Commit() error {
 	if tx := spc.tx; tx != nil {
+		if spc.preCommit != nil {
+			if err := spc.preCommit(spc); err != nil {
+				return err
+			}
+		}
 		spc.releaseStmts()
 		spc.tx = nil
 		spc.ctx = nil
@@ -283,6 +289,7 @@
 func newSQLConsumer(
 	init func(*SQLPointConsumer) error,
 	consume func(*SQLPointConsumer, pointSlice, interface{}, int) error,
+	preCommit func(*SQLPointConsumer) error,
 	newProperties func() interface{},
 
 ) func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error) {
@@ -297,6 +304,7 @@
 			feedback:      feedback,
 			newProperties: newProperties,
 			consume:       consume,
+			preCommit:     preCommit,
 			savepoint:     Savepoint(ctx, tx, "feature"),
 		}
 		if err := init(spc); err != nil {