diff pkg/imports/wx.go @ 5016:cf25b23e3eec

Keep historic data of waterway axis ... and accordingly configure the respective layer as WMS-T.
author Tom Gottfried <tom@intevation.de>
date Fri, 13 Mar 2020 17:34:59 +0100
parents 7dff1015283d
children 557afcd9a131
line wrap: on
line diff
--- a/pkg/imports/wx.go	Fri Mar 13 14:13:32 2020 +0100
+++ b/pkg/imports/wx.go	Fri Mar 13 17:34:59 2020 +0100
@@ -14,89 +14,39 @@
 
 package imports
 
-import (
-	"context"
-	"database/sql"
-	"encoding/json"
-	"errors"
-	"fmt"
-	"io"
-	"time"
-
-	"gemma.intevation.de/gemma/pkg/pgxutils"
-	"gemma.intevation.de/gemma/pkg/wfs"
-)
-
-// WaterwayAxis is an import job to import
-// the waterway axes in form of line string geometries
-// and attribute data from a WFS service.
-type WaterwayAxis struct {
-	// URL the GetCapabilities URL of the WFS service.
-	URL string `json:"url"`
-	// FeatureType selects the feature type of the WFS service.
-	FeatureType string `json:"feature-type"`
-	// SortBy works around misconfigured services to
-	// establish a sort order to get the features.
-	SortBy string `json:"sort-by"`
-	// User is an optional username for Basic Auth.
-	User string `json:"user,omitempty"`
-	// Password is an optional password for Basic Auth.
-	Password string `json:"password,omitempty"`
-}
-
-// Description gives a short info about relevant facts of this import.
-func (wx *WaterwayAxis) Description() (string, error) {
-	return wx.URL + "|" + wx.FeatureType, nil
-}
-
 // WXJobKind is the import queue type identifier.
 const WXJobKind JobKind = "wx"
 
-type wxJobCreator struct{}
-
 func init() {
-	RegisterJobCreator(WXJobKind, wxJobCreator{})
+	RegisterJobCreator(WXJobKind,
+		&WFSFeatureJobCreator{
+			description: "waterway axis",
+			depends:     [2][]string{{"waterway_axis"}, {}},
+			newConsumer: newSQLConsumer(
+				prepareStmnts(insertWaterwayAxisSQL),
+				consume,
+				createAxisInvalidation(),
+				newMultiLineFeature(func() interface{} {
+					return new(waterwayAxisProperties)
+				}),
+			),
+		})
 }
 
-func (wxJobCreator) Description() string { return "waterway axis" }
-
-func (wxJobCreator) AutoAccept() bool { return true }
-
-func (wxJobCreator) Create() Job { return new(WaterwayAxis) }
-
-func (wxJobCreator) Depends() [2][]string {
-	return [2][]string{
-		{"waterway_axis"},
-		{},
-	}
-}
-
-// StageDone is a NOP for waterway axis imports.
-func (wxJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
-	return nil
-}
-
-// CleanUp for waterway axis imports is a NOP.
-func (*WaterwayAxis) CleanUp() error { return nil }
-
 type waterwayAxisProperties struct {
 	ObjNam  string  `json:"hydro_objnam"`
 	NObjNnm *string `json:"hydro_nobjnm"`
 }
 
 const (
-	deleteWaterwayAxisSQL = `
-DELETE FROM waterway.waterway_axis
-`
-
 	insertWaterwayAxisSQL = `
 WITH resp AS (
   SELECT users.current_user_area_utm() AS a
-)
-INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam)
-SELECT
-    ST_Multi(ST_Node(ST_CollectionExtract(ST_Transform(new_ax, 4326), 2))),
-    $3, $4
+),
+g AS (
+  SELECT
+    ST_Multi(ST_Node(ST_CollectionExtract(ST_Transform(new_ax, 4326), 2)))
+      AS new_ax
   FROM ST_GeomFromWKB($1, $2::integer) AS new_line (new_line),
     LATERAL (SELECT
       CASE WHEN pg_has_role('sys_admin', 'MEMBER')
@@ -106,212 +56,46 @@
         END) AS new_ax (new_ax)
   -- Do nothing if intersection is empty:
   WHERE NOT ST_IsEmpty(new_ax)
+),
+t AS (
+  UPDATE waterway.waterway_axis SET last_found = current_timestamp
+  WHERE (SELECT new_ax FROM g) IS NOT NULL
+    AND validity @> current_timestamp
+    AND (
+        wtwaxs, objnam, nobjnam
+      ) IS NOT DISTINCT FROM (
+        (SELECT new_ax FROM g), $3, $4)
+  RETURNING 1
+)
+INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam)
+SELECT new_ax, $3, $4
+  FROM g
+  WHERE NOT EXISTS(SELECT 1 FROM t)
 RETURNING id
 `
+	invalidateAxisSQL = `
+UPDATE waterway.waterway_axis
+  SET validity = tstzrange(lower(validity), current_timestamp)
+  WHERE validity @> current_timestamp
+    AND last_found < current_timestamp
+`
 )
 
-// Do executes the actual waterway axis import.
-func (wx *WaterwayAxis) Do(
-	ctx context.Context,
-	importID int64,
-	conn *sql.Conn,
-	feedback Feedback,
-) (interface{}, error) {
-
-	start := time.Now()
-
-	feedback.Info("Import waterway axis")
-
-	feedback.Info("Loading capabilities from %s", wx.URL)
-	caps, err := wfs.GetCapabilities(wx.URL)
-	if err != nil {
-		feedback.Error("Loading capabilities failed: %v", err)
-		return nil, err
-	}
-
-	ft := caps.FindFeatureType(wx.FeatureType)
-	if ft == nil {
-		return nil, fmt.Errorf("unknown feature type '%s'", wx.FeatureType)
-	}
-
-	feedback.Info("Found feature type '%s'", wx.FeatureType)
-
-	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
-	if err != nil {
-		feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS)
-		return nil, err
-	}
-
-	if wx.SortBy != "" {
-		feedback.Info("Features will be sorted by '%s'", wx.SortBy)
-	}
-
-	dl, err := wfs.GetFeatures(caps, wx.FeatureType, wx.SortBy)
-	if err != nil {
-		feedback.Error("Cannot create GetFeature URLs. %v", err)
-		return nil, err
-	}
-
-	tx, err := conn.BeginTx(ctx, nil)
-	if err != nil {
-		return nil, err
-	}
-	defer tx.Rollback()
-
-	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAxisSQL)
-	if err != nil {
-		return nil, err
-	}
-	defer insertStmt.Close()
-
-	// Delete the old features.
-	if _, err := tx.ExecContext(ctx, deleteWaterwayAxisSQL); err != nil {
-		return nil, err
-	}
-
-	var (
-		unsupported       = stringCounter{}
-		missingProperties int
-		badProperties     int
-		outside           int
-		features          int
-	)
-
-	if err := dl.Download(wx.User, wx.Password, func(url string, r io.Reader) error {
-		feedback.Info("Get features from: '%s'", url)
-		rfc, err := wfs.ParseRawFeatureCollection(r)
+func createAxisInvalidation() func(*SQLGeometryConsumer) error {
+	return func(spc *SQLGeometryConsumer) error {
+		res, err := spc.tx.ExecContext(spc.ctx, invalidateAxisSQL)
 		if err != nil {
-			return fmt.Errorf("parsing GetFeature document failed: %v", err)
-		}
-		if rfc.CRS != nil {
-			crsName := rfc.CRS.Properties.Name
-			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
-				feedback.Error("Unsupported CRS: %d", crsName)
-				return err
-			}
-		}
-
-		// No features -> ignore.
-		if rfc.Features == nil {
-			return nil
+			return err
 		}
-
-		feedback.Info("Using EPSG: %d", epsg)
-
-		savepoint := Savepoint(ctx, tx, "feature")
-
-		for _, feature := range rfc.Features {
-			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
-				missingProperties++
-				continue
-			}
-
-			var props waterwayAxisProperties
-
-			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
-				badProperties++
-				continue
-			}
-
-			var nobjnam sql.NullString
-			if props.NObjNnm != nil {
-				nobjnam = sql.NullString{String: *props.NObjNnm, Valid: true}
-			}
-
-			var ls multiLineSlice
-			switch feature.Geometry.Type {
-			case "LineString":
-				var l lineSlice
-				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
-					return err
-				}
-				ls = append(ls, l)
-			case "MultiLineString":
-				if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil {
-					return err
-				}
-			default:
-				unsupported[feature.Geometry.Type]++
-				continue
-			}
-			if err := storeLinestring(
-				ctx,
-				savepoint,
-				feedback,
-				ls,
-				epsg,
-				props,
-				nobjnam,
-				&outside,
-				&features,
-				insertStmt); err != nil {
-				return err
-			}
+		old, err := res.RowsAffected()
+		if err != nil {
+			return err
 		}
+		if old == 0 {
+			return ErrFeaturesUnmodified
+		}
+		spc.feedback.Info(
+			"Number of features removed from data source: %d", old)
 		return nil
-	}); err != nil {
-		return nil, err
 	}
-
-	if badProperties > 0 {
-		feedback.Warn("Bad properties: %d", badProperties)
-	}
-
-	if missingProperties > 0 {
-		feedback.Warn("Missing properties: %d", missingProperties)
-	}
-
-	if len(unsupported) != 0 {
-		feedback.Warn("Unsupported types found: %s", unsupported)
-	}
-
-	if outside > 0 {
-		feedback.Info("Features outside responsibility area: %d", outside)
-	}
-
-	if features == 0 {
-		return nil, errors.New("no features found")
-	}
-
-	if err = tx.Commit(); err == nil {
-		feedback.Info("Storing %d features took %s",
-			features, time.Since(start))
-	}
-
-	return nil, err
 }
-
-func storeLinestring(
-	ctx context.Context,
-	savepoint func(func() error) error,
-	feedback Feedback,
-	l multiLineSlice,
-	epsg int,
-	props waterwayAxisProperties,
-	nobjnam sql.NullString,
-	outside, features *int,
-	insertStmt *sql.Stmt,
-) error {
-	var id int
-	err := savepoint(func() error {
-		err := insertStmt.QueryRowContext(
-			ctx,
-			l.asWKB(),
-			epsg,
-			props.ObjNam,
-			nobjnam,
-		).Scan(&id)
-		return err
-	})
-	switch {
-	case err == sql.ErrNoRows:
-		*outside++
-		// ignore -> filtered by responsibility_areas
-		return nil
-	case err != nil:
-		feedback.Error(pgxutils.ReadableError{Err: err}.Error())
-	default:
-		*features++
-	}
-	return nil
-}