Mercurial > gemma
changeset 4946:b0dbc0f2c748 fairway-marks-import
Simplified importing of fairway marks.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 19 Feb 2020 15:08:35 +0100 |
parents | 97533bbfaa2d |
children | 407103c299a0 |
files | pkg/imports/fm.go pkg/imports/fm_bcnlat.go pkg/imports/fm_boycar.go pkg/imports/fm_boylat.go pkg/imports/fm_boysaw.go pkg/imports/fm_boyspp.go pkg/imports/fm_daymar.go pkg/imports/fm_lights.go pkg/imports/fm_notmrk.go pkg/imports/fm_rtpbcn.go pkg/imports/fm_topmar.go pkg/imports/modelconvert.go pkg/imports/pointwfs.go pkg/models/imports.go |
diffstat | 14 files changed, 1155 insertions(+), 2066 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/fm.go Tue Feb 18 13:00:25 2020 +0100 +++ b/pkg/imports/fm.go Wed Feb 19 15:08:35 2020 +0100 @@ -1,6 +1,3 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // @@ -10,58 +7,321 @@ // // Author(s): // * Tom Gottfried <tom.gottfried@intevation.de> +// * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( - "context" "database/sql" - "encoding/json" "fmt" - "io" "strings" - "time" + + "gemma.intevation.de/gemma/pkg/pgxutils" +) + +type ( + // Properties common to all types of fairway marks + fairwayMarksProperties struct { + Datsta *string `json:"hydro_datsta"` + Datend *string `json:"hydro_datend"` + Persta *string `json:"hydro_persta"` + Perend *string `json:"hydro_perend"` + Objnam *string `json:"hydro_objnam"` + Nobjnm *string `json:"hydro_nobjnm"` + Inform *string `json:"hydro_inform"` + Ninfom *string `json:"hydro_ninfom"` + Scamin *int `json:"hydro_scamin"` + Picrep *string `json:"hydro_picrep"` + Txtdsc *string `json:"hydro_txtdsc"` + Sordat *string `json:"hydro_sordat"` + Sorind *string `json:"hydro_sorind"` + } + + bcnlatProperties struct { + fairwayMarksProperties + Colour *string `json:"hydro_colour"` + Colpat *string `json:"hydro_colpat"` + Condtn *int `json:"hydro_condtn"` + Bcnshp *int `json:"hydro_bcnshp"` + HydroCatlam *int64 `json:"hydro_catlam,omitempty"` + IENCCatlam *int64 `json:"ienc_catlam,omitempty"` + Dirimp *string `json:"ienc_dirimp,omitempty"` + } + + boylatProperties struct { + fairwayMarksProperties + Colour *string `json:"hydro_colour"` + Colpat *string `json:"hydro_colpat"` + Conrad *int `json:"hydro_conrad"` + HydroMarsys *int64 `json:"hydro_marsys,omitempty"` + IENCMarsys *int64 `json:"ienc_marsys,omitempty"` + Boyshp *int `json:"hydro_boyshp"` + HydroCatlam *int64 `json:"hydro_catlam,omitempty"` + IENCCatlam *int64 `json:"ienc_catlam,omitempty"` + } + + boycarProperties struct { + fairwayMarksProperties + Colour *string `json:"hydro_colour"` + Colpat *string `json:"hydro_colpat"` + Conrad *int `json:"hydro_conrad"` + Marsys *int `json:"hydro_marsys"` + Boyshp *int `json:"hydro_boyshp"` + Catcam *int `json:"hydro_catcam"` + } + + boysawProperties struct { + fairwayMarksProperties + Colour *string `json:"hydro_colour"` + Colpat *string `json:"hydro_colpat"` + Conrad *int `json:"hydro_conrad"` + Marsys *int64 `json:"hydro_marsys"` + Boyshp *int `json:"hydro_boyshp"` + } + + boysppProperties struct { + fairwayMarksProperties + Colour *string `json:"hydro_colour"` + Colpat *string `json:"hydro_colpat"` + Conrad *int `json:"hydro_conrad"` + Marsys *int64 `json:"hydro_marsys"` + Boyshp *int `json:"hydro_boyshp"` + Catspm *string `json:"hydro_catspm"` + } - "gemma.intevation.de/gemma/pkg/wfs" + daymarProperties struct { + fairwayMarksProperties + Colour *string `json:"hydro_colour"` + Colpat *string `json:"hydro_colpat"` + Condtn *int `json:"hydro_condtn"` + Dirimp *string `json:"ienc_dirimp,omitempty"` + Topshp *int `json:"hydro_topshp"` + Orient *float64 `json:"hydro_orient,omitempty"` + } + + lightsProperties struct { + fairwayMarksProperties + Colour *string `json:"hydro_colour"` + Condtn *int `json:"hydro_condtn"` + Orient *float64 `json:"hydro_orient"` + Catlit *string `json:"hydro_catlit"` + Exclit *int `json:"hydro_exclit"` + Litchr *int `json:"hydro_litchr"` + Litvis *string `json:"hydro_litvis"` + Mltylt *int `json:"hydro_mltylt"` + Sectr1 *float64 `json:"hydro_sectr1"` + Sectr2 *float64 `json:"hydro_sectr2"` + Siggrp *string `json:"hydro_siggrp"` + Sigper *float64 `json:"hydro_sigper"` + Sigseq *string `json:"hydro_sigseq"` + Status *string `json:"hydro_status"` + } + + notmrkProperties struct { + fairwayMarksProperties + Condtn *int `json:"hydro_condtn"` + Marsys *int `json:"hydro_bcnshp"` + Dirimp *string `json:"ienc_dirimp"` + Orient *float64 `json:"hydro_orient"` + Status *string `json:"hydro_status"` + Addmrk *string `json:"ienc_addmrk"` + Catnmk *int `json:"ienc_catnmk"` + Disipd *float64 `json:"ienc_disipd"` + Disipu *float64 `json:"ienc_disipu"` + Disbk1 *float64 `json:"ienc_disbk1"` + Disbk2 *float64 `json:"ienc_disbk2"` + Fnctnm *int `json:"ienc_fnctnm"` + Bnkwtw *int `json:"ienc_bnkwtw"` + } + + rtpbcnProperties struct { + fairwayMarksProperties + Condtn *int `json:"hydro_condtn"` + Siggrp *string `json:"hydro_siggrp"` + Catrtb *int `json:"hydro_catrtb"` + Radwal *string `json:"hydro_radwal"` + } + + topmarProperties struct { + fairwayMarksProperties + Colour *string `json:"hydro_colour"` + Colpat *string `json:"hydro_colpat"` + Condtn *int `json:"hydro_condtn"` + Topshp *int `json:"hydro_topshp"` + } +) + +const ( + BCNLATJobKind JobKind = "fm_bcnlat" + BOYLATJobKind JobKind = "fm_boylat" + BOYCARJobKind JobKind = "fm_boycar" + BOYSAWJobKind JobKind = "fm_boysaw" + BOYSPPJobKind JobKind = "fm_boyspp" + DAYMARJobKind JobKind = "fm_daymar" + LIGHTSJobKind JobKind = "fm_lights" + NOTMRKJobKind JobKind = "fm_notmrk" + RTPBCNJobKind JobKind = "fm_rtpbcn" + TOPMARJobKind JobKind = "fm_topmar" ) -// FairwayMarks is a struct -// to be used as the basis for imports of -// specific types for fairway marks. -type FairwayMarks struct { - // URL the GetCapabilities URL of the WFS service. - URL string `json:"url"` - // FeatureType selects the feature type of the WFS service. - FeatureType string `json:"feature-type"` - // SortBy works around misconfigured services to - // establish a sort order to get the features. - SortBy string `json:"sort-by"` - // User is an optional username for Basic Auth. - User string `json:"user,omitempty"` - // Password is an optional password for Basic Auth. - Password string `json:"password,omitempty"` -} +func init() { + RegisterJobCreator(BCNLATJobKind, + &PointWFSJobCreator{ + description: "fairway marks bcnlat", + depends: [2][]string{{"fairway_marks_bcnlat"}, {}}, + newConsumer: newSQLConsumer( + prepareStmnts( + createInsertFMSQL("fm_bcnlat", + "colour", "colpat", "condtn", "bcnshp", "catlam"), + insertBcnlatDirimpSQL, + ), + consumeBCNLAT, + func() interface{} { return new(bcnlatProperties) }, + ), + }) + + RegisterJobCreator(BOYLATJobKind, + &PointWFSJobCreator{ + description: "fairway marks boylat", + depends: [2][]string{{"fairway_marks_boylat"}, {}}, + newConsumer: newSQLConsumer( + prepareStmnts( + createInsertFMSQL("boylat", + "colour", "colpat", "conrad", + "marsys", "boyshp", "catlam"), + ), + consumeBOYLAT, + func() interface{} { return new(boylatProperties) }, + ), + }) + + RegisterJobCreator(BOYCARJobKind, + &PointWFSJobCreator{ + description: "fairway marks boycar", + depends: [2][]string{{"fairway_marks_boycar"}, {}}, + newConsumer: newSQLConsumer( + prepareStmnts( + createInsertFMSQL("boycar", + "colour", "colpat", "conrad", + "marsys", "boyshp", "catcam"), + ), + consumeBOYCAR, + func() interface{} { return new(boycarProperties) }, + ), + }) + + RegisterJobCreator(BOYSAWJobKind, + &PointWFSJobCreator{ + description: "fairway marks boysaw", + depends: [2][]string{{"fairway_marks_boysaw"}, {}}, + newConsumer: newSQLConsumer( + prepareStmnts( + createInsertFMSQL("boysaw", + "colour", "colpat", "conrad", "marsys", "boyshp"), + ), + consumeBOYSAW, + func() interface{} { return new(boysawProperties) }, + ), + }) + + RegisterJobCreator(BOYSPPJobKind, + &PointWFSJobCreator{ + description: "fairway marks boyspp", + depends: [2][]string{{"fairway_marks_boyspp"}, {}}, + newConsumer: newSQLConsumer( + prepareStmnts( + createInsertFMSQL("boyspp", + "colour", "colpat", "conrad", + "marsys", "boyshp", "catspm"), + ), + consumeBOYSPP, + func() interface{} { return new(boysppProperties) }, + ), + }) -// Properties common to all types of fairway marks -type fairwayMarksProperties struct { - Datsta *string `json:"hydro_datsta"` - Datend *string `json:"hydro_datend"` - Persta *string `json:"hydro_persta"` - Perend *string `json:"hydro_perend"` - Objnam *string `json:"hydro_objnam"` - Nobjnm *string `json:"hydro_nobjnm"` - Inform *string `json:"hydro_inform"` - Ninfom *string `json:"hydro_ninfom"` - Scamin *int `json:"hydro_scamin"` - Picrep *string `json:"hydro_picrep"` - Txtdsc *string `json:"hydro_txtdsc"` - Sordat *string `json:"hydro_sordat"` - Sorind *string `json:"hydro_sorind"` + RegisterJobCreator(DAYMARJobKind, + &PointWFSJobCreator{ + description: "fairway marks daymar", + depends: [2][]string{{"fairway_marks_daymar"}, {}}, + newConsumer: newSQLConsumer( + prepareStmnts( + createInsertFMSQL("daymar", + "colour", "colpat", "condtn", "topshp", "orient"), + insertDaymarDirimpSQL, + ), + consumeDAYMAR, + func() interface{} { return new(daymarProperties) }, + ), + }) + + RegisterJobCreator(LIGHTSJobKind, + &PointWFSJobCreator{ + description: "fairway marks lights", + depends: [2][]string{{"fairway_marks_lights"}, {}}, + newConsumer: newSQLConsumer( + prepareStmnts( + createInsertFMSQL("lights", + "colour", "condtn", "orient", + "catlit", "exclit", "litchr", + "litvis", "mltylt", "sectr1", + "sectr2", "siggrp", "sigper", + "sigseq", "status"), + ), + consumeLIGHTS, + func() interface{} { return new(lightsProperties) }, + ), + }) + + RegisterJobCreator(NOTMRKJobKind, + &PointWFSJobCreator{ + description: "fairway marks notmrk", + depends: [2][]string{{"fairway_marks_lights"}, {}}, + newConsumer: newSQLConsumer( + prepareStmnts( + createInsertFMSQL("notmrk", + "condtn", "marsys", "orient", + "status", "addmrk", "catnmk", + "disipd", "disipu", "disbk1", + "disbk2", "fnctnm", "bnkwtw"), + insertNotmrkDirimpSQL, + ), + consumeNOTMRK, + func() interface{} { return new(notmrkProperties) }, + ), + }) + + RegisterJobCreator(RTPBCNJobKind, + &PointWFSJobCreator{ + description: "fairway marks rtpbcn", + depends: [2][]string{{"fairway_marks_rtpbcn"}, {}}, + newConsumer: newSQLConsumer( + prepareStmnts( + createInsertFMSQL("rtpbcn", + "condtn", "siggrp", "catrtb", "radwal"), + ), + consumeRTPBCN, + func() interface{} { return new(rtpbcnProperties) }, + ), + }) + + RegisterJobCreator(TOPMARJobKind, + &PointWFSJobCreator{ + description: "fairway marks topmar", + depends: [2][]string{{"fairway_marks_topmar"}, {}}, + newConsumer: newSQLConsumer( + prepareStmnts( + createInsertFMSQL("topmar", + "colour", "colpat", "condtn", "topshp"), + ), + consumeTOPMAR, + func() interface{} { return new(topmarProperties) }, + ), + }) } const ( // Format string to be completed with type and additional attributes - insertFairwayMarkSQL = ` + insertFMSQLtmpl = ` WITH a AS ( SELECT users.current_user_area_utm() AS a ) @@ -98,176 +358,563 @@ DO NOTHING RETURNING id ` + + insertBcnlatDirimpSQL = ` +INSERT INTO waterway.fairway_marks_bcnlat_dirimps (fm_bcnlat_id, dirimp) + VALUES ($1, $2) +` + + insertDaymarDirimpSQL = ` +INSERT INTO waterway.fairway_marks_daymar_dirimps (fm_daymar_id, dirimp) + VALUES ($1, $2) +` + + insertNotmrkDirimpSQL = ` +INSERT INTO waterway.fairway_marks_notmrk_dirimps (fm_notmrk_id, dirimp) + VALUES ($1, $2) +` ) // Create INSERT statement for specific fairway marks type -func getFMInsertSQL(fmType string, attributes ...string) string { +func createInsertFMSQL(fmType string, attributes ...string) string { attNums := "$16" for i := 1; i < len(attributes); i++ { attNums += fmt.Sprintf(",$%d", 16+i) } return fmt.Sprintf( - insertFairwayMarkSQL, + insertFMSQLtmpl, fmType, strings.Join(attributes, ","), attNums, ) } -// Common operation of FM imports to get features from WFS service -func getFMFeatures( - ctx context.Context, - conn *sql.Conn, - feedback Feedback, - fm FairwayMarks, - // Constructor returning pointer to struct - // representing featuretype's properties - newProps func() interface{}, - // Construct pointer to featuretype from given pointSlice and properties - newFeat func(pointSlice, interface{}) interface{}, - // Store features in type specific database tables - storeFMs func( - tx *sql.Tx, - epsg int, - // Slice of features to be converted to featuretypes type - fms []interface{}, - ) (outsideOrDup int, features int, err error), -) (err error) { +func coalesceInt64(ints ...*int64) sql.NullInt64 { + for _, i := range ints { + if i != nil { + return sql.NullInt64{Int64: *i, Valid: true} + } + } + return sql.NullInt64{} +} - start := time.Now() - - feedback.Info("Import fairway marks") +func consumeBCNLAT( + spc *SQLPointConsumer, + points pointSlice, + properties interface{}, + epsg int, +) error { + props := properties.(*bcnlatProperties) - feedback.Info("Loading capabilities from %s", fm.URL) - caps, err := wfs.GetCapabilities(fm.URL) - if err != nil { - feedback.Error("Loading capabilities failed: %v", err) - return - } - - ft := caps.FindFeatureType(fm.FeatureType) - if ft == nil { - err = fmt.Errorf("unknown feature type '%s'", fm.FeatureType) - return - } + catlam := coalesceInt64(props.HydroCatlam, props.IENCCatlam) - feedback.Info("Found feature type '%s", fm.FeatureType) - - epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) - if err != nil { - feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) - return - } - - if fm.SortBy != "" { - feedback.Info("Features will be sorted by '%s'", fm.SortBy) - } - - dl, err := wfs.GetFeatures(caps, fm.FeatureType, fm.SortBy) - if err != nil { - feedback.Error("Cannot create GetFeature URLs. %v", err) - return - } - - var ( - fms []interface{} - unsupported = stringCounter{} - missingProperties int - badProperties int - ) - - err = dl.Download(fm.User, fm.Password, func(url string, r io.Reader) error { - feedback.Info("Get features from: '%s'", url) - rfc, err := wfs.ParseRawFeatureCollection(r) - if err != nil { - return fmt.Errorf("parsing GetFeature document failed: %v", err) - } - if rfc.CRS != nil { - crsName := rfc.CRS.Properties.Name - if epsg, err = wfs.CRSToEPSG(crsName); err != nil { - feedback.Error("Unsupported CRS: %d", crsName) - return err + var fmid int64 + err := spc.savepoint(func() error { + return spc.stmts[0].QueryRowContext( + spc.ctx, + points.asWKB(), + epsg, + props.Datsta, + props.Datend, + props.Persta, + props.Perend, + props.Objnam, + props.Nobjnm, + props.Inform, + props.Ninfom, + props.Scamin, + props.Picrep, + props.Txtdsc, + props.Sordat, + props.Sorind, + props.Colour, + props.Colpat, + props.Condtn, + props.Bcnshp, + catlam, + ).Scan(&fmid) + }) + switch { + case err == sql.ErrNoRows: + return ErrFeatureDuplicated + // ignore -> filtered by responsibility area or a duplicate + // TODO: handle eventual changes to dirimp + case err != nil: + spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + return ErrFeatureIgnored + default: + if props.Dirimp != nil && *props.Dirimp != "" { + dirimps := strings.Split(*props.Dirimp, ",") + for _, dirimp := range dirimps { + if err := spc.savepoint(func() error { + _, err := spc.stmts[1].ExecContext( + spc.ctx, fmid, dirimp) + return err + }); err != nil { + spc.feedback.Warn( + pgxutils.ReadableError{Err: err}.Error()) + spc.feedback.Info( + "Tried to import '%s' as dirimp value", + dirimp) + } } } + } + return nil +} - // No features -> ignore. - if rfc.Features == nil { - return nil - } +func consumeBOYLAT( + spc *SQLPointConsumer, + points pointSlice, + properties interface{}, + epsg int, +) error { + props := properties.(*boylatProperties) + + marsys := coalesceInt64(props.HydroMarsys, props.IENCMarsys) + catlam := coalesceInt64(props.HydroCatlam, props.IENCCatlam) - feedback.Info("Using EPSG: %d", epsg) + var fmid int64 + err := spc.savepoint(func() error { + return spc.stmts[0].QueryRowContext( + spc.ctx, + points.asWKB(), + epsg, + props.Datsta, + props.Datend, + props.Persta, + props.Perend, + props.Objnam, + props.Nobjnm, + props.Inform, + props.Ninfom, + props.Scamin, + props.Picrep, + props.Txtdsc, + props.Sordat, + props.Sorind, + props.Colour, + props.Colpat, + props.Conrad, + marsys, + props.Boyshp, + catlam, + ).Scan(&fmid) + }) + switch { + case err == sql.ErrNoRows: + return ErrFeatureDuplicated + // ignore -> filtered by responsibility_areas + case err != nil: + spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + return ErrFeatureIgnored + } + return nil +} - for _, feature := range rfc.Features { - if feature.Properties == nil || feature.Geometry.Coordinates == nil { - missingProperties++ - continue - } +func consumeBOYCAR( + spc *SQLPointConsumer, + points pointSlice, + properties interface{}, + epsg int, +) error { + props := properties.(*boycarProperties) + var fmid int64 + err := spc.savepoint(func() error { + return spc.stmts[0].QueryRowContext( + spc.ctx, + points.asWKB(), + epsg, + props.Datsta, + props.Datend, + props.Persta, + props.Perend, + props.Objnam, + props.Nobjnm, + props.Inform, + props.Ninfom, + props.Scamin, + props.Picrep, + props.Txtdsc, + props.Sordat, + props.Sorind, + props.Colour, + props.Colpat, + props.Conrad, + props.Marsys, + props.Boyshp, + props.Catcam, + ).Scan(&fmid) + }) + switch { + case err == sql.ErrNoRows: + return ErrFeatureDuplicated + // ignore -> filtered by responsibility_areas + case err != nil: + spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + return ErrFeatureIgnored + } + return nil +} - props := newProps() - if err := json.Unmarshal(*feature.Properties, props); err != nil { - badProperties++ - continue - } +func consumeBOYSAW( + spc *SQLPointConsumer, + points pointSlice, + properties interface{}, + epsg int, +) error { + props := properties.(*boysawProperties) + var fmid int64 + err := spc.savepoint(func() error { + return spc.stmts[0].QueryRowContext( + spc.ctx, + points.asWKB(), + epsg, + props.Datsta, + props.Datend, + props.Persta, + props.Perend, + props.Objnam, + props.Nobjnm, + props.Inform, + props.Ninfom, + props.Scamin, + props.Picrep, + props.Txtdsc, + props.Sordat, + props.Sorind, + props.Colour, + props.Colpat, + props.Conrad, + props.Marsys, + props.Boyshp, + ).Scan(&fmid) + }) + switch { + case err == sql.ErrNoRows: + return ErrFeatureDuplicated + // ignore -> filtered by responsibility_areas + case err != nil: + spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + return ErrFeatureIgnored + } + return nil +} - switch feature.Geometry.Type { - case "Point": - var p pointSlice - if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { +func consumeBOYSPP( + spc *SQLPointConsumer, + points pointSlice, + properties interface{}, + epsg int, +) error { + props := properties.(*boysppProperties) + var fmid int64 + err := spc.savepoint(func() error { + return spc.stmts[0].QueryRowContext( + spc.ctx, + points.asWKB(), + epsg, + props.Datsta, + props.Datend, + props.Persta, + props.Perend, + props.Objnam, + props.Nobjnm, + props.Inform, + props.Ninfom, + props.Scamin, + props.Picrep, + props.Txtdsc, + props.Sordat, + props.Sorind, + props.Colour, + props.Colpat, + props.Conrad, + props.Marsys, + props.Boyshp, + props.Catspm, + ).Scan(&fmid) + }) + switch { + case err == sql.ErrNoRows: + return ErrFeatureDuplicated + // ignore -> filtered by responsibility_areas + case err != nil: + spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + return ErrFeatureIgnored + } + return nil +} + +func consumeDAYMAR( + spc *SQLPointConsumer, + points pointSlice, + properties interface{}, + epsg int, +) error { + props := properties.(*daymarProperties) + + var fmid int64 + err := spc.savepoint(func() error { + return spc.stmts[0].QueryRowContext( + spc.ctx, + points.asWKB(), + epsg, + props.Datsta, + props.Datend, + props.Persta, + props.Perend, + props.Objnam, + props.Nobjnm, + props.Inform, + props.Ninfom, + props.Scamin, + props.Picrep, + props.Txtdsc, + props.Sordat, + props.Sorind, + props.Colour, + props.Colpat, + props.Condtn, + props.Topshp, + props.Orient, + ).Scan(&fmid) + }) + switch { + case err == sql.ErrNoRows: + return ErrFeatureDuplicated + // ignore -> filtered by responsibility area or a duplicate + // TODO: handle eventual changes to dirimp + case err != nil: + spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + return ErrFeatureIgnored + default: + if props.Dirimp != nil && *props.Dirimp != "" { + dirimps := strings.Split(*props.Dirimp, ",") + for _, dirimp := range dirimps { + if err := spc.savepoint(func() error { + _, err := spc.stmts[1].ExecContext( + spc.ctx, fmid, dirimp) return err + }); err != nil { + spc.feedback.Warn( + pgxutils.ReadableError{Err: err}.Error()) + spc.feedback.Info( + "Tried to import '%s' as dirimp value", + dirimp) } - - f := newFeat(p, props) - fms = append(fms, f) - default: - unsupported[feature.Geometry.Type]++ } } - return nil - }) - if err != nil { - return } + return nil +} - if badProperties > 0 { - feedback.Warn("Bad properties: %d", badProperties) - } +func consumeLIGHTS( + spc *SQLPointConsumer, + points pointSlice, + properties interface{}, + epsg int, +) error { + props := properties.(*lightsProperties) - if missingProperties > 0 { - feedback.Warn("Missing properties: %d", missingProperties) - } - - if len(unsupported) != 0 { - feedback.Warn("Unsupported types found: %s", unsupported) - } - - feedback.Info("Found %d usable features in data source", len(fms)) - - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return + var fmid int64 + err := spc.savepoint(func() error { + return spc.stmts[0].QueryRowContext( + spc.ctx, + points.asWKB(), + epsg, + props.Datsta, + props.Datend, + props.Persta, + props.Perend, + props.Objnam, + props.Nobjnm, + props.Inform, + props.Ninfom, + props.Scamin, + props.Picrep, + props.Txtdsc, + props.Sordat, + props.Sorind, + props.Colour, + props.Condtn, + props.Orient, + props.Catlit, + props.Exclit, + props.Litchr, + props.Litvis, + props.Mltylt, + props.Sectr1, + props.Sectr2, + props.Siggrp, + props.Sigper, + props.Sigseq, + props.Status, + ).Scan(&fmid) + }) + switch { + case err == sql.ErrNoRows: + return ErrFeatureDuplicated + // ignore -> filtered by responsibility area or a duplicate + case err != nil: + spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + return ErrFeatureIgnored } - defer tx.Rollback() + return nil +} - outsideOrDup, features, err := storeFMs(tx, epsg, fms) - if err != nil { - return - } - - if outsideOrDup > 0 { - feedback.Info( - "Features outside responsibility area and duplicates: %d", - outsideOrDup) +func consumeNOTMRK( + spc *SQLPointConsumer, + points pointSlice, + properties interface{}, + epsg int, +) error { + props := properties.(*notmrkProperties) + var fmid int64 + err := spc.savepoint(func() error { + return spc.stmts[0].QueryRowContext( + spc.ctx, + points.asWKB(), + epsg, + props.Datsta, + props.Datend, + props.Persta, + props.Perend, + props.Objnam, + props.Nobjnm, + props.Inform, + props.Ninfom, + props.Scamin, + props.Picrep, + props.Txtdsc, + props.Sordat, + props.Sorind, + props.Condtn, + props.Marsys, + props.Orient, + props.Status, + props.Addmrk, + props.Catnmk, + props.Disipd, + props.Disipu, + props.Disbk1, + props.Disbk2, + props.Fnctnm, + props.Bnkwtw, + ).Scan(&fmid) + }) + switch { + case err == sql.ErrNoRows: + return ErrFeatureDuplicated + // ignore -> filtered by responsibility area or a duplicate + // TODO: handle eventual changes to dirimp + case err != nil: + spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + return ErrFeatureIgnored + default: + if props.Dirimp != nil && *props.Dirimp != "" { + dirimps := strings.Split(*props.Dirimp, ",") + for _, dirimp := range dirimps { + if err := spc.savepoint(func() error { + _, err := spc.stmts[1].ExecContext( + spc.ctx, fmid, dirimp) + return err + }); err != nil { + spc.feedback.Warn( + pgxutils.ReadableError{Err: err}.Error()) + spc.feedback.Info( + "Tried to import '%s' as dirimp value", + dirimp) + } + } + } } - - if features == 0 { - return UnchangedError("no valid new features found") - } + return nil +} - if err = tx.Commit(); err == nil { - feedback.Info("Storing %d features took %s", - features, time.Since(start)) +func consumeRTPBCN( + spc *SQLPointConsumer, + points pointSlice, + properties interface{}, + epsg int, +) error { + props := properties.(*rtpbcnProperties) + var fmid int64 + err := spc.savepoint(func() error { + return spc.stmts[0].QueryRowContext( + spc.ctx, + points.asWKB(), + epsg, + props.Datsta, + props.Datend, + props.Persta, + props.Perend, + props.Objnam, + props.Nobjnm, + props.Inform, + props.Ninfom, + props.Scamin, + props.Picrep, + props.Txtdsc, + props.Sordat, + props.Sorind, + props.Condtn, + props.Siggrp, + props.Catrtb, + props.Radwal, + ).Scan(&fmid) + }) + switch { + case err == sql.ErrNoRows: + return ErrFeatureDuplicated + // ignore -> filtered by responsibility area or a duplicate + case err != nil: + spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + return ErrFeatureIgnored } + return nil +} - return +func consumeTOPMAR( + spc *SQLPointConsumer, + points pointSlice, + properties interface{}, + epsg int, +) error { + props := properties.(*topmarProperties) + var fmid int64 + err := spc.savepoint(func() error { + return spc.stmts[0].QueryRowContext( + spc.ctx, + points.asWKB(), + epsg, + props.Datsta, + props.Datend, + props.Persta, + props.Perend, + props.Objnam, + props.Nobjnm, + props.Inform, + props.Ninfom, + props.Scamin, + props.Picrep, + props.Txtdsc, + props.Sordat, + props.Sorind, + props.Colour, + props.Colpat, + props.Condtn, + props.Topshp, + ).Scan(&fmid) + }) + switch { + case err == sql.ErrNoRows: + return ErrFeatureDuplicated + // ignore -> filtered by responsibility area or a duplicate + case err != nil: + spc.feedback.Error(pgxutils.ReadableError{Err: err}.Error()) + return ErrFeatureIgnored + } + return nil }
--- a/pkg/imports/fm_bcnlat.go Tue Feb 18 13:00:25 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,203 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2020 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Tom Gottfried <tom.gottfried@intevation.de> - -package imports - -import ( - "context" - "database/sql" - "strings" - - "gemma.intevation.de/gemma/pkg/pgxutils" -) - -// Bcnlat is an import job to import -// fairway marks of type BCNLAT in form of point geometries -// and attribute data from a WFS service. -type Bcnlat struct { - FairwayMarks -} - -// Description gives a short info about relevant facts of this import. -func (bcnlat *Bcnlat) Description() (string, error) { - return bcnlat.URL + "|" + bcnlat.FeatureType, nil -} - -// BCNLATJobKind is the import queue type identifier. -const BCNLATJobKind JobKind = "fm_bcnlat" - -type bcnlatJobCreator struct{} - -func init() { - RegisterJobCreator(BCNLATJobKind, bcnlatJobCreator{}) -} - -func (bcnlatJobCreator) Description() string { return "fairway marks bcnlat" } - -func (bcnlatJobCreator) AutoAccept() bool { return true } - -func (bcnlatJobCreator) Create() Job { return new(Bcnlat) } - -func (bcnlatJobCreator) Depends() [2][]string { - return [2][]string{ - {"fairway_marks_bcnlat"}, - {}, - } -} - -// StageDone is a NOP for fairway marks imports. -func (bcnlatJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil -} - -// CleanUp for fairway marks imports is a NOP. -func (*Bcnlat) CleanUp() error { return nil } - -type bcnlatProperties struct { - fairwayMarksProperties - Colour *string `json:"hydro_colour"` - Colpat *string `json:"hydro_colpat"` - Condtn *int `json:"hydro_condtn"` - Bcnshp *int `json:"hydro_bcnshp"` - HydroCatlam *int64 `json:"hydro_catlam,omitempty"` - IENCCatlam *int64 `json:"ienc_catlam,omitempty"` - Dirimp *string `json:"ienc_dirimp,omitempty"` -} - -type bcnlatFeaturetype struct { - geom pointSlice - props *bcnlatProperties -} - -const ( - insertBcnlatDirimpSQL = ` -INSERT INTO waterway.fairway_marks_bcnlat_dirimps (fm_bcnlat_id, dirimp) - VALUES ($1, $2) -` -) - -// Do executes the actual import. -func (fm *Bcnlat) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - err := getFMFeatures( - ctx, - conn, - feedback, - fm.FairwayMarks, - func() interface{} { return new(bcnlatProperties) }, - func(p pointSlice, props interface{}) interface{} { - return &bcnlatFeaturetype{p, props.(*bcnlatProperties)} - }, - func( - tx *sql.Tx, epsg int, fms []interface{}, - ) (outsideOrDup int, features int, err error) { - - feedback.Info("Store fairway marks of type BCNLAT/bcnlat") - - insertStmt, err := tx.PrepareContext( - ctx, - getFMInsertSQL("bcnlat", - "colour", "colpat", "condtn", "bcnshp", "catlam"), - ) - if err != nil { - return - } - defer insertStmt.Close() - - insertBcnlatDirimpStmt, err := tx.PrepareContext( - ctx, insertBcnlatDirimpSQL) - if err != nil { - return - } - defer insertBcnlatDirimpStmt.Close() - - savepoint := Savepoint(ctx, tx, "feature") - - for _, fm := range fms { - - f := fm.(*bcnlatFeaturetype) - - var catlam sql.NullInt64 - if f.props.HydroCatlam != nil { - catlam = sql.NullInt64{ - Int64: *f.props.HydroCatlam, Valid: true} - } else if f.props.IENCCatlam != nil { - catlam = sql.NullInt64{ - Int64: *f.props.IENCCatlam, Valid: true} - } - - var fmid int64 - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - f.geom.asWKB(), - epsg, - f.props.Datsta, - f.props.Datend, - f.props.Persta, - f.props.Perend, - f.props.Objnam, - f.props.Nobjnm, - f.props.Inform, - f.props.Ninfom, - f.props.Scamin, - f.props.Picrep, - f.props.Txtdsc, - f.props.Sordat, - f.props.Sorind, - f.props.Colour, - f.props.Colpat, - f.props.Condtn, - f.props.Bcnshp, - catlam, - ).Scan(&fmid) - return err - }) - switch { - case err == sql.ErrNoRows: - outsideOrDup++ - // ignore -> filtered by responsibility area or a duplicate - // TODO: handle eventual changes to dirimp - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - features++ - - if f.props.Dirimp != nil && *f.props.Dirimp != "" { - dirimps := strings.Split(*f.props.Dirimp, ",") - for _, dirimp := range dirimps { - if err := savepoint(func() error { - _, err := insertBcnlatDirimpStmt.ExecContext( - ctx, fmid, dirimp) - return err - }); err != nil { - feedback.Warn( - pgxutils.ReadableError{Err: err}.Error()) - feedback.Info( - "Tried to import '%s' as dirimp value", - dirimp) - } - } - } - } - } - return - }) - - return nil, err -}
--- a/pkg/imports/fm_boycar.go Tue Feb 18 13:00:25 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,162 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2020 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Tom Gottfried <tom.gottfried@intevation.de> - -package imports - -import ( - "context" - "database/sql" - - "gemma.intevation.de/gemma/pkg/pgxutils" -) - -// Boycar is an import job to import -// fairway marks of type BOYCAR in form of point geometries -// and attribute data from a WFS service. -type Boycar struct { - FairwayMarks -} - -// Description gives a short info about relevant facts of this import. -func (boycar *Boycar) Description() (string, error) { - return boycar.URL + "|" + boycar.FeatureType, nil -} - -// BOYCARJobKind is the import queue type identifier. -const BOYCARJobKind JobKind = "fm_boycar" - -type boycarJobCreator struct{} - -func init() { - RegisterJobCreator(BOYCARJobKind, boycarJobCreator{}) -} - -func (boycarJobCreator) Description() string { return "fairway marks boycar" } - -func (boycarJobCreator) AutoAccept() bool { return true } - -func (boycarJobCreator) Create() Job { return new(Boycar) } - -func (boycarJobCreator) Depends() [2][]string { - return [2][]string{ - {"fairway_marks_boycar"}, - {}, - } -} - -// StageDone is a NOP for fairway marks imports. -func (boycarJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil -} - -// CleanUp for fairway marks imports is a NOP. -func (*Boycar) CleanUp() error { return nil } - -type boycarProperties struct { - fairwayMarksProperties - Colour *string `json:"hydro_colour"` - Colpat *string `json:"hydro_colpat"` - Conrad *int `json:"hydro_conrad"` - Marsys *int `json:"hydro_marsys"` - Boyshp *int `json:"hydro_boyshp"` - Catcam *int `json:"hydro_catcam"` -} - -type boycarFeaturetype struct { - geom pointSlice - props *boycarProperties -} - -// Do executes the actual import. -func (fm *Boycar) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - err := getFMFeatures( - ctx, - conn, - feedback, - fm.FairwayMarks, - func() interface{} { return new(boycarProperties) }, - func(p pointSlice, props interface{}) interface{} { - return &boycarFeaturetype{p, props.(*boycarProperties)} - }, - func( - tx *sql.Tx, epsg int, fms []interface{}, - ) (outsideOrDup int, features int, err error) { - - feedback.Info("Store fairway marks of type BOYCAR") - - insertStmt, err := tx.PrepareContext( - ctx, - getFMInsertSQL("boycar", - "colour", "colpat", "conrad", - "marsys", "boyshp", "catcam"), - ) - if err != nil { - return - } - defer insertStmt.Close() - - savepoint := Savepoint(ctx, tx, "feature") - - for _, fm := range fms { - - f := fm.(*boycarFeaturetype) - - var fmid int64 - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - f.geom.asWKB(), - epsg, - f.props.Datsta, - f.props.Datend, - f.props.Persta, - f.props.Perend, - f.props.Objnam, - f.props.Nobjnm, - f.props.Inform, - f.props.Ninfom, - f.props.Scamin, - f.props.Picrep, - f.props.Txtdsc, - f.props.Sordat, - f.props.Sorind, - f.props.Colour, - f.props.Colpat, - f.props.Conrad, - f.props.Marsys, - f.props.Boyshp, - f.props.Catcam, - ).Scan(&fmid) - return err - }) - switch { - case err == sql.ErrNoRows: - outsideOrDup++ - // ignore -> filtered by responsibility_areas - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - features++ - } - } - return - }) - - return nil, err -}
--- a/pkg/imports/fm_boylat.go Tue Feb 18 13:00:25 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,182 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2020 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Tom Gottfried <tom.gottfried@intevation.de> - -package imports - -import ( - "context" - "database/sql" - - "gemma.intevation.de/gemma/pkg/pgxutils" -) - -// Boylat is an import job to import -// fairway marks of type BOYLAT in form of point geometries -// and attribute data from a WFS service. -type Boylat struct { - FairwayMarks -} - -// Description gives a short info about relevant facts of this import. -func (boylat *Boylat) Description() (string, error) { - return boylat.URL + "|" + boylat.FeatureType, nil -} - -// BOYLATJobKind is the import queue type identifier. -const BOYLATJobKind JobKind = "fm_boylat" - -type boylatJobCreator struct{} - -func init() { - RegisterJobCreator(BOYLATJobKind, boylatJobCreator{}) -} - -func (boylatJobCreator) Description() string { return "fairway marks boylat" } - -func (boylatJobCreator) AutoAccept() bool { return true } - -func (boylatJobCreator) Create() Job { return new(Boylat) } - -func (boylatJobCreator) Depends() [2][]string { - return [2][]string{ - {"fairway_marks_boylat"}, - {}, - } -} - -// StageDone is a NOP for fairway marks imports. -func (boylatJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil -} - -// CleanUp for fairway marks imports is a NOP. -func (*Boylat) CleanUp() error { return nil } - -type boylatProperties struct { - fairwayMarksProperties - Colour *string `json:"hydro_colour"` - Colpat *string `json:"hydro_colpat"` - Conrad *int `json:"hydro_conrad"` - HydroMarsys *int64 `json:"hydro_marsys,omitempty"` - IENCMarsys *int64 `json:"ienc_marsys,omitempty"` - Boyshp *int `json:"hydro_boyshp"` - HydroCatlam *int64 `json:"hydro_catlam,omitempty"` - IENCCatlam *int64 `json:"ienc_catlam,omitempty"` -} - -type boylatFeaturetype struct { - geom pointSlice - props *boylatProperties -} - -// Do executes the actual import. -func (fm *Boylat) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - err := getFMFeatures( - ctx, - conn, - feedback, - fm.FairwayMarks, - func() interface{} { return new(boylatProperties) }, - func(p pointSlice, props interface{}) interface{} { - return &boylatFeaturetype{p, props.(*boylatProperties)} - }, - func( - tx *sql.Tx, epsg int, fms []interface{}, - ) (outsideOrDup int, features int, err error) { - - feedback.Info("Store fairway marks of type BOYLAT") - - insertStmt, err := tx.PrepareContext( - ctx, - getFMInsertSQL("boylat", - "colour", "colpat", "conrad", - "marsys", "boyshp", "catlam"), - ) - if err != nil { - return - } - defer insertStmt.Close() - - savepoint := Savepoint(ctx, tx, "feature") - - for _, fm := range fms { - - f := fm.(*boylatFeaturetype) - - var marsys sql.NullInt64 - if f.props.HydroMarsys != nil { - marsys = sql.NullInt64{ - Int64: *f.props.HydroMarsys, Valid: true} - } else if f.props.IENCMarsys != nil { - marsys = sql.NullInt64{ - Int64: *f.props.IENCMarsys, Valid: true} - } - - var catlam sql.NullInt64 - if f.props.HydroCatlam != nil { - catlam = sql.NullInt64{ - Int64: *f.props.HydroCatlam, Valid: true} - } else if f.props.IENCCatlam != nil { - catlam = sql.NullInt64{ - Int64: *f.props.IENCCatlam, Valid: true} - } - - var fmid int64 - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - f.geom.asWKB(), - epsg, - f.props.Datsta, - f.props.Datend, - f.props.Persta, - f.props.Perend, - f.props.Objnam, - f.props.Nobjnm, - f.props.Inform, - f.props.Ninfom, - f.props.Scamin, - f.props.Picrep, - f.props.Txtdsc, - f.props.Sordat, - f.props.Sorind, - f.props.Colour, - f.props.Colpat, - f.props.Conrad, - marsys, - f.props.Boyshp, - catlam, - ).Scan(&fmid) - return err - }) - switch { - case err == sql.ErrNoRows: - outsideOrDup++ - // ignore -> filtered by responsibility_areas - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - features++ - } - } - return - }) - - return nil, err -}
--- a/pkg/imports/fm_boysaw.go Tue Feb 18 13:00:25 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,159 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2020 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Tom Gottfried <tom.gottfried@intevation.de> - -package imports - -import ( - "context" - "database/sql" - - "gemma.intevation.de/gemma/pkg/pgxutils" -) - -// Boysaw is an import job to import -// fairway marks of type BOYSAW in form of point geometries -// and attribute data from a WFS service. -type Boysaw struct { - FairwayMarks -} - -// Description gives a short info about relevant facts of this import. -func (boysaw *Boysaw) Description() (string, error) { - return boysaw.URL + "|" + boysaw.FeatureType, nil -} - -// BOYSAWJobKind is the import queue type identifier. -const BOYSAWJobKind JobKind = "fm_boysaw" - -type boysawJobCreator struct{} - -func init() { - RegisterJobCreator(BOYSAWJobKind, boysawJobCreator{}) -} - -func (boysawJobCreator) Description() string { return "fairway marks boysaw" } - -func (boysawJobCreator) AutoAccept() bool { return true } - -func (boysawJobCreator) Create() Job { return new(Boysaw) } - -func (boysawJobCreator) Depends() [2][]string { - return [2][]string{ - {"fairway_marks_boysaw"}, - {}, - } -} - -// StageDone is a NOP for fairway marks imports. -func (boysawJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil -} - -// CleanUp for fairway marks imports is a NOP. -func (*Boysaw) CleanUp() error { return nil } - -type boysawProperties struct { - fairwayMarksProperties - Colour *string `json:"hydro_colour"` - Colpat *string `json:"hydro_colpat"` - Conrad *int `json:"hydro_conrad"` - Marsys *int64 `json:"hydro_marsys"` - Boyshp *int `json:"hydro_boyshp"` -} - -type boysawFeaturetype struct { - geom pointSlice - props *boysawProperties -} - -// Do executes the actual import. -func (fm *Boysaw) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - err := getFMFeatures( - ctx, - conn, - feedback, - fm.FairwayMarks, - func() interface{} { return new(boysawProperties) }, - func(p pointSlice, props interface{}) interface{} { - return &boysawFeaturetype{p, props.(*boysawProperties)} - }, - func( - tx *sql.Tx, epsg int, fms []interface{}, - ) (outsideOrDup int, features int, err error) { - - feedback.Info("Store fairway marks of type BOYSAW") - - insertStmt, err := tx.PrepareContext( - ctx, - getFMInsertSQL("boysaw", - "colour", "colpat", "conrad", "marsys", "boyshp"), - ) - if err != nil { - return - } - defer insertStmt.Close() - - savepoint := Savepoint(ctx, tx, "feature") - - for _, fm := range fms { - - f := fm.(*boysawFeaturetype) - - var fmid int64 - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - f.geom.asWKB(), - epsg, - f.props.Datsta, - f.props.Datend, - f.props.Persta, - f.props.Perend, - f.props.Objnam, - f.props.Nobjnm, - f.props.Inform, - f.props.Ninfom, - f.props.Scamin, - f.props.Picrep, - f.props.Txtdsc, - f.props.Sordat, - f.props.Sorind, - f.props.Colour, - f.props.Colpat, - f.props.Conrad, - f.props.Marsys, - f.props.Boyshp, - ).Scan(&fmid) - return err - }) - switch { - case err == sql.ErrNoRows: - outsideOrDup++ - // ignore -> filtered by responsibility_areas - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - features++ - } - } - return - }) - - return nil, err -}
--- a/pkg/imports/fm_boyspp.go Tue Feb 18 13:00:25 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,162 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2020 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Tom Gottfried <tom.gottfried@intevation.de> - -package imports - -import ( - "context" - "database/sql" - - "gemma.intevation.de/gemma/pkg/pgxutils" -) - -// Boyspp is an import job to import -// fairway marks of type BOYSPP in form of point geometries -// and attribute data from a WFS service. -type Boyspp struct { - FairwayMarks -} - -// Description gives a short info about relevant facts of this import. -func (boyspp *Boyspp) Description() (string, error) { - return boyspp.URL + "|" + boyspp.FeatureType, nil -} - -// BOYSPPJobKind is the import queue type identifier. -const BOYSPPJobKind JobKind = "fm_boyspp" - -type boysppJobCreator struct{} - -func init() { - RegisterJobCreator(BOYSPPJobKind, boysppJobCreator{}) -} - -func (boysppJobCreator) Description() string { return "fairway marks boyspp" } - -func (boysppJobCreator) AutoAccept() bool { return true } - -func (boysppJobCreator) Create() Job { return new(Boyspp) } - -func (boysppJobCreator) Depends() [2][]string { - return [2][]string{ - {"fairway_marks_boyspp"}, - {}, - } -} - -// StageDone is a NOP for fairway marks imports. -func (boysppJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil -} - -// CleanUp for fairway marks imports is a NOP. -func (*Boyspp) CleanUp() error { return nil } - -type boysppProperties struct { - fairwayMarksProperties - Colour *string `json:"hydro_colour"` - Colpat *string `json:"hydro_colpat"` - Conrad *int `json:"hydro_conrad"` - Marsys *int64 `json:"hydro_marsys"` - Boyshp *int `json:"hydro_boyshp"` - Catspm *string `json:"hydro_catspm"` -} - -type boysppFeaturetype struct { - geom pointSlice - props *boysppProperties -} - -// Do executes the actual import. -func (fm *Boyspp) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - err := getFMFeatures( - ctx, - conn, - feedback, - fm.FairwayMarks, - func() interface{} { return new(boysppProperties) }, - func(p pointSlice, props interface{}) interface{} { - return &boysppFeaturetype{p, props.(*boysppProperties)} - }, - func( - tx *sql.Tx, epsg int, fms []interface{}, - ) (outsideOrDup int, features int, err error) { - - feedback.Info("Store fairway marks of type BOYSPP") - - insertStmt, err := tx.PrepareContext( - ctx, - getFMInsertSQL("boyspp", - "colour", "colpat", "conrad", - "marsys", "boyshp", "catspm"), - ) - if err != nil { - return - } - defer insertStmt.Close() - - savepoint := Savepoint(ctx, tx, "feature") - - for _, fm := range fms { - - f := fm.(*boysppFeaturetype) - - var fmid int64 - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - f.geom.asWKB(), - epsg, - f.props.Datsta, - f.props.Datend, - f.props.Persta, - f.props.Perend, - f.props.Objnam, - f.props.Nobjnm, - f.props.Inform, - f.props.Ninfom, - f.props.Scamin, - f.props.Picrep, - f.props.Txtdsc, - f.props.Sordat, - f.props.Sorind, - f.props.Colour, - f.props.Colpat, - f.props.Conrad, - f.props.Marsys, - f.props.Boyshp, - f.props.Catspm, - ).Scan(&fmid) - return err - }) - switch { - case err == sql.ErrNoRows: - outsideOrDup++ - // ignore -> filtered by responsibility_areas - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - features++ - } - } - return - }) - - return nil, err -}
--- a/pkg/imports/fm_daymar.go Tue Feb 18 13:00:25 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,193 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2020 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Tom Gottfried <tom.gottfried@intevation.de> - -package imports - -import ( - "context" - "database/sql" - "strings" - - "gemma.intevation.de/gemma/pkg/pgxutils" -) - -// Daymar is an import job to import -// fairway marks of type DAYMAR in form of point geometries -// and attribute data from a WFS service. -type Daymar struct { - FairwayMarks -} - -// Description gives a short info about relevant facts of this import. -func (daymar *Daymar) Description() (string, error) { - return daymar.URL + "|" + daymar.FeatureType, nil -} - -// DAYMARJobKind is the import queue type identifier. -const DAYMARJobKind JobKind = "fm_daymar" - -type daymarJobCreator struct{} - -func init() { - RegisterJobCreator(DAYMARJobKind, daymarJobCreator{}) -} - -func (daymarJobCreator) Description() string { return "fairway marks daymar" } - -func (daymarJobCreator) AutoAccept() bool { return true } - -func (daymarJobCreator) Create() Job { return new(Daymar) } - -func (daymarJobCreator) Depends() [2][]string { - return [2][]string{ - {"fairway_marks_daymar"}, - {}, - } -} - -// StageDone is a NOP for fairway marks imports. -func (daymarJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil -} - -// CleanUp for fairway marks imports is a NOP. -func (*Daymar) CleanUp() error { return nil } - -type daymarProperties struct { - fairwayMarksProperties - Colour *string `json:"hydro_colour"` - Colpat *string `json:"hydro_colpat"` - Condtn *int `json:"hydro_condtn"` - Dirimp *string `json:"ienc_dirimp,omitempty"` - Topshp *int `json:"hydro_topshp"` - Orient *float64 `json:"hydro_orient,omitempty"` -} - -type daymarFeaturetype struct { - geom pointSlice - props *daymarProperties -} - -const ( - insertDaymarDirimpSQL = ` -INSERT INTO waterway.fairway_marks_daymar_dirimps (fm_daymar_id, dirimp) - VALUES ($1, $2) -` -) - -// Do executes the actual import. -func (fm *Daymar) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - err := getFMFeatures( - ctx, - conn, - feedback, - fm.FairwayMarks, - func() interface{} { return new(daymarProperties) }, - func(p pointSlice, props interface{}) interface{} { - return &daymarFeaturetype{p, props.(*daymarProperties)} - }, - func( - tx *sql.Tx, epsg int, fms []interface{}, - ) (outsideOrDup int, features int, err error) { - - feedback.Info("Store fairway marks of type DAYMAR/daymar") - - insertStmt, err := tx.PrepareContext( - ctx, - getFMInsertSQL("daymar", - "colour", "colpat", "condtn", "topshp", "orient"), - ) - if err != nil { - return - } - defer insertStmt.Close() - - insertDaymarDirimpStmt, err := tx.PrepareContext( - ctx, insertDaymarDirimpSQL) - if err != nil { - return - } - defer insertDaymarDirimpStmt.Close() - - savepoint := Savepoint(ctx, tx, "feature") - - for _, fm := range fms { - - f := fm.(*daymarFeaturetype) - - var fmid int64 - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - f.geom.asWKB(), - epsg, - f.props.Datsta, - f.props.Datend, - f.props.Persta, - f.props.Perend, - f.props.Objnam, - f.props.Nobjnm, - f.props.Inform, - f.props.Ninfom, - f.props.Scamin, - f.props.Picrep, - f.props.Txtdsc, - f.props.Sordat, - f.props.Sorind, - f.props.Colour, - f.props.Colpat, - f.props.Condtn, - f.props.Topshp, - f.props.Orient, - ).Scan(&fmid) - return err - }) - switch { - case err == sql.ErrNoRows: - outsideOrDup++ - // ignore -> filtered by responsibility area or a duplicate - // TODO: handle eventual changes to dirimp - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - features++ - - if f.props.Dirimp != nil && *f.props.Dirimp != "" { - dirimps := strings.Split(*f.props.Dirimp, ",") - for _, dirimp := range dirimps { - if err := savepoint(func() error { - _, err := insertDaymarDirimpStmt.ExecContext( - ctx, fmid, dirimp) - return err - }); err != nil { - feedback.Warn( - pgxutils.ReadableError{Err: err}.Error()) - feedback.Info( - "Tried to import '%s' as dirimp value", - dirimp) - } - } - } - } - } - return - }) - - return nil, err -}
--- a/pkg/imports/fm_lights.go Tue Feb 18 13:00:25 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,181 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2020 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Tom Gottfried <tom.gottfried@intevation.de> - -package imports - -import ( - "context" - "database/sql" - - "gemma.intevation.de/gemma/pkg/pgxutils" -) - -// Lights is an import job to import -// fairway marks of type LIGHTS in form of point geometries -// and attribute data from a WFS service. -type Lights struct { - FairwayMarks -} - -// Description gives a short info about relevant facts of this import. -func (lights *Lights) Description() (string, error) { - return lights.URL + "|" + lights.FeatureType, nil -} - -// LIGHTSJobKind is the import queue type identifier. -const LIGHTSJobKind JobKind = "fm_lights" - -type lightsJobCreator struct{} - -func init() { - RegisterJobCreator(LIGHTSJobKind, lightsJobCreator{}) -} - -func (lightsJobCreator) Description() string { return "fairway marks lights" } - -func (lightsJobCreator) AutoAccept() bool { return true } - -func (lightsJobCreator) Create() Job { return new(Lights) } - -func (lightsJobCreator) Depends() [2][]string { - return [2][]string{ - {"fairway_marks_lights"}, - {}, - } -} - -// StageDone is a NOP for fairway marks imports. -func (lightsJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil -} - -// CleanUp for fairway marks imports is a NOP. -func (*Lights) CleanUp() error { return nil } - -type lightsProperties struct { - fairwayMarksProperties - Colour *string `json:"hydro_colour"` - Condtn *int `json:"hydro_condtn"` - Orient *float64 `json:"hydro_orient"` - Catlit *string `json:"hydro_catlit"` - Exclit *int `json:"hydro_exclit"` - Litchr *int `json:"hydro_litchr"` - Litvis *string `json:"hydro_litvis"` - Mltylt *int `json:"hydro_mltylt"` - Sectr1 *float64 `json:"hydro_sectr1"` - Sectr2 *float64 `json:"hydro_sectr2"` - Siggrp *string `json:"hydro_siggrp"` - Sigper *float64 `json:"hydro_sigper"` - Sigseq *string `json:"hydro_sigseq"` - Status *string `json:"hydro_status"` -} - -type lightsFeaturetype struct { - geom pointSlice - props *lightsProperties -} - -// Do executes the actual import. -func (fm *Lights) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - err := getFMFeatures( - ctx, - conn, - feedback, - fm.FairwayMarks, - func() interface{} { return new(lightsProperties) }, - func(p pointSlice, props interface{}) interface{} { - return &lightsFeaturetype{p, props.(*lightsProperties)} - }, - func( - tx *sql.Tx, epsg int, fms []interface{}, - ) (outsideOrDup int, features int, err error) { - - feedback.Info("Store fairway marks of type LIGHTS") - - insertStmt, err := tx.PrepareContext( - ctx, - getFMInsertSQL("lights", - "colour", "condtn", "orient", - "catlit", "exclit", "litchr", - "litvis", "mltylt", "sectr1", - "sectr2", "siggrp", "sigper", - "sigseq", "status"), - ) - if err != nil { - return - } - defer insertStmt.Close() - - savepoint := Savepoint(ctx, tx, "feature") - - for _, fm := range fms { - - f := fm.(*lightsFeaturetype) - - var fmid int64 - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - f.geom.asWKB(), - epsg, - f.props.Datsta, - f.props.Datend, - f.props.Persta, - f.props.Perend, - f.props.Objnam, - f.props.Nobjnm, - f.props.Inform, - f.props.Ninfom, - f.props.Scamin, - f.props.Picrep, - f.props.Txtdsc, - f.props.Sordat, - f.props.Sorind, - f.props.Colour, - f.props.Condtn, - f.props.Orient, - f.props.Catlit, - f.props.Exclit, - f.props.Litchr, - f.props.Litvis, - f.props.Mltylt, - f.props.Sectr1, - f.props.Sectr2, - f.props.Siggrp, - f.props.Sigper, - f.props.Sigseq, - f.props.Status, - ).Scan(&fmid) - return err - }) - switch { - case err == sql.ErrNoRows: - outsideOrDup++ - // ignore -> filtered by responsibility area or a duplicate - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - features++ - } - } - return - }) - - return nil, err -}
--- a/pkg/imports/fm_notmrk.go Tue Feb 18 13:00:25 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,210 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2020 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Tom Gottfried <tom.gottfried@intevation.de> - -package imports - -import ( - "context" - "database/sql" - "strings" - - "gemma.intevation.de/gemma/pkg/pgxutils" -) - -// Notmrk is an import job to import -// fairway marks of type NOTMRK in form of point geometries -// and attribute data from a WFS service. -type Notmrk struct { - FairwayMarks -} - -// Description gives a short info about relevant facts of this import. -func (notmrk *Notmrk) Description() (string, error) { - return notmrk.URL + "|" + notmrk.FeatureType, nil -} - -// NOTMRKJobKind is the import queue type identifier. -const NOTMRKJobKind JobKind = "fm_notmrk" - -type notmrkJobCreator struct{} - -func init() { - RegisterJobCreator(NOTMRKJobKind, notmrkJobCreator{}) -} - -func (notmrkJobCreator) Description() string { return "fairway marks notmrk" } - -func (notmrkJobCreator) AutoAccept() bool { return true } - -func (notmrkJobCreator) Create() Job { return new(Notmrk) } - -func (notmrkJobCreator) Depends() [2][]string { - return [2][]string{ - {"fairway_marks_notmrk"}, - {}, - } -} - -// StageDone is a NOP for fairway marks imports. -func (notmrkJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil -} - -// CleanUp for fairway marks imports is a NOP. -func (*Notmrk) CleanUp() error { return nil } - -type notmrkProperties struct { - fairwayMarksProperties - Condtn *int `json:"hydro_condtn"` - Marsys *int `json:"hydro_bcnshp"` - Dirimp *string `json:"ienc_dirimp"` - Orient *float64 `json:"hydro_orient"` - Status *string `json:"hydro_status"` - Addmrk *string `json:"ienc_addmrk"` - Catnmk *int `json:"ienc_catnmk"` - Disipd *float64 `json:"ienc_disipd"` - Disipu *float64 `json:"ienc_disipu"` - Disbk1 *float64 `json:"ienc_disbk1"` - Disbk2 *float64 `json:"ienc_disbk2"` - Fnctnm *int `json:"ienc_fnctnm"` - Bnkwtw *int `json:"ienc_bnkwtw"` -} - -type notmrkFeaturetype struct { - geom pointSlice - props *notmrkProperties -} - -const ( - insertNotmrkDirimpSQL = ` -INSERT INTO waterway.fairway_marks_notmrk_dirimps (fm_notmrk_id, dirimp) - VALUES ($1, $2) -` -) - -// Do executes the actual import. -func (fm *Notmrk) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - err := getFMFeatures( - ctx, - conn, - feedback, - fm.FairwayMarks, - func() interface{} { return new(notmrkProperties) }, - func(p pointSlice, props interface{}) interface{} { - return ¬mrkFeaturetype{p, props.(*notmrkProperties)} - }, - func( - tx *sql.Tx, epsg int, fms []interface{}, - ) (outsideOrDup int, features int, err error) { - - feedback.Info("Store fairway marks of type NOTMRK") - - insertStmt, err := tx.PrepareContext( - ctx, - getFMInsertSQL("notmrk", - "condtn", "marsys", "orient", - "status", "addmrk", "catnmk", - "disipd", "disipu", "disbk1", - "disbk2", "fnctnm", "bnkwtw"), - ) - if err != nil { - return - } - defer insertStmt.Close() - - insertNotmrkDirimpStmt, err := tx.PrepareContext( - ctx, insertNotmrkDirimpSQL) - if err != nil { - return - } - defer insertNotmrkDirimpStmt.Close() - - savepoint := Savepoint(ctx, tx, "feature") - - for _, fm := range fms { - - f := fm.(*notmrkFeaturetype) - - var fmid int64 - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - f.geom.asWKB(), - epsg, - f.props.Datsta, - f.props.Datend, - f.props.Persta, - f.props.Perend, - f.props.Objnam, - f.props.Nobjnm, - f.props.Inform, - f.props.Ninfom, - f.props.Scamin, - f.props.Picrep, - f.props.Txtdsc, - f.props.Sordat, - f.props.Sorind, - f.props.Condtn, - f.props.Marsys, - f.props.Orient, - f.props.Status, - f.props.Addmrk, - f.props.Catnmk, - f.props.Disipd, - f.props.Disipu, - f.props.Disbk1, - f.props.Disbk2, - f.props.Fnctnm, - f.props.Bnkwtw, - ).Scan(&fmid) - return err - }) - switch { - case err == sql.ErrNoRows: - outsideOrDup++ - // ignore -> filtered by responsibility area or a duplicate - // TODO: handle eventual changes to dirimp - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - features++ - - if f.props.Dirimp != nil && *f.props.Dirimp != "" { - dirimps := strings.Split(*f.props.Dirimp, ",") - for _, dirimp := range dirimps { - if err := savepoint(func() error { - _, err := insertNotmrkDirimpStmt.ExecContext( - ctx, fmid, dirimp) - return err - }); err != nil { - feedback.Warn( - pgxutils.ReadableError{Err: err}.Error()) - feedback.Info( - "Tried to import '%s' as dirimp value", - dirimp) - } - } - } - } - } - return - }) - - return nil, err -}
--- a/pkg/imports/fm_rtpbcn.go Tue Feb 18 13:00:25 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,157 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2020 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Tom Gottfried <tom.gottfried@intevation.de> - -package imports - -import ( - "context" - "database/sql" - - "gemma.intevation.de/gemma/pkg/pgxutils" -) - -// Rtpbcn is an import job to import -// fairway marks of type RTPBCN in form of point geometries -// and attribute data from a WFS service. -type Rtpbcn struct { - FairwayMarks -} - -// Description gives a short info about relevant facts of this import. -func (rtpbcn *Rtpbcn) Description() (string, error) { - return rtpbcn.URL + "|" + rtpbcn.FeatureType, nil -} - -// RTPBCNJobKind is the import queue type identifier. -const RTPBCNJobKind JobKind = "fm_rtpbcn" - -type rtpbcnJobCreator struct{} - -func init() { - RegisterJobCreator(RTPBCNJobKind, rtpbcnJobCreator{}) -} - -func (rtpbcnJobCreator) Description() string { return "fairway marks rtpbcn" } - -func (rtpbcnJobCreator) AutoAccept() bool { return true } - -func (rtpbcnJobCreator) Create() Job { return new(Rtpbcn) } - -func (rtpbcnJobCreator) Depends() [2][]string { - return [2][]string{ - {"fairway_marks_rtpbcn"}, - {}, - } -} - -// StageDone is a NOP for fairway marks imports. -func (rtpbcnJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil -} - -// CleanUp for fairway marks imports is a NOP. -func (*Rtpbcn) CleanUp() error { return nil } - -type rtpbcnProperties struct { - fairwayMarksProperties - Condtn *int `json:"hydro_condtn"` - Siggrp *string `json:"hydro_siggrp"` - Catrtb *int `json:"hydro_catrtb"` - Radwal *string `json:"hydro_radwal"` -} - -type rtpbcnFeaturetype struct { - geom pointSlice - props *rtpbcnProperties -} - -// Do executes the actual import. -func (fm *Rtpbcn) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - err := getFMFeatures( - ctx, - conn, - feedback, - fm.FairwayMarks, - func() interface{} { return new(rtpbcnProperties) }, - func(p pointSlice, props interface{}) interface{} { - return &rtpbcnFeaturetype{p, props.(*rtpbcnProperties)} - }, - func( - tx *sql.Tx, epsg int, fms []interface{}, - ) (outsideOrDup int, features int, err error) { - - feedback.Info("Store fairway marks of type RTPBCN") - - insertStmt, err := tx.PrepareContext( - ctx, - getFMInsertSQL("rtpbcn", - "condtn", "siggrp", "catrtb", "radwal"), - ) - if err != nil { - return - } - defer insertStmt.Close() - - savepoint := Savepoint(ctx, tx, "feature") - - for _, fm := range fms { - - f := fm.(*rtpbcnFeaturetype) - - var fmid int64 - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - f.geom.asWKB(), - epsg, - f.props.Datsta, - f.props.Datend, - f.props.Persta, - f.props.Perend, - f.props.Objnam, - f.props.Nobjnm, - f.props.Inform, - f.props.Ninfom, - f.props.Scamin, - f.props.Picrep, - f.props.Txtdsc, - f.props.Sordat, - f.props.Sorind, - f.props.Condtn, - f.props.Siggrp, - f.props.Catrtb, - f.props.Radwal, - ).Scan(&fmid) - return err - }) - switch { - case err == sql.ErrNoRows: - outsideOrDup++ - // ignore -> filtered by responsibility area or a duplicate - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - features++ - } - } - return - }) - - return nil, err -}
--- a/pkg/imports/fm_topmar.go Tue Feb 18 13:00:25 2020 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,157 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2020 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Tom Gottfried <tom.gottfried@intevation.de> - -package imports - -import ( - "context" - "database/sql" - - "gemma.intevation.de/gemma/pkg/pgxutils" -) - -// Topmar is an import job to import -// fairway marks of type TOPMAR in form of point geometries -// and attribute data from a WFS service. -type Topmar struct { - FairwayMarks -} - -// Description gives a short info about relevant facts of this import. -func (topmar *Topmar) Description() (string, error) { - return topmar.URL + "|" + topmar.FeatureType, nil -} - -// TOPMARJobKind is the import queue type identifier. -const TOPMARJobKind JobKind = "fm_topmar" - -type topmarJobCreator struct{} - -func init() { - RegisterJobCreator(TOPMARJobKind, topmarJobCreator{}) -} - -func (topmarJobCreator) Description() string { return "fairway marks topmar" } - -func (topmarJobCreator) AutoAccept() bool { return true } - -func (topmarJobCreator) Create() Job { return new(Topmar) } - -func (topmarJobCreator) Depends() [2][]string { - return [2][]string{ - {"fairway_marks_topmar"}, - {}, - } -} - -// StageDone is a NOP for fairway marks imports. -func (topmarJobCreator) StageDone(context.Context, *sql.Tx, int64) error { - return nil -} - -// CleanUp for fairway marks imports is a NOP. -func (*Topmar) CleanUp() error { return nil } - -type topmarProperties struct { - fairwayMarksProperties - Colour *string `json:"hydro_colour"` - Colpat *string `json:"hydro_colpat"` - Condtn *int `json:"hydro_condtn"` - Topshp *int `json:"hydro_topshp"` -} - -type topmarFeaturetype struct { - geom pointSlice - props *topmarProperties -} - -// Do executes the actual import. -func (fm *Topmar) Do( - ctx context.Context, - importID int64, - conn *sql.Conn, - feedback Feedback, -) (interface{}, error) { - - err := getFMFeatures( - ctx, - conn, - feedback, - fm.FairwayMarks, - func() interface{} { return new(topmarProperties) }, - func(p pointSlice, props interface{}) interface{} { - return &topmarFeaturetype{p, props.(*topmarProperties)} - }, - func( - tx *sql.Tx, epsg int, fms []interface{}, - ) (outsideOrDup int, features int, err error) { - - feedback.Info("Store fairway marks of type TOPMAR") - - insertStmt, err := tx.PrepareContext( - ctx, - getFMInsertSQL("topmar", - "colour", "colpat", "condtn", "topshp"), - ) - if err != nil { - return - } - defer insertStmt.Close() - - savepoint := Savepoint(ctx, tx, "feature") - - for _, fm := range fms { - - f := fm.(*topmarFeaturetype) - - var fmid int64 - err := savepoint(func() error { - err := insertStmt.QueryRowContext( - ctx, - f.geom.asWKB(), - epsg, - f.props.Datsta, - f.props.Datend, - f.props.Persta, - f.props.Perend, - f.props.Objnam, - f.props.Nobjnm, - f.props.Inform, - f.props.Ninfom, - f.props.Scamin, - f.props.Picrep, - f.props.Txtdsc, - f.props.Sordat, - f.props.Sorind, - f.props.Colour, - f.props.Colpat, - f.props.Condtn, - f.props.Topshp, - ).Scan(&fmid) - return err - }) - switch { - case err == sql.ErrNoRows: - outsideOrDup++ - // ignore -> filtered by responsibility area or a duplicate - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - default: - features++ - } - } - return - }) - - return nil, err -}
--- a/pkg/imports/modelconvert.go Tue Feb 18 13:00:25 2020 +0100 +++ b/pkg/imports/modelconvert.go Wed Feb 19 15:08:35 2020 +0100 @@ -27,16 +27,16 @@ DMVJobKind: func() interface{} { return new(models.DistanceMarksVirtualImport) }, FDJobKind: func() interface{} { return new(models.FairwayDimensionImport) }, DMAJobKind: func() interface{} { return new(models.DistanceMarksAshoreImport) }, - BCNLATJobKind: func() interface{} { return new(models.FairwayMarksImport) }, - BOYCARJobKind: func() interface{} { return new(models.FairwayMarksImport) }, - BOYLATJobKind: func() interface{} { return new(models.FairwayMarksImport) }, - BOYSAWJobKind: func() interface{} { return new(models.FairwayMarksImport) }, - BOYSPPJobKind: func() interface{} { return new(models.FairwayMarksImport) }, - DAYMARJobKind: func() interface{} { return new(models.FairwayMarksImport) }, - LIGHTSJobKind: func() interface{} { return new(models.FairwayMarksImport) }, - RTPBCNJobKind: func() interface{} { return new(models.FairwayMarksImport) }, - TOPMARJobKind: func() interface{} { return new(models.FairwayMarksImport) }, - NOTMRKJobKind: func() interface{} { return new(models.FairwayMarksImport) }, + BCNLATJobKind: func() interface{} { return FindJobCreator(BCNLATJobKind).Create() }, + BOYCARJobKind: func() interface{} { return FindJobCreator(BOYCARJobKind).Create() }, + BOYLATJobKind: func() interface{} { return FindJobCreator(BOYLATJobKind).Create() }, + BOYSAWJobKind: func() interface{} { return FindJobCreator(BOYSAWJobKind).Create() }, + BOYSPPJobKind: func() interface{} { return FindJobCreator(BOYSPPJobKind).Create() }, + DAYMARJobKind: func() interface{} { return FindJobCreator(DAYMARJobKind).Create() }, + LIGHTSJobKind: func() interface{} { return FindJobCreator(LIGHTSJobKind).Create() }, + RTPBCNJobKind: func() interface{} { return FindJobCreator(RTPBCNJobKind).Create() }, + TOPMARJobKind: func() interface{} { return FindJobCreator(TOPMARJobKind).Create() }, + NOTMRKJobKind: func() interface{} { return FindJobCreator(NOTMRKJobKind).Create() }, STJobKind: func() interface{} { return new(models.StretchImport) }, SECJobKind: func() interface{} { return new(models.SectionImport) }, DSECJobKind: func() interface{} { return new(models.SectionDelete) }, @@ -146,116 +146,6 @@ } }, - BCNLATJobKind: func(input interface{}) interface{} { - fmi := input.(*models.FairwayMarksImport) - return &FairwayMarks{ - URL: fmi.URL, - FeatureType: fmi.FeatureType, - SortBy: nilString(fmi.SortBy), - User: nilString(fmi.User), - Password: nilString(fmi.Password), - } - }, - - BOYCARJobKind: func(input interface{}) interface{} { - fmi := input.(*models.FairwayMarksImport) - return &FairwayMarks{ - URL: fmi.URL, - FeatureType: fmi.FeatureType, - SortBy: nilString(fmi.SortBy), - User: nilString(fmi.User), - Password: nilString(fmi.Password), - } - }, - - BOYLATJobKind: func(input interface{}) interface{} { - fmi := input.(*models.FairwayMarksImport) - return &FairwayMarks{ - URL: fmi.URL, - FeatureType: fmi.FeatureType, - SortBy: nilString(fmi.SortBy), - User: nilString(fmi.User), - Password: nilString(fmi.Password), - } - }, - - BOYSAWJobKind: func(input interface{}) interface{} { - fmi := input.(*models.FairwayMarksImport) - return &FairwayMarks{ - URL: fmi.URL, - FeatureType: fmi.FeatureType, - SortBy: nilString(fmi.SortBy), - User: nilString(fmi.User), - Password: nilString(fmi.Password), - } - }, - - BOYSPPJobKind: func(input interface{}) interface{} { - fmi := input.(*models.FairwayMarksImport) - return &FairwayMarks{ - URL: fmi.URL, - FeatureType: fmi.FeatureType, - SortBy: nilString(fmi.SortBy), - User: nilString(fmi.User), - Password: nilString(fmi.Password), - } - }, - - DAYMARJobKind: func(input interface{}) interface{} { - fmi := input.(*models.FairwayMarksImport) - return &FairwayMarks{ - URL: fmi.URL, - FeatureType: fmi.FeatureType, - SortBy: nilString(fmi.SortBy), - User: nilString(fmi.User), - Password: nilString(fmi.Password), - } - }, - - LIGHTSJobKind: func(input interface{}) interface{} { - fmi := input.(*models.FairwayMarksImport) - return &FairwayMarks{ - URL: fmi.URL, - FeatureType: fmi.FeatureType, - SortBy: nilString(fmi.SortBy), - User: nilString(fmi.User), - Password: nilString(fmi.Password), - } - }, - - RTPBCNJobKind: func(input interface{}) interface{} { - fmi := input.(*models.FairwayMarksImport) - return &FairwayMarks{ - URL: fmi.URL, - FeatureType: fmi.FeatureType, - SortBy: nilString(fmi.SortBy), - User: nilString(fmi.User), - Password: nilString(fmi.Password), - } - }, - - TOPMARJobKind: func(input interface{}) interface{} { - fmi := input.(*models.FairwayMarksImport) - return &FairwayMarks{ - URL: fmi.URL, - FeatureType: fmi.FeatureType, - SortBy: nilString(fmi.SortBy), - User: nilString(fmi.User), - Password: nilString(fmi.Password), - } - }, - - NOTMRKJobKind: func(input interface{}) interface{} { - fmi := input.(*models.FairwayMarksImport) - return &FairwayMarks{ - URL: fmi.URL, - FeatureType: fmi.FeatureType, - SortBy: nilString(fmi.SortBy), - User: nilString(fmi.User), - Password: nilString(fmi.Password), - } - }, - STJobKind: func(input interface{}) interface{} { sti := input.(*models.StretchImport) return &Stretch{ @@ -316,7 +206,7 @@ func ConvertToInternal(kind JobKind, src interface{}) interface{} { fn := convertModel[kind] if fn == nil { - return nil + return src } return fn(src) }
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/imports/pointwfs.go Wed Feb 19 15:08:35 2020 +0100 @@ -0,0 +1,323 @@ +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2020 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Tom Gottfried <tom.gottfried@intevation.de> +// * Sascha L. Teichmann <sascha.teichmann@intevation.de> + +package imports + +import ( + "context" + "database/sql" + "encoding/json" + "errors" + "fmt" + "io" + "time" + + "gemma.intevation.de/gemma/pkg/models" + "gemma.intevation.de/gemma/pkg/wfs" +) + +var ( + ErrFeatureIgnored = errors.New("feature ignored") + ErrFeatureDuplicated = errors.New("feature duplicated") +) + +type ( + WFSPointConsumer interface { + Commit() error + Rollback() error + + NewProperties() interface{} + Consume(points pointSlice, properties interface{}, epsg int) error + } + + PointWFSJobCreator struct { + description string + depends [2][]string + + newConsumer func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error) + } + + PointWFSJob struct { + models.WFSImport + creator *PointWFSJobCreator + } +) + +func (pwjc *PointWFSJobCreator) Description() string { + return pwjc.description +} + +func (pwjc *PointWFSJobCreator) Depends() [2][]string { + return pwjc.depends +} + +func (*PointWFSJobCreator) AutoAccept() bool { + return true +} + +// StageDone is a NOP for WFS imports. +func (*PointWFSJobCreator) StageDone(context.Context, *sql.Tx, int64) error { + return nil +} + +func (pwjc *PointWFSJobCreator) Create() Job { + return &PointWFSJob{creator: pwjc} +} + +// Description gives a short info about relevant facts of this import. +func (pwj *PointWFSJob) Description() (string, error) { + return pwj.URL + "|" + pwj.FeatureType, nil +} + +// CleanUp for WFS imports is a NOP. +func (*PointWFSJob) CleanUp() error { + return nil +} + +func (pwj *PointWFSJob) Do( + ctx context.Context, + importID int64, + conn *sql.Conn, + feedback Feedback, +) (interface{}, error) { + + start := time.Now() + + feedback.Info("Import %s", pwj.creator.Description()) + + feedback.Info("Loading capabilities from %s", pwj.URL) + caps, err := wfs.GetCapabilities(pwj.URL) + if err != nil { + feedback.Error("Loading capabilities failed: %v", err) + return nil, err + } + + ft := caps.FindFeatureType(pwj.FeatureType) + if ft == nil { + return nil, fmt.Errorf("unknown feature type '%s'", pwj.FeatureType) + } + + feedback.Info("Found feature type '%s", pwj.FeatureType) + + epsg, err := wfs.CRSToEPSG(ft.DefaultCRS) + if err != nil { + feedback.Error("Unsupported CRS: '%s'", ft.DefaultCRS) + return nil, err + } + + if nilString(pwj.SortBy) != "" { + feedback.Info("Features will be sorted by '%s'", pwj.SortBy) + } + + dl, err := wfs.GetFeatures(caps, pwj.FeatureType, nilString(pwj.SortBy)) + if err != nil { + feedback.Error("Cannot create GetFeature URLs. %v", err) + return nil, err + } + + var ( + unsupported = stringCounter{} + missingProperties int + badProperties int + dupes int + features int + ) + + consumer, err := pwj.creator.newConsumer(ctx, conn, feedback) + if err != nil { + return nil, err + } + defer consumer.Rollback() + + if err := dl.Download(nilString(pwj.User), nilString(pwj.Password), func(url string, r io.Reader) error { + feedback.Info("Get features from: '%s'", url) + rfc, err := wfs.ParseRawFeatureCollection(r) + if err != nil { + return fmt.Errorf("parsing GetFeature document failed: %v", err) + } + if rfc.CRS != nil { + crsName := rfc.CRS.Properties.Name + if epsg, err = wfs.CRSToEPSG(crsName); err != nil { + feedback.Error("Unsupported CRS: %d", crsName) + return err + } + } + + // No features -> ignore. + if rfc.Features == nil { + return nil + } + + feedback.Info("Using EPSG: %d", epsg) + + for _, feature := range rfc.Features { + if feature.Properties == nil || feature.Geometry.Coordinates == nil { + missingProperties++ + continue + } + + props := consumer.NewProperties() + if err := json.Unmarshal(*feature.Properties, props); err != nil { + badProperties++ + continue + } + + switch feature.Geometry.Type { + case "Point": + var p pointSlice + if err := json.Unmarshal(*feature.Geometry.Coordinates, &p); err != nil { + return err + } + + err := consumer.Consume(p, props, epsg) + switch { + case err == ErrFeatureDuplicated: + dupes++ + case err == ErrFeatureIgnored: + // be silent + case err != nil: + return err + default: + features++ + } + + default: + unsupported[feature.Geometry.Type]++ + } + } + return nil + }); err != nil { + return nil, err + } + + if dupes > 0 { + feedback.Info( + "Features outside responsibility area and duplicates: %d", + dupes) + } + + if badProperties > 0 { + feedback.Warn("Bad properties: %d", badProperties) + } + + if missingProperties > 0 { + feedback.Warn("Missing properties: %d", missingProperties) + } + + if len(unsupported) != 0 { + feedback.Warn("Unsupported types found: %s", unsupported) + } + + if features == 0 { + return nil, UnchangedError("no valid new features found") + } + + feedback.Info("Found %d usable features in data source", features) + + if err = consumer.Commit(); err == nil { + feedback.Info("Storing %d features took %s", + features, time.Since(start)) + } + + return nil, nil +} + +type ( + SQLPointConsumer struct { + ctx context.Context + tx *sql.Tx + feedback Feedback + newProperties func() interface{} + consume func(*SQLPointConsumer, pointSlice, interface{}, int) error + savepoint func(func() error) error + stmts []*sql.Stmt + } +) + +func (spc *SQLPointConsumer) Rollback() error { + if tx := spc.tx; tx != nil { + spc.releaseStmts() + spc.tx = nil + spc.ctx = nil + return tx.Rollback() + } + return nil +} + +func (spc *SQLPointConsumer) Commit() error { + if tx := spc.tx; tx != nil { + spc.releaseStmts() + spc.tx = nil + spc.ctx = nil + return tx.Commit() + } + return nil +} + +func (spc *SQLPointConsumer) NewProperties() interface{} { + return spc.newProperties() +} + +func (spc *SQLPointConsumer) Consume( + points pointSlice, + properties interface{}, + epsg int, +) error { + return spc.consume(spc, points, properties, epsg) +} + +func newSQLConsumer( + init func(*SQLPointConsumer) error, + consume func(*SQLPointConsumer, pointSlice, interface{}, int) error, + newProperties func() interface{}, + +) func(context.Context, *sql.Conn, Feedback) (WFSPointConsumer, error) { + return func(ctx context.Context, conn *sql.Conn, feedback Feedback) (WFSPointConsumer, error) { + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + spc := &SQLPointConsumer{ + ctx: ctx, + tx: tx, + feedback: feedback, + newProperties: newProperties, + consume: consume, + savepoint: Savepoint(ctx, tx, "feature"), + } + if err := init(spc); err != nil { + tx.Rollback() + return nil, err + } + return spc, nil + } +} + +func (spc *SQLPointConsumer) releaseStmts() { + for i := len(spc.stmts); i > 0; i-- { + spc.stmts[i-1].Close() + spc.stmts[i-1] = nil + } + spc.stmts = nil +} + +func prepareStmnts(queries ...string) func(*SQLPointConsumer) error { + return func(spc *SQLPointConsumer) error { + for _, query := range queries { + stmt, err := spc.tx.PrepareContext(spc.ctx, query) + if err != nil { + return err + } + spc.stmts = append(spc.stmts, stmt) + } + return nil + } +}
--- a/pkg/models/imports.go Tue Feb 18 13:00:25 2020 +0100 +++ b/pkg/models/imports.go Wed Feb 19 15:08:35 2020 +0100 @@ -78,11 +78,6 @@ WFSImport } - // FairwayMarksImport specifies an import of fairway marks. - FairwayMarksImport struct { - WFSImport - } - // FairwayDimensionImport specifies an import of the waterway axis. FairwayDimensionImport struct { WFSImport