Mercurial > gemma
view pkg/imports/wp.go @ 2307:e1aa9bb65da6
import_schedule: send email when manually triggering import
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 18 Feb 2019 13:32:44 +0100 |
parents | 7c83b5277c1c |
children | 02505fcff63c |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "bufio" "context" "database/sql" "encoding/csv" "encoding/json" "errors" "fmt" "io" "os" "path/filepath" "strconv" "strings" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/misc" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/wfs" ) // defaultPointToLinePrecision is the precision in meters // to match from points to lines. const defaultPointToLinePrecision = 10 type WaterwayProfiles struct { Dir string `json:"dir"` // URL the GetCapabilities URL of the WFS service. URL string `json:"url"` // FeatureType selects the feature type of the WFS service. FeatureType string `json:"feature-type"` // SortBy works around misconfigured services to // establish a sort order to get the features. SortBy string `json:"sort-by"` // Precsion of match points to line strings. Precision *float64 `json:"precision,omitempty"` } const WPJobKind JobKind = "wp" type wpJobCreator struct{} func init() { RegisterJobCreator(WPJobKind, wpJobCreator{}) } func (wpJobCreator) Create() Job { return new(WaterwayProfiles) } func (wpJobCreator) AutoAccept() bool { return false } func (wpJobCreator) Description() string { return "waterway profiles" } func (wpJobCreator) Depends() []string { return []string{ "waterway_profiles", } } const ( createGeomTempTableSQL = ` CREATE TEMP TABLE wp_geoms ( geom geography(linestring, 4326) ) ON COMMIT DROP` createTempIndexSQL = ` CREATE INDEX ON wp_geoms USING GIST(geom)` analyzeTempTableSQL = `ANALYZE wp_geoms` insertGeomTmpTableSQL = ` INSERT INTO wp_geoms (geom) VALUES ( ST_Transform(ST_GeomFromWKB($1, $2::int), 4326) )` hasDistanceMarkSQL = ` SELECT true FROM waterway.distance_marks_virtual WHERE location_code = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) LIMIT 1` insertWaterwayProfileSQL = ` INSERT INTO waterway.waterway_profiles ( location, geom, validity, lnwl, mwl, hnwl, fe30, fe100, date_info, source_organization ) VALUES ( ($1, $2, $3, $4, $5), ( SELECT wp_geoms.geom FROM wp_geoms, waterway.distance_marks_virtual AS dmv WHERE dmv.location_code = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND ST_DWithin(dmv.geom, wp_geoms.geom, $14::float) ORDER BY ST_Distance(dmv.geom, wp_geoms.geom, true) LIMIT 1 ), $6, $7, $8, $9, $10, $11, $12, $13 ) RETURNING id, geom is NULL` wpStageDoneSQL = ` UPDATE waterway.waterway_profiles SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.waterway_profiles'::regclass)` ) func (wpJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, wpStageDoneSQL, id) return err } func (wp *WaterwayProfiles) CleanUp() error { return os.RemoveAll(wp.Dir) } func (wp *WaterwayProfiles) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() if err := wp.downloadGeometries( ctx, importID, tx, start, feedback); err != nil { return nil, fmt.Errorf("error downloading geometries: %v", err) } summary, err := wp.processCSV( ctx, importID, tx, start, feedback) if err != nil { return nil, fmt.Errorf("error processing CSV: %v", err) } if err := tx.Commit(); err != nil { return nil, fmt.Errorf( "Importing waterway profiles failed after %s: %v", time.Since(start), err) } feedback.Info("Importing waterway profiles took %s", time.Since(start)) return summary, nil } func (wp *WaterwayProfiles) downloadGeometries( ctx context.Context, importID int64, tx *sql.Tx, start time.Time, feedback Feedback, ) error { feedback.Info("Start downloading geometries from WFS.") feedback.Info("Loading capabilities from %s", wp.URL) caps, err := wfs.GetCapabilities(wp.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return err } ft := caps.FindFeatureType(wp.FeatureType) if ft == nil { return fmt.Errorf("Unknown feature type '%s'", wp.FeatureType) } feedback.Info("Found feature type '%s", wp.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS) return err } if wp.SortBy != "" { feedback.Info("Features will be sorted by '%s'", wp.SortBy) } urls, err := wfs.GetFeaturesGET( caps, wp.FeatureType, "application/json", wp.SortBy) if err != nil { feedback.Error("Cannot create GetFeature URLs. %v", err) return err } if _, err := tx.ExecContext(ctx, createGeomTempTableSQL); err != nil { return err } if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil { return err } insertStmt, err := tx.PrepareContext(ctx, insertGeomTmpTableSQL) if err != nil { return err } defer insertStmt.Close() var ( unsupported = stringCounter{} missingProperties int features int ) if err := wfs.DownloadURLs(urls, func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. if rfc.Features == nil { return nil } for _, feature := range rfc.Features { if feature.Geometry.Coordinates == nil { missingProperties++ continue } switch feature.Geometry.Type { case "LineString": var l lineSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil { return err } if _, err := insertStmt.ExecContext( ctx, l.asWKB(), epsg, ); err != nil { return err } features++ default: unsupported[feature.Geometry.Type]++ } } return nil }); err != nil { return err } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if features == 0 { return errors.New("No features found") } if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil { return err } return nil } func parseFloat64(s string) (sql.NullFloat64, error) { if s == "" { return sql.NullFloat64{}, nil } s = strings.Replace(s, ",", ".", -1) v, err := strconv.ParseFloat(s, 64) if err != nil { return sql.NullFloat64{}, err } return sql.NullFloat64{Float64: v, Valid: true}, nil } func (wp *WaterwayProfiles) processCSV( ctx context.Context, importID int64, tx *sql.Tx, start time.Time, feedback Feedback, ) (interface{}, error) { feedback.Info("Start processing CSV file.") f, err := os.Open(filepath.Join(wp.Dir, "wp.csv")) if err != nil { return nil, err } defer f.Close() r := csv.NewReader(bufio.NewReader(f)) r.Comma = ';' r.ReuseRecord = true headers, err := r.Read() if err != nil { return nil, err } var ( locationIdx = -1 validFromIdx = -1 validToIdx = -1 lnwlIdx = -1 mwlIdx = -1 hnwlIdx = -1 fe30Idx = -1 fe100Idx = -1 dateInfoIdx = -1 sourceIdx = -1 ) fields := []struct { idx *int substr string }{ {&locationIdx, "location"}, {&validFromIdx, "valid_from"}, {&validToIdx, "valid_to"}, {&lnwlIdx, "lnwl"}, {&mwlIdx, "mwl"}, {&hnwlIdx, "hnwl"}, {&fe30Idx, "fe30"}, {&fe100Idx, "fe100"}, {&dateInfoIdx, "date_info"}, {&sourceIdx, "source"}, } nextHeader: for i, h := range headers { h = strings.ToLower(h) for j := range fields { if strings.Contains(h, fields[j].substr) { if *fields[j].idx != -1 { return nil, fmt.Errorf( "CSV has more than one column with name containing '%s'", fields[j].substr) } *fields[j].idx = i continue nextHeader } } } var missing []string for i := range fields { if *fields[i].idx == -1 { missing = append(missing, fields[i].substr) } } if len(missing) > 0 { return nil, fmt.Errorf( "CSV is missing columns: %s", strings.Join(missing, ", ")) } var precision float64 if wp.Precision != nil { if precision = *wp.Precision; precision < 0 { precision = -precision } } else { precision = defaultPointToLinePrecision } feedback.Info( "Matching points to lines with a precision of %.4fm.", precision) parseDate := misc.TimeGuesser([]string{"02.01.2006"}).Guess insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL) if err != nil { return nil, err } defer insertStmt.Close() trackStmt, err := tx.PrepareContext(ctx, trackImportSQL) if err != nil { return nil, err } defer trackStmt.Close() hasDistanceMarkStmt, err := tx.PrepareContext(ctx, hasDistanceMarkSQL) if err != nil { return nil, err } defer hasDistanceMarkStmt.Close() var ids []int64 lines: for line := 1; ; line++ { row, err := r.Read() switch { case err == io.EOF || len(row) == 0: break lines case err != nil: return nil, fmt.Errorf("CSV parsing failed: %v", err) } location, err := models.IsrsFromString(row[locationIdx]) if err != nil { return nil, fmt.Errorf( "Invalid ISRS location code in line %d: %v", line, err) } var dummy bool err = hasDistanceMarkStmt.QueryRowContext( ctx, location.CountryCode, location.LoCode, location.FairwaySection, location.Orc, location.Hectometre, ).Scan(&dummy) switch { case err == sql.ErrNoRows: feedback.Warn("No virtual distance mark found for %s.", location) continue lines case err != nil: return nil, err case !dummy: return nil, errors.New("unexpected result form database") } validFromTime, err := parseDate(row[validFromIdx]) if err != nil { return nil, fmt.Errorf( "Invalid 'valid_from' value in line %d: %v", line, err) } validToTime, err := parseDate(row[validToIdx]) if err != nil { return nil, fmt.Errorf( "Invalid 'valid_to' value in line %d: %v", line, err) } validFrom := pgtype.Timestamptz{ Time: validFromTime, Status: pgtype.Present, } validTo := pgtype.Timestamptz{ Time: validToTime, Status: pgtype.Present, } validity := pgtype.Tstzrange{ Lower: validFrom, Upper: validTo, LowerType: pgtype.Inclusive, UpperType: pgtype.Exclusive, Status: pgtype.Present, } lnwl, err := parseFloat64(row[lnwlIdx]) if err != nil { return nil, fmt.Errorf( "Invalid 'lnwl' value in line %d: %v", line, err) } mwl, err := parseFloat64(row[mwlIdx]) if err != nil { return nil, fmt.Errorf( "Invalid 'mwl' value in line %d: %v", line, err) } hnwl, err := parseFloat64(row[hnwlIdx]) if err != nil { return nil, fmt.Errorf( "Invalid 'hnwl' value in line %d: %v", line, err) } fe30, err := parseFloat64(row[fe30Idx]) if err != nil { return nil, fmt.Errorf( "Invalid 'fe30' value in line %d: %v", line, err) } fe100, err := parseFloat64(row[fe100Idx]) if err != nil { return nil, fmt.Errorf( "Invalid 'fe100' value in line %d: %v", line, err) } var dateInfo time.Time if di := row[dateInfoIdx]; di == "" { dateInfo = start } else if dateInfo, err = parseDate(di); err != nil { return nil, fmt.Errorf( "Invalid 'date_info' value in line %d: %v", line, err) } source := row[sourceIdx] var id int64 var noGeom bool if err := insertStmt.QueryRowContext( ctx, location.CountryCode, location.LoCode, location.FairwaySection, location.Orc, location.Hectometre, &validity, lnwl, mwl, hnwl, fe30, fe100, dateInfo, source, precision, ).Scan(&id, &noGeom); err != nil { return nil, err } if _, err := trackStmt.ExecContext( ctx, importID, "waterway.waterway_profiles", id); err != nil { return nil, err } if noGeom { feedback.Warn( "No profile geometry found for %s in line %d.", location, line) } ids = append(ids, id) } if len(ids) == 0 { return nil, UnchangedError("No new entries in waterway profiles.") } feedback.Info("%d new entries in waterway profiles.", len(ids)) summary := struct { IDs []int64 `json:"ids"` }{ IDs: ids, } return &summary, nil }