changeset 5086:f572b553cd8a time-sliding

merge default into time-sliding branch
author Fadi Abbud <fadi.abbud@intevation.de>
date Fri, 20 Mar 2020 15:20:55 +0100
parents fe63733750d7 (current diff) 59a99655f34d (diff)
children d77dd0220780
files
diffstat 35 files changed, 465 insertions(+), 184 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/controllers/importqueue.go	Fri Mar 20 15:20:55 2020 +0100
@@ -583,28 +583,6 @@
 	return
 }
 
-const (
-	isPendingSQL = `
-SELECT state = 'pending'::import_state, kind
-FROM import.imports
-WHERE id = $1`
-
-	reviewSQL = `
-UPDATE import.imports SET
-  state = $1::import_state,
-  changed = CURRENT_TIMESTAMP,
-  signer = $2
-WHERE id = $3`
-
-	deleteImportDataSQL = `SELECT import.del_import($1)`
-
-	deleteImportTrackSQL = `
-DELETE FROM import.track_imports WHERE import_id = $1`
-
-	logDecisionSQL = `
-INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
-)
-
 func reviewImports(req *http.Request) (mw.JSONResult, error) {
 
 	rs := *mw.JSONInput(req).(*[]models.Review)
@@ -617,11 +595,9 @@
 
 	results := make([]reviewResult, len(rs))
 
-	conn := mw.JSONConn(req)
-
 	for i := range rs {
 		rev := &rs[i]
-		msg, err := decideImport(req, conn, rev.ID, string(rev.State))
+		msg, err := decideImport(req, rev.ID, string(rev.State))
 		var errString string
 		if err != nil {
 			errString = err.Error()
@@ -643,7 +619,7 @@
 	state := vars["state"]
 
 	var msg string
-	if msg, err = decideImport(req, mw.JSONConn(req), id, state); err != nil {
+	if msg, err = decideImport(req, id, state); err != nil {
 		return
 	}
 
@@ -659,70 +635,21 @@
 
 func decideImport(
 	req *http.Request,
-	conn *sql.Conn,
 	id int64,
 	state string,
 ) (message string, err error) {
 	ctx := req.Context()
-	var tx *sql.Tx
-	if tx, err = conn.BeginTx(ctx, nil); err != nil {
-		return
-	}
-	defer tx.Rollback()
 
-	var pending bool
-	var kind string
+	accepted := state == "accepted"
 
-	err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind)
-	switch {
-	case err == sql.ErrNoRows:
-		err = mw.JSONError{
-			Code:    http.StatusNotFound,
-			Message: fmt.Sprintf("cannot find import #%d", id),
-		}
-		return
-	case err != nil:
-		return
-	case !pending:
-		err = mw.JSONError{
-			Code:    http.StatusConflict,
-			Message: fmt.Sprintf("import #%d is not pending", id),
-		}
-		return
-	}
+	session, _ := auth.GetSession(req)
+	reviewer := session.User
 
-	if state == "accepted" {
-		if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil {
-			if err = jc.StageDone(ctx, tx, id); err != nil {
-				return
-			}
-		}
-
-	} else {
-		if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil {
-			return
+	if err = imports.DecideImport(ctx, id, accepted, reviewer); err != nil {
+		err = mw.JSONError{
+			Code:    http.StatusBadRequest,
+			Message: err.Error(),
 		}
-	}
-
-	// Remove the import track
-	if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
-		return
-	}
-
-	// Log the decision and set the final state.
-	session, _ := auth.GetSession(req)
-	who := session.User
-
-	if _, err = tx.ExecContext(ctx, logDecisionSQL, id,
-		fmt.Sprintf("User '%s' %s import %d.", who, state, id)); err != nil {
-		return
-	}
-
-	if _, err = tx.ExecContext(ctx, reviewSQL, state, who, id); err != nil {
-		return
-	}
-
-	if err = tx.Commit(); err != nil {
 		return
 	}
 
--- a/pkg/controllers/routes.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/controllers/routes.go	Fri Mar 20 15:20:55 2020 +0100
@@ -313,6 +313,7 @@
 	api.Handle("/imports", waterwayAdmin(&mw.JSONHandler{
 		Input:  func(*http.Request) interface{} { return &[]models.Review{} },
 		Handle: reviewImports,
+		NoConn: true,
 	})).Methods(http.MethodPatch)
 
 	api.Handle("/imports/{id:[0-9]+}", waterwayAdmin(&mw.JSONHandler{
@@ -323,6 +324,7 @@
 	api.Handle("/imports/{id:[0-9]+}/{state:(?:accepted|declined)}",
 		waterwayAdmin(&mw.JSONHandler{
 			Handle: reviewImport,
+			NoConn: true,
 		})).Methods(http.MethodPut)
 
 	// Handler to serve data to the client.
--- a/pkg/imports/agm.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/agm.go	Fri Mar 20 15:20:55 2020 +0100
@@ -92,6 +92,7 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	_ Feedback,
 ) error {
 	_, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id)
 	if err == nil {
--- a/pkg/imports/bn.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/bn.go	Fri Mar 20 15:20:55 2020 +0100
@@ -188,6 +188,7 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	_ Feedback,
 ) error {
 	_, err := tx.ExecContext(ctx, bnStageDoneDeleteSQL, id)
 	if err == nil {
--- a/pkg/imports/dma.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/dma.go	Fri Mar 20 15:20:55 2020 +0100
@@ -66,7 +66,7 @@
 }
 
 // StageDone is a NOP for distance marks imports.
-func (dmaJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+func (dmaJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
 	return nil
 }
 
--- a/pkg/imports/dmv.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/dmv.go	Fri Mar 20 15:20:55 2020 +0100
@@ -59,7 +59,7 @@
 }
 
 // StageDone does nothing as there is no staging for distance marks virtual.
-func (dmvJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil }
+func (dmvJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { return nil }
 
 // CleanUp does nothing as there is nothing to cleanup with distance marks virtual.
 func (*DistanceMarksVirtual) CleanUp() error { return nil }
--- a/pkg/imports/dsec.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/dsec.go	Fri Mar 20 15:20:55 2020 +0100
@@ -63,6 +63,7 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	_ Feedback,
 ) error {
 	res, err := tx.ExecContext(ctx, dsecStageDoneSQL, id)
 	if err != nil {
--- a/pkg/imports/dsr.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/dsr.go	Fri Mar 20 15:20:55 2020 +0100
@@ -73,6 +73,7 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	_ Feedback,
 ) error {
 	result, err := tx.ExecContext(ctx, dsrStageDoneSQL, id)
 	if err != nil {
--- a/pkg/imports/dst.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/dst.go	Fri Mar 20 15:20:55 2020 +0100
@@ -62,6 +62,7 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	_ Feedback,
 ) error {
 	_, err := tx.ExecContext(ctx, dstStageDoneSQL, id)
 	return err
--- a/pkg/imports/fa.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/fa.go	Fri Mar 20 15:20:55 2020 +0100
@@ -189,7 +189,7 @@
 
 // StageDone moves the imported fairway availablities out of the staging area.
 // Currently doing nothing.
-func (faJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+func (faJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
 	return nil
 }
 
--- a/pkg/imports/fd.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/fd.go	Fri Mar 20 15:20:55 2020 +0100
@@ -102,17 +102,10 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	_ Feedback,
 ) error {
-	// We only want to delete the features of the same LOS
-	// as the imported.
-	var los int64
-	if err := tx.QueryRowContext(ctx, fdFindLOSSQL, id).Scan(&los); err != nil {
-		// Should always return a row because import will exit as unchanged
-		// if no new features were found while inserting.
-		return err
-	}
 	// Delete the old features.
-	if _, err := tx.ExecContext(ctx, deleteFairwayDimensionSQL, los); err != nil {
+	if _, err := tx.ExecContext(ctx, deleteFairwayDimensionSQL, id); err != nil {
 		return err
 	}
 
@@ -134,14 +127,6 @@
 }
 
 const (
-	fdFindLOSSQL = `
-SELECT level_of_service FROM waterway.fairway_dimensions
-WHERE id IN (
-  SELECT key FROM import.track_imports
-  WHERE import_id = $1 AND
-        relation = 'waterway.fairway_dimensions'::regclass)
-LIMIT 1`
-
 	fdStageDoneSQL = `
 UPDATE waterway.fairway_dimensions SET staging_done = true
 WHERE id IN (
@@ -150,15 +135,15 @@
         relation = 'waterway.fairway_dimensions'::regclass)`
 
 	deleteFairwayDimensionSQL = `
-WITH resp AS (
-  SELECT users.current_user_area_utm() AS a
-)
 DELETE FROM waterway.fairway_dimensions
-WHERE (pg_has_role('sys_admin', 'MEMBER')
-    OR ST_Covers((SELECT a FROM resp),
-      ST_Transform(area::geometry, (SELECT ST_SRID(a) FROM resp))))
-  AND staging_done
-  AND level_of_service = $1::smallint`
+WHERE staging_done
+  AND level_of_service = (
+    SELECT DISTINCT level_of_service FROM waterway.fairway_dimensions
+      WHERE id IN (
+        SELECT key FROM import.track_imports
+          WHERE import_id = $1
+            AND relation = 'waterway.fairway_dimensions'::regclass))
+`
 
 	// The ST_MakeValid and ST_Buffer below are a workarround to
 	// avoid errors due to reprojection.
@@ -174,16 +159,22 @@
   min_depth,
   date_info,
   source_organization)
-SELECT dmp.geom, $3, $4, $5, $6, $7, $8
-  FROM ST_GeomFromWKB($1, $2::integer) AS new_fd (new_fd),
-    ST_Dump(ST_CollectionExtract(ST_MakeValid(ST_Transform(
+SELECT
+    ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Transform(
       CASE WHEN pg_has_role('sys_admin', 'MEMBER')
+          OR ST_Covers((SELECT a FROM resp),
+            ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp)))
         THEN new_fd
         ELSE ST_Intersection(
             (SELECT ST_Buffer(a, -0.0001) FROM resp),
             ST_MakeValid(ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp))))
         END,
-      4326)), 3)) AS dmp
+      4326)), 3)),
+    $3, $4, $5, $6, $7, $8
+  FROM ST_GeomFromWKB($1, $2::integer) AS new_fd (new_fd)
+  WHERE pg_has_role('sys_admin', 'MEMBER')
+    OR ST_Intersects((SELECT a FROM resp),
+      ST_MakeValid(ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp))))
 RETURNING id,
   ST_X(ST_Centroid(area::geometry)),
   ST_Y(ST_Centroid(area::geometry))
@@ -355,9 +346,7 @@
 	}
 
 	if outside > 0 {
-		feedback.Info(
-			"Features outside responsibility area or no valid polygon: %d",
-			outside)
+		feedback.Info("Features outside responsibility area: %d", outside)
 	}
 
 	if features == 0 {
--- a/pkg/imports/gm.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/gm.go	Fri Mar 20 15:20:55 2020 +0100
@@ -147,7 +147,7 @@
 
 // StageDone moves the imported gauge measurement out of the staging area.
 // Currently doing nothing.
-func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
 	return nil
 }
 
--- a/pkg/imports/isr.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/isr.go	Fri Mar 20 15:20:55 2020 +0100
@@ -41,7 +41,7 @@
 
 func (isrJobCreator) Create() Job { return new(IsoRefresh) }
 
-func (isrJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+func (isrJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
 	return nil
 }
 
--- a/pkg/imports/queue.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/queue.go	Fri Mar 20 15:20:55 2020 +0100
@@ -86,7 +86,7 @@
 		// StageDone is called if an import is positively reviewed
 		// (state = accepted). This can be used to finalize the imported
 		// data to move it e.g from the staging area.
-		StageDone(context.Context, *sql.Tx, int64) error
+		StageDone(context.Context, *sql.Tx, int64, Feedback) error
 		// AutoAccept indicates that imports of this kind
 		// don't need a review.
 		AutoAccept() bool
@@ -429,6 +429,136 @@
 		data)
 }
 
+const (
+	isPendingSQL = `
+SELECT
+	state = 'pending'::import_state,
+	kind,
+	username
+FROM import.imports
+WHERE id = $1`
+
+	reviewSQL = `
+UPDATE import.imports SET
+  state = $1::import_state,
+  changed = CURRENT_TIMESTAMP,
+  signer = $2
+WHERE id = $3`
+
+	deleteImportDataSQL = `SELECT import.del_import($1)`
+
+	deleteImportTrackSQL = `
+DELETE FROM import.track_imports WHERE import_id = $1`
+
+	logDecisionSQL = `
+INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)`
+)
+
+func (q *importQueue) decideImportTx(
+	ctx context.Context,
+	tx *sql.Tx,
+	id int64,
+	accepted bool,
+	reviewer string,
+) error {
+	var (
+		pending bool
+		kind    string
+		user    string
+	)
+
+	switch err := tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind, &user); {
+	case err == sql.ErrNoRows:
+		return fmt.Errorf("cannot find import #%d", id)
+	case err != nil:
+		return err
+	case !pending:
+		return fmt.Errorf("#%d is not pending", id)
+	}
+
+	jc := q.jobCreator(JobKind(kind))
+	if jc == nil {
+		return fmt.Errorf("no job creator for kind '%s'", kind)
+	}
+
+	if err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
+		txUser, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer txUser.Rollback()
+
+		if accepted {
+			feedback := logFeedback(id)
+			err = jc.StageDone(ctx, txUser, id, feedback)
+		} else {
+			_, err = txUser.ExecContext(ctx, deleteImportDataSQL, id)
+		}
+
+		if err == nil {
+			err = txUser.Commit()
+		}
+
+		return err
+	}); err != nil {
+		return err
+	}
+
+	// Remove the import track
+	if _, err := tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil {
+		return err
+	}
+
+	var state string
+	if accepted {
+		state = "accepted"
+	} else {
+		state = "declined"
+	}
+
+	logMsg := fmt.Sprintf("User '%s' %s import %d.", reviewer, state, id)
+
+	if _, err := tx.ExecContext(ctx, logDecisionSQL, id, logMsg); err != nil {
+		return err
+	}
+
+	_, err := tx.ExecContext(ctx, reviewSQL, state, reviewer, id)
+	return err
+}
+
+func (q *importQueue) decideImport(
+	ctx context.Context,
+	id int64,
+	accepted bool,
+	reviewer string,
+) error {
+	if ctx == nil {
+		ctx = context.Background()
+	}
+
+	return auth.RunAs(ctx, reviewer, func(conn *sql.Conn) error {
+		tx, err := conn.BeginTx(ctx, nil)
+		if err != nil {
+			return err
+		}
+		defer tx.Rollback()
+		err = q.decideImportTx(ctx, tx, id, accepted, reviewer)
+		if err == nil {
+			err = tx.Commit()
+		}
+		return err
+	})
+}
+
+func DecideImport(
+	ctx context.Context,
+	id int64,
+	accepted bool,
+	reviewer string,
+) error {
+	return iqueue.decideImport(ctx, id, accepted, reviewer)
+}
+
 type logFeedback int64
 
 func (lf logFeedback) log(kind, format string, args ...interface{}) {
--- a/pkg/imports/sec.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/sec.go	Fri Mar 20 15:20:55 2020 +0100
@@ -130,6 +130,7 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	_ Feedback,
 ) error {
 	if _, err := tx.ExecContext(ctx, secDeleteSQL, id); err != nil {
 		return err
--- a/pkg/imports/sr.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/sr.go	Fri Mar 20 15:20:55 2020 +0100
@@ -113,6 +113,7 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	_ Feedback,
 ) error {
 	_, err := tx.ExecContext(ctx, srStageDoneSQL, id)
 	return err
--- a/pkg/imports/st.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/st.go	Fri Mar 20 15:20:55 2020 +0100
@@ -130,6 +130,7 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	_ Feedback,
 ) error {
 	if _, err := tx.ExecContext(ctx, stDeleteSQL, id); err != nil {
 		return err
--- a/pkg/imports/stsh.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/stsh.go	Fri Mar 20 15:20:55 2020 +0100
@@ -79,8 +79,9 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	feedback Feedback,
 ) error {
-	return stJobCreator{}.StageDone(ctx, tx, id)
+	return stJobCreator{}.StageDone(ctx, tx, id, feedback)
 }
 
 // CleanUp removes the folder with the uploaded shape file.
--- a/pkg/imports/ubn.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/ubn.go	Fri Mar 20 15:20:55 2020 +0100
@@ -55,9 +55,10 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	feedback Feedback,
 ) error {
 	// Same as normal bottleneck import.
-	return bnJobCreator{}.StageDone(ctx, tx, id)
+	return bnJobCreator{}.StageDone(ctx, tx, id, feedback)
 }
 
 // CleanUp of a uploaded bottleneck import removes the temp dir.
--- a/pkg/imports/ufa.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/ufa.go	Fri Mar 20 15:20:55 2020 +0100
@@ -51,7 +51,7 @@
 
 func (ufaJobCreator) AutoAccept() bool { return true }
 
-func (ufaJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+func (ufaJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
 	return nil
 }
 
--- a/pkg/imports/ugm.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/ugm.go	Fri Mar 20 15:20:55 2020 +0100
@@ -45,7 +45,7 @@
 
 func (ugmJobCreator) AutoAccept() bool { return true }
 
-func (ugmJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil }
+func (ugmJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { return nil }
 
 // CleanUp removes the temporary files from the filesystem.
 func (ugm *UploadedGaugeMeasurement) CleanUp() error { return os.RemoveAll(ugm.Dir) }
--- a/pkg/imports/wa.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/wa.go	Fri Mar 20 15:20:55 2020 +0100
@@ -70,7 +70,7 @@
 }
 
 // StageDone is a NOP for waterway area imports.
-func (waJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
+func (waJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
 	return nil
 }
 
--- a/pkg/imports/wfsjob.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/wfsjob.go	Fri Mar 20 15:20:55 2020 +0100
@@ -49,7 +49,7 @@
 
 		newConsumer func(context.Context, *sql.Conn, Feedback) (WFSFeatureConsumer, error)
 
-		stageDone func(context.Context, *sql.Tx, int64) error
+		stageDone func(context.Context, *sql.Tx, int64, Feedback) error
 	}
 
 	WFSFeatureJob struct {
@@ -86,11 +86,16 @@
 	return wfjc.stageDone == nil
 }
 
-func (wfjc *WFSFeatureJobCreator) StageDone(ctx context.Context, tx *sql.Tx, id int64) error {
+func (wfjc *WFSFeatureJobCreator) StageDone(
+	ctx context.Context,
+	tx *sql.Tx,
+	id int64,
+	feedback Feedback,
+) error {
 	if wfjc.stageDone == nil {
 		return nil
 	}
-	return wfjc.stageDone(ctx, tx, id)
+	return wfjc.stageDone(ctx, tx, id, feedback)
 }
 
 func (wfjc *WFSFeatureJobCreator) Create() Job {
--- a/pkg/imports/wg.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/wg.go	Fri Mar 20 15:20:55 2020 +0100
@@ -66,7 +66,7 @@
 }
 
 // StageDone does nothing as there is no staging for gauges.
-func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil }
+func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { return nil }
 
 // CleanUp does nothing as there is nothing to cleanup with gauges.
 func (*WaterwayGauge) CleanUp() error { return nil }
--- a/pkg/imports/wp.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/imports/wp.go	Fri Mar 20 15:20:55 2020 +0100
@@ -153,6 +153,7 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
+	_ Feedback,
 ) error {
 	_, err := tx.ExecContext(ctx, wpStageDoneSQL, id)
 	return err
--- a/pkg/pgxutils/errors.go	Thu Mar 19 17:43:23 2020 +0100
+++ b/pkg/pgxutils/errors.go	Fri Mar 20 15:20:55 2020 +0100
@@ -158,6 +158,12 @@
 					c = http.StatusConflict
 					return
 				}
+			case "fairway_dimensions":
+				switch err.ConstraintName {
+				case "fairway_dimensions_area_check":
+					m = "Geometry could not be stored as valid, non-empty polygon"
+					return
+				}
 			}
 		case "internal":
 			switch err.TableName {
--- a/schema/auth.sql	Thu Mar 19 17:43:23 2020 +0100
+++ b/schema/auth.sql	Fri Mar 20 15:20:55 2020 +0100
@@ -134,58 +134,87 @@
 
 -- Staging area
 
--- In many cases it is more efficient to check for "staging_done" to
--- prevent the more expensive checks for read only access (which is
--- allowed for all users, when staging is done).
-CREATE POLICY same_country ON waterway.gauge_measurements
-    FOR ALL TO waterway_admin
-    USING (staging_done
-           OR (location).country_code =
-               (SELECT country FROM users.list_users
-                WHERE username = current_user))
-    WITH CHECK ((location).country_code =
-                 (SELECT country FROM users.list_users
-                  WHERE username = current_user));
-
-CREATE POLICY same_country ON waterway.waterway_profiles
-    FOR ALL TO waterway_admin
-    USING (staging_done
-           OR (location).country_code =
-               (SELECT country FROM users.list_users
-                WHERE username = current_user))
-    WITH CHECK ((location).country_code =
-                (SELECT country FROM users.list_users
-                 WHERE username = current_user));
+DO LANGUAGE plpgsql
+$do$
+DECLARE
+    the_table varchar;
+    condition CONSTANT text = $$
+        (location).country_code =
+            (SELECT country FROM users.list_users
+                WHERE username = current_user)
+        $$;
+BEGIN
+    FOREACH the_table IN ARRAY ARRAY[
+        'gauge_measurements',
+        'waterway_profiles']
+    LOOP
+        EXECUTE format($$
+            CREATE POLICY same_country_insert ON waterway.%I
+                FOR INSERT TO waterway_admin
+                WITH CHECK (%s)
+            $$, the_table, condition);
+        -- In many cases it is more efficient to check for "staging_done" to
+        -- prevent the more expensive checks for read only access (which is
+        -- allowed for all users, when staging is done).
+        EXECUTE format($$
+            CREATE POLICY same_country_select ON waterway.%I
+                FOR SELECT TO waterway_admin
+                USING (staging_done OR %s)
+            $$, the_table, condition);
+        EXECUTE format($$
+            CREATE POLICY same_country_update ON waterway.%I
+                FOR UPDATE TO waterway_admin
+                USING (%s)
+            $$, the_table, condition);
+        EXECUTE format($$
+            CREATE POLICY same_country_delete ON waterway.%I
+                FOR DELETE TO waterway_admin
+                USING (%s)
+            $$, the_table, condition);
+    END LOOP;
+END;
+$do$;
 
-CREATE POLICY responsibility_area ON waterway.bottlenecks
-    FOR ALL TO waterway_admin
-    USING (staging_done
-        OR (SELECT ST_Covers(a,
-                ST_Transform(CAST(area AS geometry), ST_SRID(a)))
-            FROM users.current_user_area_utm() AS a (a)))
-    WITH CHECK ((SELECT ST_Covers(a,
-            ST_Transform(CAST(area AS geometry), ST_SRID(a)))
-        FROM users.current_user_area_utm() AS a (a)));
-
-CREATE POLICY responsibility_area ON waterway.sounding_results
-    FOR ALL TO waterway_admin
-    USING (staging_done
-        OR (SELECT ST_Covers(a,
-                ST_Transform(CAST(area AS geometry), ST_SRID(a)))
-            FROM users.current_user_area_utm() AS a (a)))
-    WITH CHECK ((SELECT ST_Covers(a,
-            ST_Transform(CAST(area AS geometry), ST_SRID(a)))
-        FROM users.current_user_area_utm() AS a (a)));
-
-CREATE POLICY responsibility_area ON waterway.fairway_dimensions
-    FOR ALL TO waterway_admin
-    USING (staging_done
-        OR (SELECT ST_Covers(a,
-                ST_Transform(CAST(area AS geometry), ST_SRID(a)))
-            FROM users.current_user_area_utm() AS a (a)))
-    WITH CHECK ((SELECT ST_Covers(a,
-            ST_Transform(CAST(area AS geometry), ST_SRID(a)))
-        FROM users.current_user_area_utm() AS a (a)));
+DO LANGUAGE plpgsql
+$do$
+DECLARE
+    the_table varchar;
+    condition CONSTANT text = $$
+        (SELECT ST_Covers(a, ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+            FROM users.current_user_area_utm() AS a (a))
+        $$;
+BEGIN
+    FOREACH the_table IN ARRAY ARRAY[
+        'fairway_dimensions',
+        'bottlenecks',
+        'sounding_results']
+    LOOP
+        EXECUTE format($$
+            CREATE POLICY responsibility_area_insert ON waterway.%I
+                FOR INSERT TO waterway_admin
+                WITH CHECK (%s)
+            $$, the_table, condition);
+        -- In many cases it is more efficient to check for "staging_done" to
+        -- prevent the more expensive checks for read only access (which is
+        -- allowed for all users, when staging is done).
+        EXECUTE format($$
+            CREATE POLICY responsibility_area_select ON waterway.%I
+                FOR SELECT TO waterway_admin
+                USING (staging_done OR %s)
+            $$, the_table, condition);
+        EXECUTE format($$
+            CREATE POLICY responsibility_area_update ON waterway.%I
+                FOR UPDATE TO waterway_admin
+                USING (%s)
+            $$, the_table, condition);
+        EXECUTE format($$
+            CREATE POLICY responsibility_area_delete ON waterway.%I
+                FOR DELETE TO waterway_admin
+                USING (%s)
+            $$, the_table, condition);
+    END LOOP;
+END;
+$do$;
 
 -- In the case of sections differentiating between read and write
 -- access is not neccessary: the country code based access check is
--- a/schema/auth_tests.sql	Thu Mar 19 17:43:23 2020 +0100
+++ b/schema/auth_tests.sql	Fri Mar 20 15:20:55 2020 +0100
@@ -102,6 +102,41 @@
     42501, NULL,
     'Waterway admin cannot insert data outside his region');
 
+-- Ensure a USING clause prevents access in an UPDATE
+SELECT is_empty($$
+    WITH a AS (SELECT users.current_user_area_utm() AS a)
+    UPDATE waterway.bottlenecks
+        SET objnam = 'Now it''s mine',
+            area = ST_geomfromtext(
+                'MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)))', 4326)
+        WHERE bottleneck_id = 'testbottleneck3'
+        RETURNING *
+    $$,
+    'Waterway admin cannot move data from outside his region inside');
+
+-- Ensure a WITH CHECK or USING clause prevents writing such rows
+SELECT throws_ok($$
+    WITH a AS (SELECT users.current_user_area_utm() AS a)
+    UPDATE waterway.bottlenecks
+        SET objnam = 'Give-away',
+            area = ST_geomfromtext(
+                'MULTIPOLYGON(((1 0, 1 1, 2 1, 2 0, 1 0)))', 4326)
+        WHERE bottleneck_id = 'testbottleneck2'
+        RETURNING *
+    $$,
+    42501, NULL,
+    'Waterway admin cannot move data from inside his region outside');
+
+SELECT is_empty($$
+    WITH a AS (SELECT users.current_user_area_utm() AS a)
+    DELETE FROM waterway.bottlenecks
+        WHERE NOT ST_Covers((SELECT a FROM a),
+            ST_Transform(
+                CAST(area AS geometry), ST_SRID((SELECT a FROM a))))
+        RETURNING *
+    $$,
+    'Waterway admin cannot delete data outside his region');
+
 -- template management
 SELECT lives_ok($$
     INSERT INTO users.templates (template_name, template_data, country)
--- a/schema/gemma.sql	Thu Mar 19 17:43:23 2020 +0100
+++ b/schema/gemma.sql	Fri Mar 20 15:20:55 2020 +0100
@@ -707,8 +707,9 @@
 
     CREATE TABLE fairway_dimensions (
         id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
-        area geography(POLYGON, 4326) NOT NULL
-            CHECK(ST_IsValid(CAST(area AS geometry))),
+        area geography(MULTIPOLYGON, 4326) NOT NULL
+            CHECK(ST_IsValid(CAST(area AS geometry))
+                AND NOT ST_IsEmpty(CAST(area AS geometry))),
         level_of_service smallint NOT NULL REFERENCES levels_of_service,
         min_width smallint NOT NULL,
         max_width smallint NOT NULL,
--- a/schema/run_tests.sh	Thu Mar 19 17:43:23 2020 +0100
+++ b/schema/run_tests.sh	Fri Mar 20 15:20:55 2020 +0100
@@ -80,7 +80,7 @@
     -c 'SET client_min_messages TO WARNING' \
     -c "DROP ROLE IF EXISTS $TEST_ROLES" \
     -f "$BASEDIR"/tap_tests_data.sql \
-    -c "SELECT plan(82 + (
+    -c "SELECT plan(85 + (
             SELECT count(*)::int
                 FROM information_schema.tables
                 WHERE table_schema = 'waterway'))" \
--- a/schema/tap_tests_data.sql	Thu Mar 19 17:43:23 2020 +0100
+++ b/schema/tap_tests_data.sql	Fri Mar 20 15:20:55 2020 +0100
@@ -102,6 +102,20 @@
         ST_geomfromtext('MULTIPOLYGON(((0 0, 0 1, 1 1, 1 0, 0 0)))', 4326),
         'AT', 'AT', 'AT',
         1, 'depth', current_timestamp, 'testorganization', true
+    ), (
+        'testbottleneck3',
+        isrsrange(('RO', 'XXX', '00001', '00000', 0)::isrs,
+            ('RO', 'XXX', '00001', '00000', 2)::isrs),
+        ST_geomfromtext('MULTIPOLYGON(((1 0, 1 1, 2 1, 2 0, 1 0)))', 4326),
+        'RO', 'RO', 'RO',
+        1, 'depth', current_timestamp, 'testorganization', true
+    ), (
+        'testbottleneck4',
+        isrsrange(('RO', 'XXX', '00001', '00000', 0)::isrs,
+            ('RO', 'XXX', '00001', '00000', 2)::isrs),
+        ST_geomfromtext('MULTIPOLYGON(((1 0, 1 1, 2 1, 2 0, 1 0)))', 4326),
+        'RO', 'RO', 'RO',
+        1, 'depth', current_timestamp, 'testorganization', true
     ))
 INSERT INTO waterway.bottlenecks (
     gauge_location, validity,
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1427/01.fix_rls_policies.sql	Fri Mar 20 15:20:55 2020 +0100
@@ -0,0 +1,86 @@
+DROP POLICY same_country ON waterway.gauge_measurements;
+DROP POLICY same_country ON waterway.waterway_profiles;
+
+DO LANGUAGE plpgsql
+$do$
+DECLARE
+    the_table varchar;
+    condition CONSTANT text = $$
+        (location).country_code =
+            (SELECT country FROM users.list_users
+                WHERE username = current_user)
+        $$;
+BEGIN
+    FOREACH the_table IN ARRAY ARRAY[
+        'gauge_measurements',
+        'waterway_profiles']
+    LOOP
+        EXECUTE format($$
+            CREATE POLICY same_country_insert ON waterway.%I
+                FOR INSERT TO waterway_admin
+                WITH CHECK (%s)
+            $$, the_table, condition);
+        EXECUTE format($$
+            CREATE POLICY same_country_select ON waterway.%I
+                FOR SELECT TO waterway_admin
+                USING (staging_done OR %s)
+            $$, the_table, condition);
+        EXECUTE format($$
+            CREATE POLICY same_country_update ON waterway.%I
+                FOR UPDATE TO waterway_admin
+                USING (%s)
+            $$, the_table, condition);
+        EXECUTE format($$
+            CREATE POLICY same_country_delete ON waterway.%I
+                FOR DELETE TO waterway_admin
+                USING (%s)
+            $$, the_table, condition);
+    END LOOP;
+END;
+$do$;
+
+
+DROP POLICY responsibility_area ON waterway.bottlenecks;
+DROP POLICY responsibility_area ON waterway.sounding_results;
+DROP POLICY responsibility_area ON waterway.fairway_dimensions;
+
+DO LANGUAGE plpgsql
+$do$
+DECLARE
+    the_table varchar;
+    condition CONSTANT text = $$
+        (SELECT ST_Covers(a, ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+            FROM users.current_user_area_utm() AS a (a))
+        $$;
+BEGIN
+    FOREACH the_table IN ARRAY ARRAY[
+        'fairway_dimensions',
+        'bottlenecks',
+        'sounding_results']
+    LOOP
+        EXECUTE format($$
+            CREATE POLICY responsibility_area_insert ON waterway.%I
+                FOR INSERT TO waterway_admin
+                WITH CHECK (%s)
+            $$, the_table, condition);
+        -- In many cases it is more efficient to check for "staging_done" to
+        -- prevent the more expensive checks for read only access (which is
+        -- allowed for all users, when staging is done).
+        EXECUTE format($$
+            CREATE POLICY responsibility_area_select ON waterway.%I
+                FOR SELECT TO waterway_admin
+                USING (staging_done OR %s)
+            $$, the_table, condition);
+        EXECUTE format($$
+            CREATE POLICY responsibility_area_update ON waterway.%I
+                FOR UPDATE TO waterway_admin
+                USING (%s)
+            $$, the_table, condition);
+        EXECUTE format($$
+            CREATE POLICY responsibility_area_delete ON waterway.%I
+                FOR DELETE TO waterway_admin
+                USING (%s)
+            $$, the_table, condition);
+    END LOOP;
+END;
+$do$;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1428/01.fairway_dimensions_as_multipolygon.sql	Fri Mar 20 15:20:55 2020 +0100
@@ -0,0 +1,42 @@
+-- Cannot alter type of a column used in a policy definition
+DROP POLICY responsibility_area_insert ON waterway.fairway_dimensions;
+DROP POLICY responsibility_area_select ON waterway.fairway_dimensions;
+DROP POLICY responsibility_area_update ON waterway.fairway_dimensions;
+DROP POLICY responsibility_area_delete ON waterway.fairway_dimensions;
+
+ALTER TABLE waterway.fairway_dimensions
+    ALTER area TYPE geography(MULTIPOLYGON, 4326)
+        USING ST_Multi(CAST(area AS geometry));
+
+-- Re-create policies
+DO LANGUAGE plpgsql
+$do$
+DECLARE
+    the_table CONSTANT varchar = 'fairway_dimensions';
+    condition CONSTANT text = $$
+        (SELECT ST_Covers(a, ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+            FROM users.current_user_area_utm() AS a (a))
+        $$;
+BEGIN
+    EXECUTE format($$
+        CREATE POLICY responsibility_area_insert ON waterway.%I
+            FOR INSERT TO waterway_admin
+            WITH CHECK (%s)
+        $$, the_table, condition);
+    EXECUTE format($$
+        CREATE POLICY responsibility_area_select ON waterway.%I
+            FOR SELECT TO waterway_admin
+            USING (staging_done OR %s)
+        $$, the_table, condition);
+    EXECUTE format($$
+        CREATE POLICY responsibility_area_update ON waterway.%I
+            FOR UPDATE TO waterway_admin
+            USING (%s)
+        $$, the_table, condition);
+    EXECUTE format($$
+        CREATE POLICY responsibility_area_delete ON waterway.%I
+            FOR DELETE TO waterway_admin
+            USING (%s)
+        $$, the_table, condition);
+END;
+$do$;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1429/01.disallow_empty_fairway_dimensions.sql	Fri Mar 20 15:20:55 2020 +0100
@@ -0,0 +1,4 @@
+ALTER TABLE waterway.fairway_dimensions
+    DROP CONSTRAINT fairway_dimensions_area_check,
+    ADD CHECK(ST_IsValid(CAST(area AS geometry))
+        AND NOT ST_IsEmpty(CAST(area AS geometry)))
--- a/schema/version.sql	Thu Mar 19 17:43:23 2020 +0100
+++ b/schema/version.sql	Fri Mar 20 15:20:55 2020 +0100
@@ -1,1 +1,1 @@
-INSERT INTO gemma_schema_version(version) VALUES (1426);
+INSERT INTO gemma_schema_version(version) VALUES (1429);