Mercurial > gemma
view pkg/imports/fd.go @ 5417:2d294ad81241 marking-single-beam
Don't lose Z values in markings.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 07 Jul 2021 18:14:04 +0200 |
parents | 3bab0e19f08b |
children | f2204f91d286 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019, 2020 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Raimund Renkert <raimund.renkert@intevation.de> package imports import ( "context" "database/sql" "encoding/json" "fmt" "io" "strings" "time" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/pgxutils" "gemma.intevation.de/gemma/pkg/wfs" ) // FairwayDimension is an import job to import // the fairway dimensions in form of polygon geometries // and attribute data from a WFS service. type FairwayDimension struct { // URL the GetCapabilities URL of the WFS service. URL string `json:"url"` // FeatureType selects the feature type of the WFS service. FeatureType string `json:"feature-type"` SortBy string `json:"sort-by"` LOS int `json:"los"` MinWidth int `json:"min-width"` MaxWidth int `json:"max-width"` Depth int `json:"depth"` SourceOrganization string `json:"source-organization"` // User is an optional username for Basic Auth. User string `json:"user,omitempty"` // Password is an optional password for Basic Auth. Password string `json:"password,omitempty"` } // Description gives a short info about relevant facts of this import. func (fd *FairwayDimension) Description() (string, error) { return strings.Join([]string{ fd.URL, fd.FeatureType, fmt.Sprintf("LOS%d", fd.LOS), }, "|"), nil } type fdTime struct{ time.Time } var guessFDTime = common.TimeParser([]string{ "20060102", "2006", "", }).Parse func (fdt *fdTime) UnmarshalJSON(data []byte) error { var s string if err := json.Unmarshal(data, &s); err != nil { return err } t, err := guessFDTime(s) if err == nil { *fdt = fdTime{t} } return err } // FDJobKind is the import queue type identifier. 
const FDJobKind JobKind = "fd" type fdJobCreator struct{} func init() { RegisterJobCreator(FDJobKind, fdJobCreator{}) } func (fdJobCreator) Description() string { return "fairway dimension" } func (fdJobCreator) AutoAccept() bool { return false } func (fdJobCreator) Create() Job { return new(FairwayDimension) } func (fdJobCreator) Depends() [2][]string { return [2][]string{ {"fairway_dimensions"}, {"level_of_service"}, } } // StageDone replaces fairway dimensions with those in the staging area func (fdJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, _ Feedback, ) error { // Delete the old features. if _, err := tx.ExecContext(ctx, deleteFairwayDimensionSQL, id); err != nil { return err } _, err := tx.ExecContext(ctx, fdStageDoneSQL, id) return err } // CleanUp for fairway dimension imports is a NOP. func (*FairwayDimension) CleanUp() error { return nil } type fairwayDimensionProperties struct { HydroSorDat *fdTime `json:"hydro_sordat"` } type fdSummary struct { Lat float64 `json:"lat"` Lon float64 `json:"lon"` ID int64 `json:"id"` } const ( fdStageDoneSQL = ` UPDATE waterway.fairway_dimensions SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.fairway_dimensions'::regclass)` deleteFairwayDimensionSQL = ` -- Delete entries to be replaced by those in staging area DELETE FROM waterway.fairway_dimensions WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.fairway_dimensions'::regclass AND deletion) ` // Temporary table to collect IDs of unchanged entries tmpTableSQL = ` CREATE TEMP TABLE unchanged (id int PRIMARY KEY) ON COMMIT DROP ` // The ST_MakeValid and ST_Buffer below are a workarround to // avoid errors due to reprojection. 
insertFairwayDimensionSQL = ` WITH resp AS ( SELECT users.current_user_area_utm() AS a ), g AS ( SELECT ST_Multi(ST_CollectionExtract(ST_MakeValid(ST_Transform( CASE WHEN pg_has_role('sys_admin', 'MEMBER') OR ST_Covers((SELECT a FROM resp), ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp))) THEN new_fd ELSE ST_Intersection( (SELECT ST_Buffer(a, -0.0001) FROM resp), ST_MakeValid(ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp)))) END, 4326)), 3)) AS new_fd FROM ST_GeomFromWKB($1, $2::integer) AS new_fd (new_fd) WHERE pg_has_role('sys_admin', 'MEMBER') OR ST_Intersects((SELECT a FROM resp), ST_MakeValid(ST_Transform(new_fd, (SELECT ST_SRID(a) FROM resp)))) ), not_new AS ( -- Collect IDs of unchanged entries in temp table INSERT INTO unchanged SELECT id FROM g, waterway.fairway_dimensions WHERE staging_done AND validity @> current_timestamp AND (area, level_of_service, min_width, max_width, min_depth, source_organization ) IS NOT DISTINCT FROM ( new_fd, $3, $4, $5, $6, $8) -- Return something if a duplicate in the data source is encountered ON CONFLICT (id) DO UPDATE SET id = EXCLUDED.id RETURNING 1 ) INSERT INTO waterway.fairway_dimensions ( area, level_of_service, min_width, max_width, min_depth, date_info, source_organization) SELECT new_fd, $3, $4, $5, $6, $7, $8 FROM g WHERE NOT EXISTS(SELECT 1 FROM not_new) RETURNING id, ST_X(ST_Centroid(area::geometry)), ST_Y(ST_Centroid(area::geometry)) ` // Fetch IDs of entries removed from data source selectOldSQL = ` WITH resp AS ( SELECT users.current_user_area_utm() AS a ) SELECT id FROM waterway.fairway_dimensions WHERE staging_done AND validity @> current_timestamp AND level_of_service = $1 AND (pg_has_role('sys_admin', 'MEMBER') OR ST_Covers((SELECT a FROM resp), ST_Transform(CAST(area AS geometry), (SELECT ST_SRID(a) FROM resp)))) AND id NOT IN (SELECT id FROM unchanged) ` invalidateFairwayDimensionSQL = ` WITH track AS ( -- Mark entry for deletion that has been removed from the data source INSERT INTO 
import.track_imports (import_id, deletion, relation, key) VALUES($1, true, 'waterway.fairway_dimensions', $2) ) -- Insert historic version with respective validity INSERT INTO waterway.fairway_dimensions ( area, validity, level_of_service, min_width, max_width, min_depth, date_info, source_organization) SELECT area, tstzrange(lower(validity), current_timestamp), level_of_service, min_width, max_width, min_depth, date_info, source_organization FROM waterway.fairway_dimensions WHERE id = $2 RETURNING id, ST_X(ST_Centroid(area::geometry)), ST_Y(ST_Centroid(area::geometry)) ` ) // Do executes the actual fairway dimension import. func (fd *FairwayDimension) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() feedback.Info("Import fairway dimensions") feedback.Info("Loading capabilities from %s", fd.URL) caps, err := wfs.GetCapabilities(fd.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return nil, err } ft := caps.FindFeatureType(fd.FeatureType) if ft == nil { return nil, fmt.Errorf("unknown feature type '%s'", fd.FeatureType) } feedback.Info("Found feature type '%s'", fd.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) return nil, err } dl, err := wfs.GetFeatures(caps, fd.FeatureType, fd.SortBy) if err != nil { feedback.Error("Cannot create GetFeature URLs. 
%v", err) return nil, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() if _, err := tx.ExecContext(ctx, tmpTableSQL); err != nil { return nil, err } insertStmt, err := tx.PrepareContext(ctx, insertFairwayDimensionSQL) if err != nil { return nil, err } defer insertStmt.Close() invalidateStmt, err := tx.PrepareContext( ctx, invalidateFairwayDimensionSQL) if err != nil { return nil, err } defer invalidateStmt.Close() savepoint := Savepoint(ctx, tx, "feature") var ( unsupported = stringCounter{} missingProperties int badProperties int features int outside int fds []fdSummary ) if err := dl.Download(fd.User, fd.Password, func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. 
if rfc.Features == nil { return nil } feedback.Info("Using EPSG: %d", epsg) feedback.Info( "Found %d features in data source", len(rfc.Features)) features: for _, feature := range rfc.Features { if feature.Geometry.Coordinates == nil { missingProperties++ continue } var props fairwayDimensionProperties if err := json.Unmarshal(*feature.Properties, &props); err != nil { feedback.Warn("bad property: %v", err) badProperties++ continue } var dateInfo time.Time if props.HydroSorDat == nil || props.HydroSorDat.IsZero() { dateInfo = start } else { dateInfo = (*props.HydroSorDat).Time } var polys multiPolygonSlice switch feature.Geometry.Type { case "Polygon": var p polygonSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { return err } polys = multiPolygonSlice{p} case "MultiPolygon": if err := json.Unmarshal(*feature.Geometry.Coordinates, &polys); err != nil { return err } default: unsupported[feature.Geometry.Type]++ continue features } // Store the features. storePolygons: for _, p := range polys { var fdid int64 var lat, lon float64 switch err := savepoint(func() error { return insertStmt.QueryRowContext( ctx, p.asWKB(), epsg, fd.LOS, fd.MinWidth, fd.MaxWidth, fd.Depth, dateInfo, fd.SourceOrganization, ).Scan(&fdid, &lat, &lon) }); { case err == sql.ErrNoRows: outside++ // ignore -> filtered by responsibility area (stretches) continue storePolygons case err != nil: feedback.Error(pgxutils.ReadableError{Err: err}.Error()) continue storePolygons } // Store for potential later removal. 
if err := track( ctx, tx, importID, "waterway.fairway_dimensions", fdid, ); err != nil { return err } fds = append(fds, fdSummary{ID: fdid, Lat: lat, Lon: lon}) features++ } } return nil }); err != nil { return nil, err } if badProperties > 0 { feedback.Warn("Bad properties: %d", badProperties) } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if outside > 0 { feedback.Info( "Features outside responsibility area or unchanged: %d", outside) } if features == 0 { feedback.Info("No new features found") } else { feedback.Info("Stored %d features", features) } // Invalidate features that have been removed from data source res, err := tx.QueryContext(ctx, selectOldSQL, fd.LOS) if err != nil { return nil, err } defer res.Close() var oldIDs []int64 for res.Next() { var oldID int64 if err := res.Scan(&oldID); err != nil { return nil, err } oldIDs = append(oldIDs, oldID) } if err := res.Err(); err != nil { return nil, err } if features == 0 && len(oldIDs) == 0 { return nil, UnchangedError("Nothing changed") } if len(oldIDs) > 0 { feedback.Info( "Number of features removed from data source: %d", len(oldIDs)) var old int for _, oldID := range oldIDs { var fdid int64 var lat, lon float64 if err := savepoint(func() error { return invalidateStmt.QueryRowContext( ctx, importID, oldID, ).Scan(&fdid, &lat, &lon) }); err != nil { feedback.Error(pgxutils.ReadableError{Err: err}.Error() + "- while tracking invalidation of: %d", oldID) continue } fds = append(fds, fdSummary{ID: fdid, Lat: lat, Lon: lon}) if err := track( ctx, tx, importID, "waterway.fairway_dimensions", fdid, ); err != nil { return nil, err } old++ } // Do not fail if features > 0 because otherwise new features are lost if features == 0 && old == 0 { return nil, fmt.Errorf("invalidating features failed") } if old > 0 { feedback.Info("Number of features invalidated: %d", old) } } if err = 
tx.Commit(); err == nil { feedback.Info("Storing %d features took %s", features, time.Since(start)) } summary := struct { Date time.Time `json:"date"` LOS int `json:"los"` SourceOrganization string `json:"source-organization"` FdArea []fdSummary `json:"fd-area"` }{ Date: time.Now(), LOS: fd.LOS, SourceOrganization: fd.SourceOrganization, FdArea: fds, } return &summary, err }