Mercurial > gemma
diff pkg/imports/wx.go @ 5016:cf25b23e3eec
Keep historic data of waterway axis
... and accordingly configure the respective layer as WMS-T.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Fri, 13 Mar 2020 17:34:59 +0100 |
parents | 7dff1015283d |
children | 557afcd9a131 |
line wrap: on
line diff
--- a/pkg/imports/wx.go Fri Mar 13 14:13:32 2020 +0100 +++ b/pkg/imports/wx.go Fri Mar 13 17:34:59 2020 +0100 @@ -14,89 +14,39 @@ package imports -import ( - "context" - "database/sql" - "encoding/json" - "errors" - "fmt" - "io" - "time" - - "gemma.intevation.de/gemma/pkg/pgxutils" - "gemma.intevation.de/gemma/pkg/wfs" -) - -// WaterwayAxis is an import job to import -// the waterway axes in form of line string geometries -// and attribute data from a WFS service. -type WaterwayAxis struct { - // URL the GetCapabilities URL of the WFS service. - URL string `json:"url"` - // FeatureType selects the feature type of the WFS service. - FeatureType string `json:"feature-type"` - // SortBy works around misconfigured services to - // establish a sort order to get the features. - SortBy string `json:"sort-by"` - // User is an optional username for Basic Auth. - User string `json:"user,omitempty"` - // Password is an optional password for Basic Auth. - Password string `json:"password,omitempty"` -} - -// Description gives a short info about relevant facts of this import. -func (wx *WaterwayAxis) Description() (string, error) { - return wx.URL + "|" + wx.FeatureType, nil -} - // WXJobKind is the import queue type identifier. const WXJobKind JobKind = "wx" -type wxJobCreator struct{} - func init() { - RegisterJobCreator(WXJobKind, wxJobCreator{}) + RegisterJobCreator(WXJobKind, + &WFSFeatureJobCreator{ + description: "waterway axis", + depends: [2][]string{{"waterway_axis"}, {}}, + newConsumer: newSQLConsumer( + prepareStmnts(insertWaterwayAxisSQL), + consume, + createAxisInvalidation(), + newMultiLineFeature(func() interface{} { + return new(waterwayAxisProperties) + }), + ), + }) } -func (wxJobCreator) Description() string { return "waterway axis" } - -func (wxJobCreator) AutoAccept() bool { return true } - -func (wxJobCreator) Create() Job { return new(WaterwayAxis) } - -func (wxJobCreator) Depends() [2][]string { - return [2][]string{ - {"waterway_axis"}, - {}, - } -} - -// StageDone is a NOP for waterway axis imports. -func (wxJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil -} - -// CleanUp for waterway axis imports is a NOP. -func (*WaterwayAxis) CleanUp() error { return nil } - type waterwayAxisProperties struct { ObjNam string `json:"hydro_objnam"` NObjNnm *string `json:"hydro_nobjnm"` } const ( - deleteWaterwayAxisSQL = ` -DELETE FROM waterway.waterway_axis -` - insertWaterwayAxisSQL = ` WITH resp AS ( SELECT users.current_user_area_utm() AS a -) -INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam) -SELECT - ST_Multi(ST_Node(ST_CollectionExtract(ST_Transform(new_ax, 4326), 2))), - $3, $4 +), +g AS ( + SELECT + ST_Multi(ST_Node(ST_CollectionExtract(ST_Transform(new_ax, 4326), 2))) + AS new_ax FROM ST_GeomFromWKB($1, $2::integer) AS new_line (new_line), LATERAL (SELECT CASE WHEN pg_has_role('sys_admin', 'MEMBER') @@ -106,212 +56,46 @@ END) AS new_ax (new_ax) -- Do nothing if intersection is empty: WHERE NOT ST_IsEmpty(new_ax) +), +t AS ( + UPDATE waterway.waterway_axis SET last_found = current_timestamp + WHERE (SELECT new_ax FROM g) IS NOT NULL + AND validity @> current_timestamp + AND ( + wtwaxs, objnam, nobjnam + ) IS NOT DISTINCT FROM ( + (SELECT new_ax FROM g), $3, $4) + RETURNING 1 +) +INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam) +SELECT new_ax, $3, $4 + FROM g + WHERE NOT EXISTS(SELECT 1 FROM t) RETURNING id ` + invalidateAxisSQL = ` +UPDATE waterway.waterway_axis + SET validity = tstzrange(lower(validity), current_timestamp) + WHERE validity @> current_timestamp + AND last_found < current_timestamp +` ) -// Do executes the actual waterway axis import. -func (wx *WaterwayAxis) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - start := time.Now() - - feedback.Info("Import waterway axis") - - feedback.Info("Loading capabilities from %s", wx.URL) - caps, err := wfs.GetCapabilities(wx.URL) - if err != nil { - feedback.Error("Loading capabilities failed: %v", err) - return nil, err - } - - ft := caps.FindFeatureType(wx.FeatureType) - if ft == nil { - return nil, fmt.Errorf("unknown feature type '%s'", wx.FeatureType) - } - - feedback.Info("Found feature type '%s'", wx.FeatureType) - - epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) - if err != nil { - feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) - return nil, err - } - - if wx.SortBy != "" { - feedback.Info("Features will be sorted by '%s'", wx.SortBy) - } - - dl, err := wfs.GetFeatures(caps, wx.FeatureType, wx.SortBy) - if err != nil { - feedback.Error("Cannot create GetFeature URLs. %v", err) - return nil, err - } - - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return nil, err - } - defer tx.Rollback() - - insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAxisSQL) - if err != nil { - return nil, err - } - defer insertStmt.Close() - - // Delete the old features. - if _, err := tx.ExecContext(ctx, deleteWaterwayAxisSQL); err != nil { - return nil, err - } - - var ( - unsupported = stringCounter{} - missingProperties int - badProperties int - outside int - features int - ) - - if err := dl.Download(wx.User, wx.Password, func(url string, r io.Reader) error { - feedback.Info("Get features from: '%s'", url) - rfc, err := wfs.ParseRawFeatureCollection(r) +func createAxisInvalidation() func(*SQLGeometryConsumer) error { + return func(spc *SQLGeometryConsumer) error { + res, err := spc.tx.ExecContext(spc.ctx, invalidateAxisSQL) if err != nil { - return fmt.Errorf("parsing GetFeature document failed: %v", err) - } - if rfc.CRS != nil { - crsName := rfc.CRS.Properties.Name - if epsg, err = wfs.CRSToEPSG(crsName); err != nil { - feedback.Error("Unsupported CRS: %d", crsName) - return err - } - } - - // No features -> ignore. - if rfc.Features == nil { - return nil + return err } - - feedback.Info("Using EPSG: %d", epsg) - - savepoint := Savepoint(ctx, tx, "feature") - - for _, feature := range rfc.Features { - if feature.Properties == nil || feature.Geometry.Coordinates == nil { - missingProperties++ - continue - } - - var props waterwayAxisProperties - - if err := json.Unmarshal(*feature.Properties, &props); err != nil { - badProperties++ - continue - } - - var nobjnam sql.NullString - if props.NObjNnm != nil { - nobjnam = sql.NullString{String: *props.NObjNnm, Valid: true} - } - - var ls multiLineSlice - switch feature.Geometry.Type { - case "LineString": - var l lineSlice - if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil { - return err - } - ls = append(ls, l) - case "MultiLineString": - if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil { - return err - } - default: - unsupported[feature.Geometry.Type]++ - continue - } - if err := storeLinestring( - ctx, - savepoint, - feedback, - ls, - epsg, - props, - nobjnam, - &outside, - &features, - insertStmt); err != nil { - return err - } + old, err := res.RowsAffected() + if err != nil { + return err } + if old == 0 { + return ErrFeaturesUnmodified + } + spc.feedback.Info( + "Number of features removed from data source: %d", old) return nil - }); err != nil { - return nil, err } - - if badProperties > 0 { - feedback.Warn("Bad properties: %d", badProperties) - } - - if missingProperties > 0 { - feedback.Warn("Missing properties: %d", missingProperties) - } - - if len(unsupported) != 0 { - feedback.Warn("Unsupported types found: %s", unsupported) - } - - if outside > 0 { - feedback.Info("Features outside responsibility area: %d", outside) - } - - if features == 0 { - return nil, errors.New("no features found") - } - - if err = tx.Commit(); err == nil { - feedback.Info("Storing %d features took %s", - features, time.Since(start)) - } - - return nil, err } - -func storeLinestring( - ctx context.Context, - savepoint func(func() error) error, - feedback Feedback, - l multiLineSlice, - epsg int, - props waterwayAxisProperties, - nobjnam sql.NullString, - outside, features *int, - insertStmt *sql.Stmt, -) error { - var id int - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - l.asWKB(), - epsg, - props.ObjNam, - nobjnam, - ).Scan(&id) - return err - }) - switch { - case err == sql.ErrNoRows: - *outside++ - // ignore -> filtered by responsibility_areas - return nil - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - *features++ - } - return nil -}