Mercurial > gemma
view pkg/imports/dma.go @ 5718:3d497077f888 uploadwg
Implemented direct file upload as alternative import method for WG.
For testing and data corrections it is useful to be able to import
waterway gauges data directly by uploading an XML file.
author | Sascha Wilde <wilde@sha-bang.de> |
---|---|
date | Thu, 18 Apr 2024 19:23:19 +0200 |
parents | 6270951dda28 |
children |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Raimund Renkert <raimund.renkert@intevation.de> package imports import ( "context" "database/sql" "encoding/json" "errors" "fmt" "io" "time" "gemma.intevation.de/gemma/pkg/wfs" ) // DistanceMarksAshore is an import job to import // the distance marks ashore in form of point geometries // and attribute data from a WFS service. type DistanceMarksAshore struct { // URL the GetCapabilities URL of the WFS service. URL string `json:"url"` // FeatureType selects the feature type of the WFS service. FeatureType string `json:"feature-type"` // SortBy sorts the feature by this key. SortBy string `json:"sort-by"` // User is an optional username for Basic Auth. User string `json:"user,omitempty"` // Password is an optional password for Basic Auth. Password string `json:"password,omitempty"` } // Description gives a short info about relevant facts of this import. func (dma *DistanceMarksAshore) Description([]string) (string, error) { return dma.URL + "|" + dma.FeatureType, nil } // DMAJobKind is the import queue type identifier. const DMAJobKind JobKind = "dma" type dmaJobCreator struct{} func init() { RegisterJobCreator(DMAJobKind, dmaJobCreator{}) } func (dmaJobCreator) Description() string { return "distance marks" } func (dmaJobCreator) AutoAccept() bool { return true } func (dmaJobCreator) Create() Job { return new(DistanceMarksAshore) } func (dmaJobCreator) Depends() [2][]string { return [2][]string{{"distance_marks"}} } // StageDone is a NOP for distance marks imports. 
func (dmaJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { return nil } // CleanUp for distance marks imports is a NOP. func (*DistanceMarksAshore) CleanUp() error { return nil } type distanceMarksAshoreProperties struct { HydroCatdis int `json:"hydro_catdis"` } const ( deleteDistanceMarksSQL = ` WITH resp AS ( SELECT users.current_user_area_utm() AS a ) DELETE FROM waterway.distance_marks WHERE pg_has_role('sys_admin', 'MEMBER') OR ST_Covers((SELECT a FROM resp), ST_Transform(geom::geometry, (SELECT ST_SRID(a) FROM resp))) ` insertDistanceMarksSQL = ` WITH resp AS ( SELECT users.current_user_area_utm() AS a ) INSERT INTO waterway.distance_marks (geom, catdis) SELECT ST_Transform(new_dma, 4326), $3 FROM (SELECT CASE WHEN pg_has_role('sys_admin', 'MEMBER') THEN dma ELSE ST_Intersection((SELECT a FROM resp), ST_Transform(dma, (SELECT ST_SRID(a) FROM resp))) END AS new_dma FROM ST_GeomFromWKB($1, $2::integer) AS dma (dma)) AS new_dma WHERE NOT ST_IsEmpty(new_dma) RETURNING id ` ) // Do executes the actual fairway dimension import. func (dma *DistanceMarksAshore) Do( ctx context.Context, _ int64, conn *sql.Conn, feedback Feedback, ) (any, error) { start := time.Now() feedback.Info("Import distance marks") feedback.Info("Loading capabilities from %s", dma.URL) caps, err := wfs.GetCapabilities(dma.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return nil, err } ft := caps.FindFeatureType(dma.FeatureType) if ft == nil { return nil, fmt.Errorf("unknown feature type '%s'", dma.FeatureType) } feedback.Info("Found feature type '%s'", dma.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) return nil, err } dl, err := wfs.GetFeatures(caps, dma.FeatureType, dma.SortBy) if err != nil { feedback.Error("Cannot create GetFeature URLs. 
%v", err) return nil, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertStmt, err := tx.PrepareContext(ctx, insertDistanceMarksSQL) if err != nil { return nil, err } defer insertStmt.Close() // Delete the old features. if _, err := tx.ExecContext(ctx, deleteDistanceMarksSQL); err != nil { return nil, err } var ( unsupported = stringCounter{} missingProperties int badProperties int outside int features int ) if err := dl.Download(dma.User, dma.Password, func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. if rfc.Features == nil { return nil } feedback.Info("Using EPSG: %d", epsg) for _, feature := range rfc.Features { if feature.Geometry.Coordinates == nil { missingProperties++ continue } var props distanceMarksAshoreProperties if err := json.Unmarshal(*feature.Properties, &props); err != nil { badProperties++ continue } switch feature.Geometry.Type { case "Point": var p pointSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { return err } var dmaid int64 err := insertStmt.QueryRowContext( ctx, p.asWKB(), epsg, props.HydroCatdis, ).Scan(&dmaid) switch { case err == sql.ErrNoRows: outside++ // ignore -> filtered by responsibility area continue case err != nil: return err } features++ default: unsupported[feature.Geometry.Type]++ } } return nil }); err != nil { return nil, err } if badProperties > 0 { feedback.Warn("Bad properties: %d", badProperties) } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if outside > 0 { 
feedback.Info("Features outside responsibility area: %d", outside) } if features == 0 { err := errors.New("no features found") feedback.Error("%v", err) return nil, err } if err = tx.Commit(); err == nil { feedback.Info("Storing %d features took %s", features, time.Since(start)) } return nil, err }