Mercurial > gemma
view pkg/imports/wp.go @ 5718:3d497077f888 uploadwg
Implemented direct file upload as alternative import method for WG.
For testing and data corrections it is useful to be able to import
waterway gauges data directly by uploading an XML file.
author | Sascha Wilde <wilde@sha-bang.de> |
---|---|
date | Thu, 18 Apr 2024 19:23:19 +0200 |
parents | 6270951dda28 |
children |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "bufio" "context" "database/sql" "encoding/csv" "encoding/json" "errors" "fmt" "io" "os" "path/filepath" "strconv" "strings" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/wfs" ) // defaultPointToLinePrecision is the precision in meters // to match from points to lines. const defaultPointToLinePrecision = 10 // WaterwayProfiles is a Job to import waterway profiles // from a given WFS service plus some uploaded CSV file // and stores them into the database. type WaterwayProfiles struct { Dir string `json:"dir"` // URL the GetCapabilities URL of the WFS service. URL string `json:"url"` // FeatureType selects the feature type of the WFS service. FeatureType string `json:"feature-type"` // SortBy works around misconfigured services to // establish a sort order to get the features. SortBy string `json:"sort-by"` // Precsion of match points to line strings. Precision *float64 `json:"precision,omitempty"` // User is an optional username for Basic Auth. User string `json:"user,omitempty"` // Password is an optional password for Basic Auth. Password string `json:"password,omitempty"` } // Description gives a short info about relevant facts of this import. func (wp *WaterwayProfiles) Description([]string) (string, error) { return wp.URL + "|" + wp.FeatureType, nil } // WPJobKind is the unique name of this import job type. 
const WPJobKind JobKind = "wp" type wpJobCreator struct{} func init() { RegisterJobCreator(WPJobKind, wpJobCreator{}) } func (wpJobCreator) Create() Job { return new(WaterwayProfiles) } func (wpJobCreator) AutoAccept() bool { return false } func (wpJobCreator) Description() string { return "waterway profiles" } func (wpJobCreator) Depends() [2][]string { return [2][]string{ {"waterway_profiles"}, {"distance_marks_virtual"}, } } const ( createGeomTempTableSQL = ` CREATE TEMP TABLE wp_geoms ( geom geography(linestring, 4326) ) ON COMMIT DROP` createTempIndexSQL = ` CREATE INDEX ON wp_geoms USING GIST(geom)` analyzeTempTableSQL = `ANALYZE wp_geoms` insertGeomTmpTableSQL = ` INSERT INTO wp_geoms (geom) VALUES ( ST_Transform(ST_GeomFromWKB($1, $2::int), 4326) )` hasDistanceMarkSQL = ` SELECT true FROM waterway.distance_marks_virtual WHERE location_code = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) LIMIT 1` insertWaterwayProfileSQL = ` INSERT INTO waterway.waterway_profiles ( location, geom, validity, lnwl, mwl, hnwl, fe30, fe100, date_info, source_organization ) VALUES ( ($1, $2, $3, $4, $5), ( SELECT wp_geoms.geom FROM wp_geoms, waterway.distance_marks_virtual AS dmv WHERE dmv.location_code = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND ST_DWithin(dmv.geom, wp_geoms.geom, $14::float) ORDER BY ST_Distance(dmv.geom, wp_geoms.geom, true) LIMIT 1 ), $6, $7, $8, $9, $10, $11, $12, $13 ) RETURNING id, geom is NULL` wpStageDoneSQL = ` UPDATE waterway.waterway_profiles SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.waterway_profiles'::regclass)` ) // StageDone moves the imported waterway profiles out of the staging area. func (wpJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, _ Feedback, ) error { _, err := tx.ExecContext(ctx, wpStageDoneSQL, id) return err } // CleanUp deletes temporary files from the filesystem. 
func (wp *WaterwayProfiles) CleanUp() error { return os.RemoveAll(wp.Dir) } // Do performs the actual import. func (wp *WaterwayProfiles) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (any, error) { start := time.Now() tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() if err := wp.downloadGeometries(ctx, tx, feedback); err != nil { return nil, fmt.Errorf("error downloading geometries: %v", err) } summary, err := wp.processCSV( ctx, importID, tx, start, feedback) if err != nil { return nil, fmt.Errorf("error processing CSV: %v", err) } if err := tx.Commit(); err != nil { return nil, fmt.Errorf( "importing waterway profiles failed after %s: %v", time.Since(start), err) } feedback.Info("Importing waterway profiles took %s", time.Since(start)) return summary, nil } func (wp *WaterwayProfiles) downloadGeometries( ctx context.Context, tx *sql.Tx, feedback Feedback, ) error { feedback.Info("Start downloading geometries from WFS.") feedback.Info("Loading capabilities from %s", wp.URL) caps, err := wfs.GetCapabilities(wp.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return err } ft := caps.FindFeatureType(wp.FeatureType) if ft == nil { return fmt.Errorf("unknown feature type '%s'", wp.FeatureType) } feedback.Info("Found feature type '%s'", wp.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) return err } if wp.SortBy != "" { feedback.Info("Features will be sorted by '%s'", wp.SortBy) } dl, err := wfs.GetFeatures(caps, wp.FeatureType, wp.SortBy) if err != nil { feedback.Error("Cannot create GetFeature URLs. 
%v", err) return err } if _, err := tx.ExecContext(ctx, createGeomTempTableSQL); err != nil { return err } if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil { return err } insertStmt, err := tx.PrepareContext(ctx, insertGeomTmpTableSQL) if err != nil { return err } defer insertStmt.Close() var ( unsupported = stringCounter{} missingProperties int features int ) if err := dl.Download(wp.User, wp.Password, func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. if rfc.Features == nil { return nil } for _, feature := range rfc.Features { if feature.Geometry.Coordinates == nil { missingProperties++ continue } switch feature.Geometry.Type { case "LineString": var l lineSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil { return err } if _, err := insertStmt.ExecContext( ctx, l.asWKB(), epsg, ); err != nil { return err } features++ case "MultiLineString": var ml multiLineSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &ml); err != nil { return err } for i := range ml { if _, err := insertStmt.ExecContext( ctx, ml[i].asWKB(), epsg, ); err != nil { return err } features++ } default: unsupported[feature.Geometry.Type]++ } } return nil }); err != nil { return err } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if features == 0 { return errors.New("no features found") } if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil { return err } return nil } func parseFloat64(s string) (sql.NullFloat64, error) { if s == "" { return 
sql.NullFloat64{}, nil } s = strings.Replace(s, ",", ".", -1) v, err := strconv.ParseFloat(s, 64) if err != nil { return sql.NullFloat64{}, err } return sql.NullFloat64{Float64: v, Valid: true}, nil } func (wp *WaterwayProfiles) processCSV( ctx context.Context, importID int64, tx *sql.Tx, start time.Time, feedback Feedback, ) (any, error) { feedback.Info("Start processing CSV file.") f, err := os.Open(filepath.Join(wp.Dir, "wp.csv")) if err != nil { return nil, err } defer f.Close() r := csv.NewReader(bufio.NewReader(f)) r.Comma = ';' r.ReuseRecord = true headers, err := r.Read() if err != nil { return nil, err } var ( locationIdx = -1 validFromIdx = -1 validToIdx = -1 lnwlIdx = -1 mwlIdx = -1 hnwlIdx = -1 fe30Idx = -1 fe100Idx = -1 dateInfoIdx = -1 sourceIdx = -1 ) fields := []struct { idx *int substr string }{ {&locationIdx, "location"}, {&validFromIdx, "valid_from"}, {&validToIdx, "valid_to"}, {&lnwlIdx, "lnwl"}, {&mwlIdx, "mwl"}, {&hnwlIdx, "hnwl"}, {&fe30Idx, "fe30"}, {&fe100Idx, "fe100"}, {&dateInfoIdx, "date_info"}, {&sourceIdx, "source"}, } nextHeader: for i, h := range headers { h = strings.ToLower(h) for j := range fields { if strings.Contains(h, fields[j].substr) { if *fields[j].idx != -1 { return nil, fmt.Errorf( "CSV has more than one column with name containing '%s'", fields[j].substr) } *fields[j].idx = i continue nextHeader } } } var missing []string for i := range fields { if *fields[i].idx == -1 { missing = append(missing, fields[i].substr) } } if len(missing) > 0 { return nil, fmt.Errorf( "CSV is missing columns: %s", strings.Join(missing, ", ")) } var precision float64 if wp.Precision != nil { if precision = *wp.Precision; precision < 0 { precision = -precision } } else { precision = defaultPointToLinePrecision } feedback.Info( "Matching points to lines with a precision of %.4fm.", precision) parseDate := common.TimeParser([]string{"02.01.2006"}).Parse insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL) if err != nil { return 
nil, err } defer insertStmt.Close() trackStmt, err := tx.PrepareContext(ctx, trackImportSQL) if err != nil { return nil, err } defer trackStmt.Close() hasDistanceMarkStmt, err := tx.PrepareContext(ctx, hasDistanceMarkSQL) if err != nil { return nil, err } defer hasDistanceMarkStmt.Close() var ids []int64 lines: for line := 1; ; line++ { row, err := r.Read() switch { case err == io.EOF || len(row) == 0: break lines case err != nil: return nil, fmt.Errorf("CSV parsing failed: %v", err) } location, err := models.IsrsFromString(row[locationIdx]) if err != nil { return nil, fmt.Errorf( "invalid ISRS location code in line %d: %v", line, err) } var dummy bool err = hasDistanceMarkStmt.QueryRowContext( ctx, location.CountryCode, location.LoCode, location.FairwaySection, location.Orc, location.Hectometre, ).Scan(&dummy) switch { case err == sql.ErrNoRows: feedback.Warn("No virtual distance mark found for %s.", location) continue lines case err != nil: return nil, err case !dummy: return nil, errors.New("unexpected result form database") } validFromTime, err := parseDate(row[validFromIdx]) if err != nil { return nil, fmt.Errorf( "invalid 'valid_from' value in line %d: %v", line, err) } validToTime, err := parseDate(row[validToIdx]) if err != nil { return nil, fmt.Errorf( "invalid 'valid_to' value in line %d: %v", line, err) } validFrom := pgtype.Timestamptz{ Time: validFromTime, Status: pgtype.Present, } validTo := pgtype.Timestamptz{ Time: validToTime, Status: pgtype.Present, } validity := pgtype.Tstzrange{ Lower: validFrom, Upper: validTo, LowerType: pgtype.Inclusive, UpperType: pgtype.Exclusive, Status: pgtype.Present, } lnwl, err := parseFloat64(row[lnwlIdx]) if err != nil { return nil, fmt.Errorf( "invalid 'lnwl' value in line %d: %v", line, err) } mwl, err := parseFloat64(row[mwlIdx]) if err != nil { return nil, fmt.Errorf( "invalid 'mwl' value in line %d: %v", line, err) } hnwl, err := parseFloat64(row[hnwlIdx]) if err != nil { return nil, fmt.Errorf( "invalid 'hnwl' 
value in line %d: %v", line, err) } fe30, err := parseFloat64(row[fe30Idx]) if err != nil { return nil, fmt.Errorf( "invalid 'fe30' value in line %d: %v", line, err) } fe100, err := parseFloat64(row[fe100Idx]) if err != nil { return nil, fmt.Errorf( "invalid 'fe100' value in line %d: %v", line, err) } var dateInfo time.Time if di := row[dateInfoIdx]; di == "" { dateInfo = start } else if dateInfo, err = parseDate(di); err != nil { return nil, fmt.Errorf( "invalid 'date_info' value in line %d: %v", line, err) } source := row[sourceIdx] var id int64 var noGeom bool if err := insertStmt.QueryRowContext( ctx, location.CountryCode, location.LoCode, location.FairwaySection, location.Orc, location.Hectometre, &validity, lnwl, mwl, hnwl, fe30, fe100, dateInfo, source, precision, ).Scan(&id, &noGeom); err != nil { return nil, err } if _, err := trackStmt.ExecContext( ctx, importID, "waterway.waterway_profiles", id); err != nil { return nil, err } if noGeom { feedback.Warn( "No profile geometry found for %s in line %d.", location, line) } ids = append(ids, id) } if len(ids) == 0 { return nil, UnchangedError("No new entries in waterway profiles.") } feedback.Info("%d new entries in waterway profiles.", len(ids)) summary := struct { IDs []int64 `json:"ids"` }{ IDs: ids, } return &summary, nil }