Mercurial > gemma
changeset 4904:53d929f658f3 fairway-marks-import
Separate code common to all types of fairway mark imports
Currently the only implemented type of fairway marks is BCNLAT.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 06 Feb 2020 19:09:14 +0100 |
parents | 69cc3d3047ab |
children | 8cb201b551b3 |
files | pkg/imports/fm.go pkg/imports/fm_bcnlat.go |
diffstat | 2 files changed, 283 insertions(+), 213 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/fm.go Wed Feb 05 17:00:29 2020 +0100 +++ b/pkg/imports/fm.go Thu Feb 06 19:09:14 2020 +0100 @@ -14,22 +14,16 @@ package imports import ( - "context" - "database/sql" "encoding/json" - "errors" "fmt" "io" - "strconv" - "time" - "gemma.intevation.de/gemma/pkg/pgxutils" "gemma.intevation.de/gemma/pkg/wfs" ) -// FairwayMarks is an import job to import -// fairway marks in form of point geometries -// and attribute data from a WFS service. +// FairwayMarks is a struct +// to be used as the basis for imports of +// specific types for fairway marks. type FairwayMarks struct { // URL the GetCapabilities URL of the WFS service. URL string `json:"url"` @@ -44,138 +38,56 @@ Password string `json:"password,omitempty"` } -// Description gives a short info about relevant facts of this import. -func (fm *FairwayMarks) Description() (string, error) { - return fm.URL + "|" + fm.FeatureType, nil -} - -// FMJobKind is the import queue type identifier. -const FMJobKind JobKind = "fm" - -type fmJobCreator struct{} - -func init() { - RegisterJobCreator(FMJobKind, fmJobCreator{}) -} - -func (fmJobCreator) Description() string { return "fairway marks" } - -func (fmJobCreator) AutoAccept() bool { return true } - -func (fmJobCreator) Create() Job { return new(FairwayMarks) } - -func (fmJobCreator) Depends() [2][]string { - return [2][]string{ - {"fairway_marks"}, - {}, - } -} - -// StageDone is a NOP for fairway marks imports. -func (fmJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil +// Properties common to all types of fairway marks +type fairwayMarksProperties struct { + Datsta *string `json:"hydro_datsta"` + Datend *string `json:"hydro_datend"` + Persta *string `json:"hydro_persta"` + Perend *string `json:"hydro_perend"` + Objnam *string `json:"hydro_objnam"` + Nobjnm *string `json:"hydro_nobjnm"` + Inform *string `json:"hydro_inform"` + Ninfom *string `json:"hydro_ninfom"` + Scamin *int `json:"hydro_scamin"` + Picrep *string `json:"hydro_picrep"` + Txtdsc *string `json:"hydro_txtdsc"` + Sordat *string `json:"hydro_sordat"` + Sorind *string `json:"hydro_sorind"` } -// CleanUp for fairway marks imports is a NOP. -func (*FairwayMarks) CleanUp() error { return nil } - -type fairwayMarksProperties struct { - Datsta *string `json:"hydro_datsta"` - Datend *string `json:"hydro_datend"` - Persta *string `json:"hydro_persta"` - Perend *string `json:"hydro_perend"` - Objnam *string `json:"hydro_objnam"` - Nobjnm *string `json:"hydro_nobjnm"` - Inform *string `json:"hydro_inform"` - Ninfom *string `json:"hydro_ninfom"` - Scamin *int `json:"hydro_scamin"` - Picrep *string `json:"hydro_picrep"` - Txtdsc *string `json:"hydro_txtdsc"` - Sordat *string `json:"hydro_sordat"` - Sorind *string `json:"hydro_sorind"` - Colour *string `json:"hydro_colour"` - Colpat *string `json:"hydro_colpat"` - Condtn *int `json:"hydro_condtn"` - Bcnshp *int `json:"hydro_bcnshp"` - HydroCatlam *int64 `json:"hydro_catlam,omitempty"` - IENCCatlam *int64 `json:"ienc_catlam,omitempty"` - Dirimp *string `json:"ienc_dirimp,omitempty"` -} - -const ( - insertFairwayMarksSQL = ` -with a as ( - select users.current_user_area_utm() AS a -) -INSERT INTO waterway.fairway_marks ( - geom, - datsta, - datend, - persta, - perend, - objnam, - nobjnm, - inform, - ninfom, - scamin, - picrep, - txtdsc, - sordat, - sorind, - colour, - colpat, - condtn, - bcnshp, - catlam, - dirimp -) -SELECT newfm, $3, $4, $5, $6, $7, $8, $9, - $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21 - FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm) - WHERE pg_has_role('sys_admin', 'MEMBER') - OR ST_Intersects((select a from a), - ST_Transform(newfm, (select ST_SRID(a) from a))) -ON CONFLICT ( - CAST((0, geom, - datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom, - scamin, picrep, txtdsc, sordat, sorind, colour, colpat, condtn, - bcnshp, catlam, dirimp) AS waterway.fairway_marks) - ) - DO NOTHING -RETURNING id -` -) - -// Do executes the actual fairway marks import. -func (fm *FairwayMarks) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, +// Common operation of FM imports to get features from WFS service +func getFMFeatures( feedback Feedback, -) (interface{}, error) { - - start := time.Now() - - feedback.Info("Import fairway marks") + fm FairwayMarks, + // Pointer to a struct representing featuretype's properties + props interface{}, +) ( + // Elements can be converted to []interface{}{p, fp} with + // p being a pointSlice and fp of the type of argument props. + fms [][]interface{}, + epsg int, + err error, +) { feedback.Info("Loading capabilities from %s", fm.URL) caps, err := wfs.GetCapabilities(fm.URL) if err != nil { feedback.Error("Loading capabilities failed: %v", err) - return nil, err + return } ft := caps.FindFeatureType(fm.FeatureType) if ft == nil { - return nil, fmt.Errorf("unknown feature type '%s'", fm.FeatureType) + err = fmt.Errorf("unknown feature type '%s'", fm.FeatureType) + return } feedback.Info("Found feature type '%s", fm.FeatureType) - epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) + epsg, err = wfs.CRSToEPSG(ft.DefaultCRS) if err != nil { feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) - return nil, err + return } if fm.SortBy != "" { @@ -185,30 +97,16 @@ dl, err := wfs.GetFeatures(caps, fm.FeatureType, fm.SortBy) if err != nil { feedback.Error("Cannot create GetFeature URLs. %v", err) - return nil, err - } - - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return nil, err + return } - defer tx.Rollback() - - insertStmt, err := tx.PrepareContext(ctx, insertFairwayMarksSQL) - if err != nil { - return nil, err - } - defer insertStmt.Close() var ( unsupported = stringCounter{} missingProperties int badProperties int - outsideOrDup int - features int ) - if err := dl.Download(fm.User, fm.Password, func(url string, r io.Reader) error { + err = dl.Download(fm.User, fm.Password, func(url string, r io.Reader) error { feedback.Info("Get features from: '%s'", url) rfc, err := wfs.ParseRawFeatureCollection(r) if err != nil { @@ -229,85 +127,34 @@ feedback.Info("Using EPSG: %d", epsg) - savepoint := Savepoint(ctx, tx, "feature") - for _, feature := range rfc.Features { if feature.Properties == nil || feature.Geometry.Coordinates == nil { missingProperties++ continue } - var props fairwayMarksProperties - - if err := json.Unmarshal(*feature.Properties, &props); err != nil { + if err := json.Unmarshal(*feature.Properties, props); err != nil { badProperties++ continue } - var catlam sql.NullInt64 - if props.HydroCatlam != nil { - catlam = sql.NullInt64{Int64: *props.HydroCatlam, Valid: true} - } else if props.IENCCatlam != nil { - catlam = sql.NullInt64{Int64: *props.IENCCatlam, Valid: true} - } - - var dirimp sql.NullInt64 - if props.Dirimp != nil { - if value, err := strconv.ParseInt(*props.Dirimp, 10, 64); err == nil { - dirimp = sql.NullInt64{Int64: value, Valid: true} - } - } - switch feature.Geometry.Type { case "Point": var p pointSlice if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { return err } - var fmid int64 - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - p.asWKB(), - epsg, - props.Datsta, - props.Datend, - props.Persta, - props.Perend, - props.Objnam, - props.Nobjnm, - props.Inform, - props.Ninfom, - props.Scamin, - props.Picrep, - props.Txtdsc, - props.Sordat, - props.Sorind, - props.Colour, - props.Colpat, - props.Condtn, - props.Bcnshp, - catlam, - dirimp, - ).Scan(&fmid) - return err - }) - switch { - case err == sql.ErrNoRows: - outsideOrDup++ - // ignore -> filtered by responsibility_areas - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - features++ - } + + f := []interface{}{p, props} + fms = append(fms, f) default: unsupported[feature.Geometry.Type]++ } } return nil - }); err != nil { - return nil, err + }) + if err != nil { + return } if badProperties > 0 { @@ -322,21 +169,7 @@ feedback.Warn("Unsupported types found: %s", unsupported) } - if outsideOrDup > 0 { - feedback.Info( - "Features outside responsibility area and duplicates: %d", - outsideOrDup) - } + feedback.Info("Found %d usable features in data source", len(fms)) - if features == 0 { - err := errors.New("no valid new features found") - return nil, err - } - - if err = tx.Commit(); err == nil { - feedback.Info("Storing %d features took %s", - features, time.Since(start)) - } - - return nil, err + return }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/imports/fm_bcnlat.go Thu Feb 06 19:09:14 2020 +0100 @@ -0,0 +1,237 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2020 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Tom Gottfried <tom.gottfried@intevation.de> + +package imports + +import ( + "context" + "database/sql" + "errors" + "strconv" + "time" + + "gemma.intevation.de/gemma/pkg/pgxutils" +) + +// Bcnlat is an import job to import +// fairway marks of type BCNLAT in form of point geometries +// and attribute data from a WFS service. +type Bcnlat struct { + FairwayMarks +} + +// Description gives a short info about relevant facts of this import. +func (bcnlat *Bcnlat) Description() (string, error) { + return bcnlat.URL + "|" + bcnlat.FeatureType, nil +} + +// FMJobKind is the import queue type identifier. +const FMJobKind JobKind = "fm" + +type bcnlatJobCreator struct{} + +func init() { + RegisterJobCreator(FMJobKind, bcnlatJobCreator{}) +} + +func (bcnlatJobCreator) Description() string { return "fairway marks bcnlat" } + +func (bcnlatJobCreator) AutoAccept() bool { return true } + +func (bcnlatJobCreator) Create() Job { return new(Bcnlat) } + +func (bcnlatJobCreator) Depends() [2][]string { + return [2][]string{ + {"fairway_marks"}, + {}, + } +} + +// StageDone is a NOP for fairway marks imports. +func (bcnlatJobCreator) StageDone(context.Context, *sql.Tx, int64) error { + return nil +} + +// CleanUp for fairway marks imports is a NOP. +func (*Bcnlat) CleanUp() error { return nil } + +type bcnlatProperties struct { + fairwayMarksProperties + Colour *string `json:"hydro_colour"` + Colpat *string `json:"hydro_colpat"` + Condtn *int `json:"hydro_condtn"` + Bcnshp *int `json:"hydro_bcnshp"` + HydroCatlam *int64 `json:"hydro_catlam,omitempty"` + IENCCatlam *int64 `json:"ienc_catlam,omitempty"` + Dirimp *string `json:"ienc_dirimp,omitempty"` +} + +const ( + insertBCNLATSQL = ` +with a as ( + select users.current_user_area_utm() AS a +) +INSERT INTO waterway.fairway_marks ( + geom, + datsta, + datend, + persta, + perend, + objnam, + nobjnm, + inform, + ninfom, + scamin, + picrep, + txtdsc, + sordat, + sorind, + colour, + colpat, + condtn, + bcnshp, + catlam, + dirimp +) +SELECT newfm, $3, $4, $5, $6, $7, $8, $9, + $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20, $21 + FROM ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS newfm (newfm) + WHERE pg_has_role('sys_admin', 'MEMBER') + OR ST_Intersects((select a from a), + ST_Transform(newfm, (select ST_SRID(a) from a))) +ON CONFLICT ( + CAST((0, geom, + datsta, datend, persta, perend, objnam, nobjnm, inform, ninfom, + scamin, picrep, txtdsc, sordat, sorind, colour, colpat, condtn, + bcnshp, catlam, dirimp) AS waterway.fairway_marks) + ) + DO NOTHING +RETURNING id +` +) + +// Do executes the actual import. +func (fm *Bcnlat) Do( + ctx context.Context, + importID int64, + conn *sql.Conn, + feedback Feedback, +) (interface{}, error) { + + start := time.Now() + + feedback.Info("Import fairway marks of type BCNLAT/bcnlat") + + var props bcnlatProperties + fms, epsg, err := getFMFeatures( + feedback, + fm.FairwayMarks, + &props, + ) + if err != nil { + return nil, err + } + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() + + insertStmt, err := tx.PrepareContext(ctx, insertBCNLATSQL) + if err != nil { + return nil, err + } + defer insertStmt.Close() + + savepoint := Savepoint(ctx, tx, "feature") + + var outsideOrDup int + + var features int + for _, fm := range fms { + + p := fm[0].(pointSlice) + fp := fm[1].(*bcnlatProperties) + + var catlam sql.NullInt64 + if fp.HydroCatlam != nil { + catlam = sql.NullInt64{Int64: *fp.HydroCatlam, Valid: true} + } else if fp.IENCCatlam != nil { + catlam = sql.NullInt64{Int64: *fp.IENCCatlam, Valid: true} + } + + var dirimp sql.NullInt64 + if fp.Dirimp != nil { + if value, err := strconv.ParseInt(*fp.Dirimp, 10, 64); err == nil { + dirimp = sql.NullInt64{Int64: value, Valid: true} + } + } + + var fmid int64 + err := savepoint(func() error { + err := insertStmt.QueryRowContext( + ctx, + p.asWKB(), + epsg, + fp.Datsta, + fp.Datend, + fp.Persta, + fp.Perend, + fp.Objnam, + fp.Nobjnm, + fp.Inform, + fp.Ninfom, + fp.Scamin, + fp.Picrep, + fp.Txtdsc, + fp.Sordat, + fp.Sorind, + fp.Colour, + fp.Colpat, + fp.Condtn, + fp.Bcnshp, + catlam, + dirimp, + ).Scan(&fmid) + return err + }) + switch { + case err == sql.ErrNoRows: + outsideOrDup++ + // ignore -> filtered by responsibility_areas + case err != nil: + feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + default: + features++ + } + } + + if outsideOrDup > 0 { + feedback.Info( + "Features outside responsibility area and duplicates: %d", + outsideOrDup) + } + + if features == 0 { + err := errors.New("no valid new features found") + return nil, err + } + + if err = tx.Commit(); err == nil { + feedback.Info("Storing %d features took %s", + features, time.Since(start)) + } + + return nil, err +}