diff pkg/imports/agm.go @ 1780:48791416bea5

(Approved) gauge measurement import: Fixed row level security.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 11 Jan 2019 16:48:14 +0100
parents ad1c12e999df
children 0a53c33bc7b2
line wrap: on
line diff
--- a/pkg/imports/agm.go	Fri Jan 11 15:40:50 2019 +0100
+++ b/pkg/imports/agm.go	Fri Jan 11 16:48:14 2019 +0100
@@ -65,14 +65,29 @@
 }
 
 const (
-	// TODO: re-add staging_done field in table and fix RLS policy
-	// issue for raw import.
+	// delete the old  and keep the new measures.
+	agmStageDoneDeleteSQL = `
+WITH staged AS (
+  SELECT key
+  FROM waterway.track_imports
+  WHERE import_id = $1 AND
+        relation = 'waterway.gauge_measurements'::regclass
+),
+to_delete AS (
+  SELECT o.id AS id
+  FROM waterway.gauge_measurements oJOIN waterway.gauge_measurements n
+    ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date
+    WHERE n.id     IN (SELECT key FROM staged)
+	  AND o.id NOT IN (SELECT key FROM staged)
+)
+DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`
+
 	agmStageDoneSQL = `
 UPDATE waterway.gauge_measurements SET staging_done = true
-WHERE id = (
-  SELECT key from waterway.track_imports
+WHERE id IN (
+  SELECT key FROM waterway.track_imports
   WHERE import_id = $1 AND
-        relation = 'waterway.gauge_measurements'::regclass)`
+    relation = 'waterway.gauge_measurements'::regclass)`
 )
 
 func (agmJobCreator) StageDone(
@@ -80,7 +95,10 @@
 	tx *sql.Tx,
 	id int64,
 ) error {
-	_, err := tx.ExecContext(ctx, agmStageDoneSQL, id)
+	_, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id)
+	if err == nil {
+		_, err = tx.ExecContext(ctx, agmStageDoneSQL, id)
+	}
 	return err
 }
 
@@ -197,7 +215,9 @@
 
 	ids := []int64{}
 
-	args := make([]interface{}, 18)
+	args := make([]interface{}, 19)
+
+	args[18] = false // staging_done
 
 lines:
 	for line := 1; ; line++ {
@@ -275,6 +295,8 @@
 
 		args[17] = row[headerIndices["originator"]]
 
+		// args[18] (staging_done) is set to true outside the loop.
+
 		var id int64
 		if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil {
 			return nil, fmt.Errorf("Failed to insert line %d: %v", line, err)