changeset 5078:81fb077cd3f8 time-sliding

merge default into time-slinding branch
author Fadi Abbud <fadi.abbud@intevation.de>
date Mon, 16 Mar 2020 12:40:58 +0100
parents c9354fcf6050 (current diff) e4ab338e7ba9 (diff)
children 32948cba9824
files
diffstat 21 files changed, 254 insertions(+), 344 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/dma.go	Fri Mar 13 12:29:06 2020 +0100
+++ b/pkg/imports/dma.go	Mon Mar 16 12:40:58 2020 +0100
@@ -62,10 +62,7 @@
 func (dmaJobCreator) Create() Job { return new(DistanceMarksAshore) }
 
 func (dmaJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"distance_marks"},
-		{},
-	}
+	return [2][]string{{"distance_marks"}}
 }
 
 // StageDone is a NOP for distance marks imports.
--- a/pkg/imports/dmv.go	Fri Mar 13 12:29:06 2020 +0100
+++ b/pkg/imports/dmv.go	Mon Mar 16 12:40:58 2020 +0100
@@ -55,10 +55,7 @@
 func (dmvJobCreator) Create() Job { return new(DistanceMarksVirtual) }
 
 func (dmvJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"distance_marks_virtual"},
-		{},
-	}
+	return [2][]string{{"distance_marks_virtual"}}
 }
 
 // StageDone does nothing as there is no staging for distance marks virtual.
--- a/pkg/imports/dsec.go	Fri Mar 13 12:29:06 2020 +0100
+++ b/pkg/imports/dsec.go	Mon Mar 16 12:40:58 2020 +0100
@@ -40,10 +40,7 @@
 func (dsecJobCreator) Create() Job { return new(DeleteSection) }
 
 func (dsecJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"sections"},
-		{},
-	}
+	return [2][]string{{"sections"}}
 }
 
 const (
--- a/pkg/imports/dst.go	Fri Mar 13 12:29:06 2020 +0100
+++ b/pkg/imports/dst.go	Mon Mar 16 12:40:58 2020 +0100
@@ -39,10 +39,7 @@
 func (dstJobCreator) Create() Job { return new(DeleteStretch) }
 
 func (dstJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"stretches"},
-		{},
-	}
+	return [2][]string{{"stretches"}}
 }
 
 const (
--- a/pkg/imports/fm.go	Fri Mar 13 12:29:06 2020 +0100
+++ b/pkg/imports/fm.go	Mon Mar 16 12:40:58 2020 +0100
@@ -211,7 +211,7 @@
 	RegisterJobCreator(BCNLATHYDROJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks bcnlat (HYDRO)",
-			depends:     [2][]string{{"fairway_marks_bcnlat_hydro"}, {}},
+			depends:     [2][]string{{"fairway_marks_bcnlat_hydro"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("bcnlat_hydro",
@@ -226,7 +226,7 @@
 	RegisterJobCreator(BCNLATIENCJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks bcnlat (IENC)",
-			depends:     [2][]string{{"fairway_marks_bcnlat_ienc"}, {}},
+			depends:     [2][]string{{"fairway_marks_bcnlat_ienc"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("bcnlat_ienc",
@@ -242,7 +242,7 @@
 	RegisterJobCreator(BOYLATHYDROJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks boylat (HYDRO)",
-			depends:     [2][]string{{"fairway_marks_boylat_hydro"}, {}},
+			depends:     [2][]string{{"fairway_marks_boylat_hydro"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("boylat_hydro",
@@ -258,7 +258,7 @@
 	RegisterJobCreator(BOYLATIENCJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks boylat (IENC)",
-			depends:     [2][]string{{"fairway_marks_boylat_ienc"}, {}},
+			depends:     [2][]string{{"fairway_marks_boylat_ienc"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("boylat_ienc",
@@ -274,7 +274,7 @@
 	RegisterJobCreator(BOYCARJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks boycar",
-			depends:     [2][]string{{"fairway_marks_boycar"}, {}},
+			depends:     [2][]string{{"fairway_marks_boycar"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("boycar",
@@ -290,7 +290,7 @@
 	RegisterJobCreator(BOYSAWJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks boysaw",
-			depends:     [2][]string{{"fairway_marks_boysaw"}, {}},
+			depends:     [2][]string{{"fairway_marks_boysaw"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("boysaw",
@@ -305,7 +305,7 @@
 	RegisterJobCreator(BOYSPPJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks boyspp",
-			depends:     [2][]string{{"fairway_marks_boyspp"}, {}},
+			depends:     [2][]string{{"fairway_marks_boyspp"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("boyspp",
@@ -321,7 +321,7 @@
 	RegisterJobCreator(DAYMARHYDROJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks daymar (HYDRO)",
-			depends:     [2][]string{{"fairway_marks_daymar_hydro"}, {}},
+			depends:     [2][]string{{"fairway_marks_daymar_hydro"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("daymar_hydro",
@@ -336,7 +336,7 @@
 	RegisterJobCreator(DAYMARIENCJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks daymar (IENC)",
-			depends:     [2][]string{{"fairway_marks_daymar_ienc"}, {}},
+			depends:     [2][]string{{"fairway_marks_daymar_ienc"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("daymar_ienc",
@@ -352,7 +352,7 @@
 	RegisterJobCreator(LIGHTSJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks lights",
-			depends:     [2][]string{{"fairway_marks_lights"}, {}},
+			depends:     [2][]string{{"fairway_marks_lights"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("lights",
@@ -371,7 +371,7 @@
 	RegisterJobCreator(NOTMRKJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks notmrk",
-			depends:     [2][]string{{"fairway_marks_lights"}, {}},
+			depends:     [2][]string{{"fairway_marks_lights"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("notmrk",
@@ -390,7 +390,7 @@
 	RegisterJobCreator(RTPBCNJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks rtpbcn",
-			depends:     [2][]string{{"fairway_marks_rtpbcn"}, {}},
+			depends:     [2][]string{{"fairway_marks_rtpbcn"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("rtpbcn",
@@ -405,7 +405,7 @@
 	RegisterJobCreator(TOPMARJobKind,
 		&WFSFeatureJobCreator{
 			description: "fairway marks topmar",
-			depends:     [2][]string{{"fairway_marks_topmar"}, {}},
+			depends:     [2][]string{{"fairway_marks_topmar"}},
 			newConsumer: newSQLConsumer(
 				prepareStmnts(
 					createInsertFMSQL("topmar",
--- a/pkg/imports/modelconvert.go	Fri Mar 13 12:29:06 2020 +0100
+++ b/pkg/imports/modelconvert.go	Mon Mar 16 12:40:58 2020 +0100
@@ -21,7 +21,7 @@
 	BNJobKind:          func() interface{} { return new(models.BottleneckImport) },
 	GMJobKind:          func() interface{} { return new(models.GaugeMeasurementImport) },
 	FAJobKind:          func() interface{} { return new(models.FairwayAvailabilityImport) },
-	WXJobKind:          func() interface{} { return new(models.WaterwayAxisImport) },
+	WXJobKind:          func() interface{} { return FindJobCreator(WXJobKind).Create() },
 	WAJobKind:          func() interface{} { return new(models.WaterwayAreaImport) },
 	WGJobKind:          func() interface{} { return new(models.WaterwayGaugeImport) },
 	DMVJobKind:         func() interface{} { return new(models.DistanceMarksVirtualImport) },
@@ -80,17 +80,6 @@
 		}
 	},
 
-	WXJobKind: func(input interface{}) interface{} {
-		wxi := input.(*models.WaterwayAxisImport)
-		return &WaterwayAxis{
-			URL:         wxi.URL,
-			FeatureType: wxi.FeatureType,
-			SortBy:      nilString(wxi.SortBy),
-			User:        nilString(wxi.User),
-			Password:    nilString(wxi.Password),
-		}
-	},
-
 	WAJobKind: func(input interface{}) interface{} {
 		wai := input.(*models.WaterwayAreaImport)
 		return &WaterwayArea{
--- a/pkg/imports/stsh.go	Fri Mar 13 12:29:06 2020 +0100
+++ b/pkg/imports/stsh.go	Mon Mar 16 12:40:58 2020 +0100
@@ -50,10 +50,7 @@
 func (stshJobCreator) Create() Job { return new(StretchShape) }
 
 func (stshJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"stretches", "stretch_countries"},
-		{},
-	}
+	return [2][]string{{"stretches", "stretch_countries"}}
 }
 
 const (
--- a/pkg/imports/wa.go	Fri Mar 13 12:29:06 2020 +0100
+++ b/pkg/imports/wa.go	Mon Mar 16 12:40:58 2020 +0100
@@ -66,10 +66,7 @@
 func (waJobCreator) Create() Job { return new(WaterwayArea) }
 
 func (waJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"waterway_area"},
-		{},
-	}
+	return [2][]string{{"waterway_area"}}
 }
 
 // StageDone is a NOP for waterway area imports.
--- a/pkg/imports/wkb.go	Fri Mar 13 12:29:06 2020 +0100
+++ b/pkg/imports/wkb.go	Mon Mar 16 12:40:58 2020 +0100
@@ -35,6 +35,14 @@
 	return func() (string, interface{}) { return "Point", newProperties() }
 }
 
+func newMultiLineFeature(
+	newProperties func() interface{},
+) func() (string, interface{}) {
+	return func() (string, interface{}) {
+		return "MultiLineString", newProperties()
+	}
+}
+
 func (ls lineSlice) toWKB(buf *bytes.Buffer) {
 	binary.Write(buf, binary.LittleEndian, wkb.NDR)
 	binary.Write(buf, binary.LittleEndian, wkb.LineString)
--- a/pkg/imports/wx.go	Fri Mar 13 12:29:06 2020 +0100
+++ b/pkg/imports/wx.go	Mon Mar 16 12:40:58 2020 +0100
@@ -14,310 +14,88 @@
 
 package imports
 
-import (
-	"context"
-	"database/sql"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"io"
-	"time"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-	"gemma.intevation.de/gemma/pkg/wfs"
-)
-
-// WaterwayAxis is an import job to import
-// the waterway axes in form of line string geometries
-// and attribute data from a WFS service.
-type WaterwayAxis struct {
-	// URL the GetCapabilities URL of the WFS service.
-	URL string `json:"url"`
-	// FeatureType selects the feature type of the WFS service.
-	FeatureType string `json:"feature-type"`
-	// SortBy works around misconfigured services to
-	// establish a sort order to get the features.
-	SortBy string `json:"sort-by"`
-	// User is an optional username for Basic Auth.
-	User string `json:"user,omitempty"`
-	// Password is an optional password for Basic Auth.
-	Password string `json:"password,omitempty"`
-}
-
-// Description gives a short info about relevant facts of this import.
-func (wx *WaterwayAxis) Description() (string, error) {
-	return wx.URL + "|" + wx.FeatureType, nil
-}
-
 // WXJobKind is the import queue type identifier.
 const WXJobKind JobKind = "wx"
 
-type wxJobCreator struct{}
-
 func init() {
-	RegisterJobCreator(WXJobKind, wxJobCreator{})
+	RegisterJobCreator(WXJobKind,
+		&WFSFeatureJobCreator{
+			description: "waterway axis",
+			depends:     [2][]string{{"waterway_axis"}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(insertWaterwayAxisSQL),
+				consume,
+				axisInvalidation,
+				newMultiLineFeature(func() interface{} {
+					return new(waterwayAxisProperties)
+				}),
+			),
+		})
 }
 
-func (wxJobCreator) Description() string { return "waterway axis" }
-
-func (wxJobCreator) AutoAccept() bool { return true }
-
-func (wxJobCreator) Create() Job { return new(WaterwayAxis) }
-
-func (wxJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"waterway_axis"},
-		{},
-	}
-}
-
-// StageDone is a NOP for waterway axis imports.
-func (wxJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for waterway axis imports is a NOP.
-func (*WaterwayAxis) CleanUp() error { return nil }
-
 type waterwayAxisProperties struct {
 	ObjNam  string  `json:"hydro_objnam"`
 	NObjNnm *string `json:"hydro_nobjnm"`
 }
 
 const (
-	deleteWaterwayAxisSQL = `
-WITH resp AS (
-  SELECT users.current_user_area_utm() AS a
-)
-DELETE FROM waterway.waterway_axis
-WHERE pg_has_role('sys_admin', 'MEMBER')
-  OR ST_Covers((SELECT a FROM resp),
-    ST_Transform(wtwaxs::geometry, (SELECT ST_SRID(a) FROM resp)))
-`
-
 	insertWaterwayAxisSQL = `
 WITH resp AS (
   SELECT users.current_user_area_utm() AS a
-)
-INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam)
-SELECT
-    ST_Multi(ST_Node(ST_CollectionExtract(ST_Transform(new_ax, 4326), 2))),
-    $3, $4
+),
+g AS (
+  SELECT
+    ST_Multi(ST_Node(ST_CollectionExtract(ST_Transform(new_ax, 4326), 2)))
+      AS new_ax
   FROM ST_GeomFromWKB($1, $2::integer) AS new_line (new_line),
     LATERAL (SELECT
       CASE WHEN pg_has_role('sys_admin', 'MEMBER')
+          OR ST_Covers((SELECT a FROM resp),
+            ST_Transform(new_line, (SELECT ST_SRID(a) FROM resp)))
         THEN new_line
-        ELSE ST_Intersection((SELECT a FROM resp),
+        ELSE ST_Intersection((SELECT ST_Buffer(a, -0.0001) FROM resp),
           ST_Node(ST_Transform(new_line, (SELECT ST_SRID(a) FROM resp))))
         END) AS new_ax (new_ax)
   -- Do nothing if intersection is empty:
   WHERE NOT ST_IsEmpty(new_ax)
+),
+t AS (
+  UPDATE waterway.waterway_axis SET last_found = current_timestamp
+  WHERE (SELECT new_ax FROM g) IS NOT NULL
+    AND validity @> current_timestamp
+    AND (
+        wtwaxs, objnam, nobjnam
+      ) IS NOT DISTINCT FROM (
+        (SELECT new_ax FROM g), $3, $4)
+  RETURNING 1
+)
+INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam)
+SELECT new_ax, $3, $4
+  FROM g
+  WHERE NOT EXISTS(SELECT 1 FROM t)
 RETURNING id
 `
+	invalidateAxisSQL = `
+UPDATE waterway.waterway_axis
+  SET validity = tstzrange(lower(validity), current_timestamp)
+  WHERE validity @> current_timestamp
+    AND last_found < current_timestamp
+`
 )
 
-// Do executes the actual waterway axis import.
-func (wx *WaterwayAxis) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	start := time.Now()
-
-	feedback.Info("Import waterway axis")
-
-	feedback.Info("Loading capabilities from %s", wx.URL)
-	caps, err := wfs.GetCapabilities(wx.URL)
-	if err != nil {
-		feedback.Error("Loading capabilities failed: %v", err)
-		return nil, err
-	}
-
-	ft := caps.FindFeatureType(wx.FeatureType)
-	if ft == nil {
-		return nil, fmt.Errorf("unknown feature type '%s'", wx.FeatureType)
-	}
-
-	feedback.Info("Found feature type '%s'", wx.FeatureType)
-
-	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
+func axisInvalidation(spc *SQLGeometryConsumer) error {
+	res, err := spc.tx.ExecContext(spc.ctx, invalidateAxisSQL)
 	if err != nil {
-		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
-		return nil, err
-	}
-
-	if wx.SortBy != "" {
-		feedback.Info("Features will be sorted by '%s'", wx.SortBy)
-	}
-
-	dl, err := wfs.GetFeatures(caps, wx.FeatureType, wx.SortBy)
-	if err != nil {
-		feedback.Error("Cannot create GetFeature URLs. %v", err)
-		return nil, err
-	}
-
-	tx, err := conn.BeginTx(ctx, nil)
-	if err != nil {
-		return nil, err
-	}
-	defer tx.Rollback()
-
-	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAxisSQL)
-	if err != nil {
-		return nil, err
-	}
-	defer insertStmt.Close()
-
-	// Delete the old features.
-	if _, err := tx.ExecContext(ctx, deleteWaterwayAxisSQL); err != nil {
-		return nil, err
+		return err
 	}
-
-	var (
-		unsupported       = stringCounter{}
-		missingProperties int
-		badProperties     int
-		outside           int
-		features          int
-	)
-
-	if err := dl.Download(wx.User, wx.Password, func(url string, r io.Reader) error {
-		feedback.Info("Get features from: '%s'", url)
-		rfc, err := wfs.ParseRawFeatureCollection(r)
-		if err != nil {
-			return fmt.Errorf("parsing GetFeature document failed: %v", err)
-		}
-		if rfc.CRS != nil {
-			crsName := rfc.CRS.Properties.Name
-			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
-				feedback.Error("Unsupported CRS: %d", crsName)
-				return err
-			}
-		}
-
-		// No features -> ignore.
-		if rfc.Features == nil {
-			return nil
-		}
-
-		feedback.Info("Using EPSG: %d", epsg)
-
-		savepoint := Savepoint(ctx, tx, "feature")
-
-		for _, feature := range rfc.Features {
-			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
-				missingProperties++
-				continue
-			}
-
-			var props waterwayAxisProperties
-
-			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
-				badProperties++
-				continue
-			}
-
-			var nobjnam sql.NullString
-			if props.NObjNnm != nil {
-				nobjnam = sql.NullString{String: *props.NObjNnm, Valid: true}
-			}
-
-			var ls multiLineSlice
-			switch feature.Geometry.Type {
-			case "LineString":
-				var l lineSlice
-				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
-					return err
-				}
-				ls = append(ls, l)
-			case "MultiLineString":
-				if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil {
-					return err
-				}
-			default:
-				unsupported[feature.Geometry.Type]++
-				continue
-			}
-			if err := storeLinestring(
-				ctx,
-				savepoint,
-				feedback,
-				ls,
-				epsg,
-				props,
-				nobjnam,
-				&outside,
-				&features,
-				insertStmt); err != nil {
-				return err
-			}
-		}
-		return nil
-	}); err != nil {
-		return nil, err
+	old, err := res.RowsAffected()
+	if err != nil {
+		return err
 	}
-
-	if badProperties > 0 {
-		feedback.Warn("Bad properties: %d", badProperties)
-	}
-
-	if missingProperties > 0 {
-		feedback.Warn("Missing properties: %d", missingProperties)
-	}
-
-	if len(unsupported) != 0 {
-		feedback.Warn("Unsupported types found: %s", unsupported)
-	}
-
-	if outside > 0 {
-		feedback.Info("Features outside responsibility area: %d", outside)
-	}
-
-	if features == 0 {
-		return nil, errors.New("no features found")
-	}
-
-	if err = tx.Commit(); err == nil {
-		feedback.Info("Storing %d features took %s",
-			features, time.Since(start))
+	if old == 0 {
+		return ErrFeaturesUnmodified
 	}
-
-	return nil, err
-}
-
-func storeLinestring(
-	ctx context.Context,
-	savepoint func(func() error) error,
-	feedback Feedback,
-	l multiLineSlice,
-	epsg int,
-	props waterwayAxisProperties,
-	nobjnam sql.NullString,
-	outside, features *int,
-	insertStmt *sql.Stmt,
-) error {
-	var id int
-	err := savepoint(func() error {
-		err := insertStmt.QueryRowContext(
-			ctx,
-			l.asWKB(),
-			epsg,
-			props.ObjNam,
-			nobjnam,
-		).Scan(&id)
-		return err
-	})
-	switch {
-	case err == sql.ErrNoRows:
-		*outside++
-		// ignore -> filtered by responsibility_areas
-		return nil
-	case err != nil:
-		feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-	default:
-		*features++
-	}
+	spc.feedback.Info(
+		"Number of features removed from data source: %d", old)
 	return nil
 }
--- a/schema/auth.sql	Fri Mar 13 12:29:06 2020 +0100
+++ b/schema/auth.sql	Mon Mar 16 12:40:58 2020 +0100
@@ -106,6 +106,14 @@
 END;
 $$;
 
+-- Tables without staging area
+CREATE POLICY hide_nothing ON waterway.waterway_axis
+    FOR SELECT TO waterway_user USING (true);
+CREATE POLICY sys_admin ON waterway.waterway_axis
+    FOR ALL TO sys_admin USING (true);
+ALTER TABLE waterway.waterway_axis ENABLE ROW LEVEL SECURITY;
+
+
 --
 -- RLS policies for templates
 --
@@ -151,18 +159,33 @@
 
 CREATE POLICY responsibility_area ON waterway.bottlenecks
     FOR ALL TO waterway_admin
-    USING (staging_done OR users.utm_covers(area))
-    WITH CHECK (users.utm_covers(area));
+    USING (staging_done
+        OR (SELECT ST_Covers(a,
+                ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+            FROM users.current_user_area_utm() AS a (a)))
+    WITH CHECK ((SELECT ST_Covers(a,
+            ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+        FROM users.current_user_area_utm() AS a (a)));
 
 CREATE POLICY responsibility_area ON waterway.sounding_results
     FOR ALL TO waterway_admin
-    USING (staging_done OR users.utm_covers(area))
-    WITH CHECK (users.utm_covers(area));
+    USING (staging_done
+        OR (SELECT ST_Covers(a,
+                ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+            FROM users.current_user_area_utm() AS a (a)))
+    WITH CHECK ((SELECT ST_Covers(a,
+            ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+        FROM users.current_user_area_utm() AS a (a)));
 
 CREATE POLICY responsibility_area ON waterway.fairway_dimensions
     FOR ALL TO waterway_admin
-    USING (staging_done OR users.utm_covers(area))
-    WITH CHECK (users.utm_covers(area));
+    USING (staging_done
+        OR (SELECT ST_Covers(a,
+                ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+            FROM users.current_user_area_utm() AS a (a)))
+    WITH CHECK ((SELECT ST_Covers(a,
+            ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+        FROM users.current_user_area_utm() AS a (a)));
 
 -- In the case of sections differentiating between read and write
 -- access is not neccessary: the country code based access check is
@@ -178,6 +201,22 @@
     USING (true);
 
 --
+-- Tables without staging area
+--
+-- Use three policies instead of one FOR ALL to avoid costly expressions
+-- being added in SELECT queries.
+CREATE POLICY responsibility_area_insert ON waterway.waterway_axis
+    FOR INSERT TO waterway_admin
+    WITH CHECK ((SELECT ST_Covers(a,
+            ST_Transform(CAST(wtwaxs AS geometry), ST_SRID(a)))
+        FROM users.current_user_area_utm() AS a (a)));
+CREATE POLICY responsibility_area_update ON waterway.waterway_axis
+    FOR UPDATE TO waterway_admin
+    USING ((SELECT ST_Covers(a,
+            ST_Transform(CAST(wtwaxs AS geometry), ST_SRID(a)))
+        FROM users.current_user_area_utm() AS a (a)));
+
+--
 -- RLS policies for imports and import config
 --
 
--- a/schema/default_sysconfig.sql	Fri Mar 13 12:29:06 2020 +0100
+++ b/schema/default_sysconfig.sql	Mon Mar 16 12:40:58 2020 +0100
@@ -111,7 +111,6 @@
 
 -- Directly accessed tables
 INSERT INTO sys_admin.published_services (schema, name) VALUES
-    ('waterway', 'waterway_axis'),
     ('waterway', 'waterway_area'),
     ('waterway', 'waterway_profiles'),
     ('waterway', 'fairway_dimensions');
@@ -305,6 +304,16 @@
     wmst_attribute, wmst_end_attribute,
     view_def
 ) VALUES
+    ('waterway', 'waterway_axis', 4326, 'id',
+        'valid_from', 'valid_to', $$
+        SELECT id,
+            lower(validity) AS valid_from,
+            COALESCE(upper(validity), current_timestamp) AS valid_to,
+            wtwaxs,
+            objnam,
+            nobjnam
+        FROM waterway.waterway_axis
+        $$),
     ('waterway', 'fairway_marks_bcnlat_hydro', 4326, 'id',
         'valid_from', 'valid_to', format(
             (SELECT def FROM base_views WHERE name = 'fairway_marks_tmpl'),
--- a/schema/gemma.sql	Fri Mar 13 12:29:06 2020 +0100
+++ b/schema/gemma.sql	Mon Mar 16 12:40:58 2020 +0100
@@ -624,11 +624,17 @@
             CHECK(ST_IsSimple(CAST(wtwaxs AS geometry))),
         -- TODO: Do we need to check data set quality (DRC 2.1.6)?
         objnam varchar NOT NULL,
-        nobjnam varchar
+        nobjnam varchar,
+        validity tstzrange NOT NULL DEFAULT tstzrange(current_timestamp, NULL)
+            CHECK (NOT isempty(validity)),
+        -- Last time an import job found this entry in a data source:
+        last_found timestamp with time zone NOT NULL DEFAULT current_timestamp
     )
     CREATE CONSTRAINT TRIGGER waterway_axis_wtwaxs_unique
-        AFTER INSERT OR UPDATE OF wtwaxs ON waterway_axis
-        FOR EACH ROW EXECUTE FUNCTION prevent_st_equals('wtwaxs')
+        AFTER INSERT OR UPDATE OF wtwaxs, validity ON waterway_axis
+        FOR EACH ROW EXECUTE FUNCTION prevent_st_equals('wtwaxs', 'validity')
+    CREATE INDEX waterway_axis_validity
+        ON waterway_axis USING GiST (validity)
 
     -- This table allows linkage between 1D ISRS location codes and 2D space
     -- e.g. for cutting bottleneck area out of waterway area based on virtual
--- a/schema/gemma_tests.sql	Fri Mar 13 12:29:06 2020 +0100
+++ b/schema/gemma_tests.sql	Mon Mar 16 12:40:58 2020 +0100
@@ -32,6 +32,19 @@
     23505, NULL,
     'No duplicate geometries can be inserted into waterway_axis');
 
+SELECT lives_ok($$
+    INSERT INTO waterway.waterway_axis (wtwaxs, objnam, validity) VALUES (
+        ST_GeogFromText('MULTILINESTRING((0 0, 1 1))'),
+        'test',
+        tstzrange(NULL, current_timestamp)
+    ), (
+        ST_GeogFromText('MULTILINESTRING((0 0, 1 1))'),
+        'test',
+        tstzrange(current_timestamp, NULL)
+    )
+    $$,
+    'Duplicate axis geometries can be inserted if validity differs');
+
 SELECT throws_ok($$
     INSERT INTO waterway.waterway_area (area) VALUES
         (ST_GeogFromText('POLYGON((0 0, 0 1, 1 1, 1 0, 0 0))')),
--- a/schema/manage_users.sql	Fri Mar 13 12:29:06 2020 +0100
+++ b/schema/manage_users.sql	Mon Mar 16 12:40:58 2020 +0100
@@ -60,15 +60,6 @@
     STABLE PARALLEL SAFE;
 
 
-CREATE OR REPLACE FUNCTION users.utm_covers(g geography) RETURNS boolean AS
-    $$
-        SELECT ST_Covers(a, ST_Transform(g::geometry, ST_SRID(a)))
-            FROM users.current_user_area_utm() AS a (a)
-    $$
-    LANGUAGE SQL
-    STABLE PARALLEL SAFE;
-
-
 CREATE OR REPLACE FUNCTION internal.create_user() RETURNS trigger
 AS $$
 BEGIN
--- a/schema/run_tests.sh	Fri Mar 13 12:29:06 2020 +0100
+++ b/schema/run_tests.sh	Mon Mar 16 12:40:58 2020 +0100
@@ -80,7 +80,7 @@
     -c 'SET client_min_messages TO WARNING' \
     -c "DROP ROLE IF EXISTS $TEST_ROLES" \
     -f "$BASEDIR"/tap_tests_data.sql \
-    -c "SELECT plan(81 + (
+    -c "SELECT plan(82 + (
             SELECT count(*)::int
                 FROM information_schema.tables
                 WHERE table_schema = 'waterway'))" \
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1424/01.add_axis_rls.sql	Mon Mar 16 12:40:58 2020 +0100
@@ -0,0 +1,16 @@
+CREATE POLICY hide_nothing ON waterway.waterway_axis
+    FOR SELECT TO waterway_user USING (true);
+CREATE POLICY sys_admin ON waterway.waterway_axis
+    FOR ALL TO sys_admin USING (true);
+
+CREATE POLICY responsibility_area_insert ON waterway.waterway_axis
+    FOR INSERT TO waterway_admin
+    WITH CHECK (users.utm_covers(wtwaxs));
+CREATE POLICY responsibility_area_update ON waterway.waterway_axis
+    FOR UPDATE TO waterway_admin
+    USING (users.utm_covers(wtwaxs));
+CREATE POLICY responsibility_area_delete ON waterway.waterway_axis
+    FOR DELETE TO waterway_admin
+    USING (users.utm_covers(wtwaxs));
+
+ALTER TABLE waterway.waterway_axis ENABLE ROW LEVEL SECURITY;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1425/01.inline_utm_covers.sql	Mon Mar 16 12:40:58 2020 +0100
@@ -0,0 +1,43 @@
+ALTER POLICY responsibility_area ON waterway.bottlenecks
+    USING (staging_done
+        OR (SELECT ST_Covers(a,
+                ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+            FROM users.current_user_area_utm() AS a (a)))
+    WITH CHECK ((SELECT ST_Covers(a,
+            ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+        FROM users.current_user_area_utm() AS a (a)));
+
+ALTER POLICY responsibility_area ON waterway.sounding_results
+    USING (staging_done
+        OR (SELECT ST_Covers(a,
+                ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+            FROM users.current_user_area_utm() AS a (a)))
+    WITH CHECK ((SELECT ST_Covers(a,
+            ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+        FROM users.current_user_area_utm() AS a (a)));
+
+ALTER POLICY responsibility_area ON waterway.fairway_dimensions
+    USING (staging_done
+        OR (SELECT ST_Covers(a,
+                ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+            FROM users.current_user_area_utm() AS a (a)))
+    WITH CHECK ((SELECT ST_Covers(a,
+            ST_Transform(CAST(area AS geometry), ST_SRID(a)))
+        FROM users.current_user_area_utm() AS a (a)));
+
+ALTER POLICY responsibility_area_insert ON waterway.waterway_axis
+    WITH CHECK ((SELECT ST_Covers(a,
+            ST_Transform(CAST(wtwaxs AS geometry), ST_SRID(a)))
+        FROM users.current_user_area_utm() AS a (a)));
+
+ALTER POLICY responsibility_area_update ON waterway.waterway_axis
+    USING ((SELECT ST_Covers(a,
+            ST_Transform(CAST(wtwaxs AS geometry), ST_SRID(a)))
+        FROM users.current_user_area_utm() AS a (a)));
+
+ALTER POLICY responsibility_area_delete ON waterway.waterway_axis
+    USING ((SELECT ST_Covers(a,
+            ST_Transform(CAST(wtwaxs AS geometry), ST_SRID(a)))
+        FROM users.current_user_area_utm() AS a (a)));
+
+DROP FUNCTION users.utm_covers;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1426/01.historicise_axis.sql	Mon Mar 16 12:40:58 2020 +0100
@@ -0,0 +1,20 @@
+ALTER TABLE waterway.waterway_axis
+    ADD validity tstzrange NOT NULL DEFAULT tstzrange(current_timestamp, NULL)
+        CHECK (NOT isempty(validity)),
+    ADD last_found timestamp with time zone NOT NULL DEFAULT current_timestamp;
+
+-- Assume existing entries have been valid since last import
+UPDATE waterway.waterway_axis SET validity = tstzrange(
+    (SELECT max(changed) FROM import.imports WHERE kind = 'wx'),
+    NULL);
+
+DROP TRIGGER waterway_axis_wtwaxs_unique ON waterway.waterway_axis;
+CREATE CONSTRAINT TRIGGER waterway_axis_wtwaxs_unique
+    AFTER INSERT OR UPDATE OF wtwaxs, validity ON waterway.waterway_axis
+    FOR EACH ROW EXECUTE FUNCTION prevent_st_equals('wtwaxs', 'validity');
+
+CREATE INDEX waterway_axis_validity
+    ON waterway.waterway_axis USING GiST (validity);
+
+-- No more need to delete
+DROP POLICY responsibility_area_delete ON waterway.waterway_axis;
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1426/02.configure_wms-t.sql	Mon Mar 16 12:40:58 2020 +0100
@@ -0,0 +1,16 @@
+UPDATE sys_admin.published_services
+    SET
+        srid = 4326,
+        key_column = 'id',
+        wmst_attribute = 'valid_from',
+        wmst_end_attribute = 'valid_to',
+        view_def = $$
+            SELECT id,
+                lower(validity) AS valid_from,
+                COALESCE(upper(validity), current_timestamp) AS valid_to,
+                wtwaxs,
+                objnam,
+                nobjnam
+            FROM waterway.waterway_axis
+            $$
+    WHERE schema = 'waterway' AND name = 'waterway_axis';
--- a/schema/version.sql	Fri Mar 13 12:29:06 2020 +0100
+++ b/schema/version.sql	Mon Mar 16 12:40:58 2020 +0100
@@ -1,1 +1,1 @@
-INSERT INTO gemma_schema_version(version) VALUES (1423);
+INSERT INTO gemma_schema_version(version) VALUES (1426);