Mercurial > gemma
view pkg/imports/wp.go @ 2130:f3aabc05f9b2
Fix constraints on waterway profiles
staging_done in the UNIQUE constraint had no effect, because the
exclusion constraint prevented two rows with equal location and
validity anyway. Adding staging_done to the exclusion constraint
makes the UNIQUE constraint check only a corner case of what
the exclusion constraint checks. Thus, remove the UNIQUE constraint.
Casting staging_done to int is needed because there is no appropriate
operator class for booleans. Casting to smallint or even bit would have
been better (i.e. should result in a smaller index size), but that would
additionally have required creating such a CAST.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Wed, 06 Feb 2019 15:42:32 +0100 |
parents | a1f2cfa624cf |
children | f0641b5ad065 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "bufio" "context" "database/sql" "encoding/csv" "encoding/json" "errors" "fmt" "io" "os" "path/filepath" "strconv" "strings" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/misc" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/wfs" ) // defaultPointToLinePrecision is the precision in meters // to match from points to lines. const defaultPointToLinePrecision = 10 type WaterwayProfiles struct { Dir string `json:"dir"` // URL the GetCapabilities URL of the WFS service. URL string `json:"url"` // FeatureType selects the feature type of the WFS service. FeatureType string `json:"feature-type"` // SortBy works around misconfigured services to // establish a sort order to get the features. SortBy string `json:"sort-by"` // Precsion of match points to line strings. 
Precision *float64 `json:"precision,omitempty"` } const WPJobKind JobKind = "wp" type wpJobCreator struct{} func init() { RegisterJobCreator(WPJobKind, wpJobCreator{}) } func (wpJobCreator) Create(_ JobKind, data string) (Job, error) { wp := new(WaterwayProfiles) if err := common.FromJSONString(data, wp); err != nil { return nil, err } return wp, nil } func (wpJobCreator) AutoAccept() bool { return false } func (wpJobCreator) Description() string { return "waterway profiles" } func (wpJobCreator) Depends() []string { return []string{ "waterway_profiles", } } const ( createGeomTempTableSQL = ` CREATE TEMP TABLE wp_geoms ( geom geography(linestring, 4326) ) ON COMMIT DROP` createTempIndexSQL = ` CREATE INDEX ON wp_geoms USING GIST(geom)` analyzeTempTableSQL = `ANALYZE wp_geoms` insertGeomTmpTableSQL = ` INSERT INTO wp_geoms (geom) VALUES ( ST_Transform(ST_GeomFromWKB($1, $2::int), 4326) )` insertWaterwayProfileSQL = ` INSERT INTO waterway.waterway_profiles ( location, geom, validity, lnwl, mwl, hnwl, fe30, fe100, date_info, source_organization ) VALUES ( ($1, $2, $3, $4, $5), ( SELECT wp_geoms.geom FROM wp_geoms, waterway.distance_marks_virtual AS dmv WHERE dmv.location_code = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND ST_DWithin(dmv.geom, wp_geoms.geom, $14::float) ORDER BY ST_Distance(dmv.geom, wp_geoms.geom, true) LIMIT 1 ), $6, $7, $8, $9, $10, $11, $12, $13 ) RETURNING id, geom is NULL` wpStageDoneSQL = ` UPDATE waterway.waterway_profiles SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.waterway_profiles'::regclass)` ) func (wpJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, wpStageDoneSQL, id) return err } func (wp *WaterwayProfiles) CleanUp() error { return os.RemoveAll(wp.Dir) } func (wp *WaterwayProfiles) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { 
start := time.Now() tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() if err := wp.downloadGeometries( ctx, importID, tx, start, feedback); err != nil { return nil, fmt.Errorf("error downloading geometries: %v", err) } summary, err := wp.processCSV( ctx, importID, tx, start, feedback) if err != nil { return nil, fmt.Errorf("error processing CVS: %v", err) } if err := tx.Commit(); err != nil { return nil, fmt.Errorf( "Importing waterway profiles failed after %s: %v", time.Since(start), err) } feedback.Info("Importing waterway profiles took %s", time.Since(start)) return summary, nil } func (wp *WaterwayProfiles) downloadGeometries( ctx context.Context, importID int64, tx *sql.Tx, start time.Time, feedback Feedback, ) error { feedback.Info("Start downloading geometries from WFS.") feedback.Info("Loading capabilities from %s", wp.URL) caps, err := wfs.GetCapabilities(wp.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) return err } ft := caps.FindFeatureType(wp.FeatureType) if ft == nil { return fmt.Errorf("Unknown feature type '%s'", wp.FeatureType) } feedback.Info("Found feature type '%s", wp.FeatureType) epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS) return err } if wp.SortBy != "" { feedback.Info("Features will be sorted by '%s'", wp.SortBy) } urls, err := wfs.GetFeaturesGET( caps, wp.FeatureType, "application/json", wp.SortBy) if err != nil { feedback.Error("Cannot create GetFeature URLs. 
%v", err) return err } if _, err := tx.ExecContext(ctx, createGeomTempTableSQL); err != nil { return err } if _, err := tx.ExecContext(ctx, createTempIndexSQL); err != nil { return err } insertStmt, err := tx.PrepareContext(ctx, insertGeomTmpTableSQL) if err != nil { return err } defer insertStmt.Close() var ( unsupported = stringCounter{} missingProperties int features int ) if err := wfs.DownloadURLs(urls, func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { return fmt.Errorf("parsing GetFeature document failed: %v", err) } if rfc.CRS != nil { crsName := rfc.CRS.Properties.Name if epsg, err = wfs.CRSToEPSG(crsName); err != nil { feedback.Error("Unsupported CRS: %d", crsName) return err } } // No features -> ignore. if rfc.Features == nil { return nil } for _, feature := range rfc.Features { if feature.Geometry.Coordinates == nil { missingProperties++ continue } switch feature.Geometry.Type { case "LineString": var l lineSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil { return err } if _, err := insertStmt.ExecContext( ctx, l.asWKB(), epsg, ); err != nil { return err } features++ default: unsupported[feature.Geometry.Type]++ } } return nil }); err != nil { return err } if missingProperties > 0 { feedback.Warn("Missing properties: %d", missingProperties) } if len(unsupported) != 0 { feedback.Warn("Unsupported types found: %s", unsupported) } if features == 0 { return errors.New("No features found") } if _, err := tx.ExecContext(ctx, analyzeTempTableSQL); err != nil { return err } return nil } func parseFloat64(s string) (sql.NullFloat64, error) { if s == "" { return sql.NullFloat64{}, nil } s = strings.Replace(s, ",", ".", -1) v, err := strconv.ParseFloat(s, 64) if err != nil { return sql.NullFloat64{}, err } return sql.NullFloat64{Float64: v, Valid: true}, nil } func (wp *WaterwayProfiles) processCSV( ctx context.Context, importID int64, tx 
*sql.Tx, start time.Time, feedback Feedback, ) (interface{}, error) { feedback.Info("Start processing CSV file.") f, err := os.Open(filepath.Join(wp.Dir, "wp.csv")) if err != nil { return nil, err } defer f.Close() r := csv.NewReader(bufio.NewReader(f)) r.Comma = ';' r.ReuseRecord = true headers, err := r.Read() if err != nil { return nil, err } var ( locationIdx = -1 validFromIdx = -1 validToIdx = -1 lnwlIdx = -1 mwlIdx = -1 hnwlIdx = -1 fe30Idx = -1 fe100Idx = -1 dateInfoIdx = -1 sourceIdx = -1 ) fields := []struct { idx *int substr string }{ {&locationIdx, "location"}, {&validFromIdx, "valid_from"}, {&validToIdx, "valid_to"}, {&lnwlIdx, "lnwl"}, {&mwlIdx, "mwl"}, {&hnwlIdx, "hnwl"}, {&fe30Idx, "fe30"}, {&fe100Idx, "fe100"}, {&dateInfoIdx, "date_info"}, {&sourceIdx, "source"}, } nextHeader: for i, h := range headers { h = strings.ToLower(h) for j := range fields { if strings.Contains(h, fields[j].substr) { if *fields[j].idx != -1 { return nil, fmt.Errorf( "CSV has more than one column with name containing '%s'", fields[j].substr) } *fields[j].idx = i continue nextHeader } } } var missing []string for i := range fields { if *fields[i].idx == -1 { missing = append(missing, fields[i].substr) } } if len(missing) > 0 { return nil, fmt.Errorf( "CSV is missing columns: %s", strings.Join(missing, ", ")) } var precision float64 if wp.Precision != nil { if precision = *wp.Precision; precision < 0 { precision = -precision } } else { precision = defaultPointToLinePrecision } feedback.Info( "Matching points to lines with a precision of %.4fm.", precision) parseDate := misc.TimeGuesser([]string{"02.01.2006"}).Guess insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL) if err != nil { return nil, err } defer insertStmt.Close() trackStmt, err := tx.PrepareContext(ctx, trackImportSQL) if err != nil { return nil, err } defer trackStmt.Close() var ids []int64 lines: for line := 1; ; line++ { row, err := r.Read() switch { case err == io.EOF || len(row) == 0: break 
lines case err != nil: return nil, fmt.Errorf("CSV parsing failed: %v", err) } location, err := models.IsrsFromString(row[locationIdx]) if err != nil { return nil, fmt.Errorf( "Invalid ISRS location code in line %d: %v", line, err) } validFromTime, err := parseDate(row[validFromIdx]) if err != nil { return nil, fmt.Errorf( "Invalid 'valid_from' value in line %d: %v", line, err) } validToTime, err := parseDate(row[validToIdx]) if err != nil { return nil, fmt.Errorf( "Invalid 'valid_to' value in line %d: %v", line, err) } validFrom := pgtype.Timestamptz{ Time: validFromTime, Status: pgtype.Present, } validTo := pgtype.Timestamptz{ Time: validToTime, Status: pgtype.Present, } validity := pgtype.Tstzrange{ Lower: validFrom, Upper: validTo, LowerType: pgtype.Inclusive, UpperType: pgtype.Exclusive, Status: pgtype.Present, } lnwl, err := parseFloat64(row[lnwlIdx]) if err != nil { return nil, fmt.Errorf( "Invalid 'lnwl' value in line %d: %v", line, err) } mwl, err := parseFloat64(row[mwlIdx]) if err != nil { return nil, fmt.Errorf( "Invalid 'mwl' value in line %d: %v", line, err) } hnwl, err := parseFloat64(row[hnwlIdx]) if err != nil { return nil, fmt.Errorf( "Invalid 'hnwl' value in line %d: %v", line, err) } fe30, err := parseFloat64(row[fe30Idx]) if err != nil { return nil, fmt.Errorf( "Invalid 'fe30' value in line %d: %v", line, err) } fe100, err := parseFloat64(row[fe100Idx]) if err != nil { return nil, fmt.Errorf( "Invalid 'fe100' value in line %d: %v", line, err) } var dateInfo time.Time if di := row[dateInfoIdx]; di == "" { dateInfo = start } else if dateInfo, err = parseDate(di); err != nil { return nil, fmt.Errorf( "Invalid 'date_info' value in line %d: %v", line, err) } source := row[sourceIdx] var id int64 var noGeom bool if err := insertStmt.QueryRowContext( ctx, location.CountryCode, location.LoCode, location.FairwaySection, location.Orc, location.Hectometre, &validity, lnwl, mwl, hnwl, fe30, fe100, dateInfo, source, precision, ).Scan(&id, &noGeom); err != 
nil { return nil, err } if _, err := trackStmt.ExecContext( ctx, importID, "waterway.waterway_profiles", id); err != nil { return nil, err } if noGeom { feedback.Warn( "No profile geometry found for %s in line %d.", location, line) } ids = append(ids, id) } if len(ids) == 0 { return nil, UnchangedError("No new entries in waterway profiles.") } feedback.Info("%d new entries in waterway profiles.", len(ids)) summary := struct { IDs []int64 `json:"ids"` }{ IDs: ids, } return &summary, nil }