Mercurial > gemma
view pkg/imports/dma.go @ 2307:e1aa9bb65da6
import_schedule: send email when manually triggering import
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 18 Feb 2019 13:32:44 +0100 |
parents | 7c83b5277c1c |
children | 02505fcff63c |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Raimund Renkert <raimund.renkert@intevation.de> package imports import ( "context" "database/sql" "encoding/json" "errors" "fmt" "io" "time" "gemma.intevation.de/gemma/pkg/wfs" ) // FairwayDimension is an import job to import // the fairway dimensions in form of polygon geometries // and attribute data from a WFS service. type DistanceMarksAshore struct { // URL the GetCapabilities URL of the WFS service. URL string `json:"url"` // FeatureType selects the feature type of the WFS service. FeatureType string `json:"feature-type"` // SortBy sorts the feature by this key. SortBy string `json:"sort-by"` } // DMAJobKind is the import queue type identifier. const DMAJobKind JobKind = "dma" type dmaJobCreator struct{} func init() { RegisterJobCreator(DMAJobKind, dmaJobCreator{}) } func (dmaJobCreator) Description() string { return "distance marks" } func (dmaJobCreator) AutoAccept() bool { return true } func (dmaJobCreator) Create() Job { return new(DistanceMarksAshore) } func (dmaJobCreator) Depends() []string { return []string{ "distance_marks", } } // StageDone is a NOP for distance marks imports. func (dmaJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp for distance marks imports is a NOP. func (*DistanceMarksAshore) CleanUp() error { return nil } type distanceMarksAshoreProperties struct { HydroCatdis int `json:"hydro_catdis"` } const ( deleteDistanceMarksSQL = ` WITH resp AS ( SELECT best_utm(area::geometry) AS t, ST_Transform(area::geometry, best_utm(area::geometry)) AS a FROM users.responsibility_areas WHERE country = users.current_user_country() ) DELETE FROM waterway.distance_marks WHERE ST_Covers( (SELECT a FROM resp), ST_Transform(geom::geometry, (SELECT t FROM resp))) ` insertDistanceMarksSQL = ` WITH resp AS ( SELECT best_utm(area::geometry) AS t, ST_Transform(area::geometry, best_utm(area::geometry)) AS a FROM users.responsibility_areas WHERE country = users.current_user_country() ) INSERT INTO waterway.distance_marks (geom, catdis) SELECT ST_Transform(clipped.geom, 4326)::geography, $3 FROM ( SELECT (ST_Dump( ST_Intersection( (SELECT a FROM resp), ST_Transform( ST_GeomFromWKB($1, $2::integer), (SELECT t FROM resp) ) ) )).geom AS geom ) AS clipped WHERE clipped.geom IS NOT NULL ` ) // Do executes the actual fairway dimension import. func (dma *DistanceMarksAshore) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() feedback.Info("Import distance marks") feedback.Info("Loading capabilities from %s", dma.URL) caps, err := wfs.GetCapabilities(dma.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return nil, err } ft := caps.FindFeatureType(dma.FeatureType) if ft == nil { return nil, fmt.Errorf("Unknown feature type '%s'", dma.FeatureType) } feedback.Info("Found feature type '%s'", dma.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS) return nil, err } urls, err := wfs.GetFeaturesGET( caps, dma.FeatureType, "application/json", dma.SortBy) if err != nil { feedback.Error("Cannot create GetFeature URLs. %v", err) return nil, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertStmt, err := tx.PrepareContext(ctx, insertDistanceMarksSQL) if err != nil { return nil, err } defer insertStmt.Close() // Delete the old features. if _, err := tx.ExecContext(ctx, deleteDistanceMarksSQL); err != nil { return nil, err } var ( unsupported = stringCounter{} missingProperties int badProperties int features int ) if err := wfs.DownloadURLs(urls, func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. if rfc.Features == nil { return nil } feedback.Info("Using EPSG: %d", epsg) for _, feature := range rfc.Features { if feature.Geometry.Coordinates == nil { missingProperties++ continue } var props distanceMarksAshoreProperties if err := json.Unmarshal(*feature.Properties, &props); err != nil { badProperties++ continue } switch feature.Geometry.Type { case "Point": var p pointSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { return err } if _, err := insertStmt.ExecContext( ctx, p.asWKB(), epsg, props.HydroCatdis, ); err != nil { feedback.Error("error: %s", err) return err } features++ default: unsupported[feature.Geometry.Type]++ } } return nil }); err != nil { return nil, err } if badProperties > 0 { feedback.Warn("Bad properties: %d", badProperties) } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if features == 0 { err := errors.New("No features found") feedback.Error("%v", err) return nil, err } if err = tx.Commit(); err == nil { feedback.Info("Storing %d features took %s", features, time.Since(start)) } return nil, err }