# HG changeset patch # User Sascha L. Teichmann # Date 1564738456 -7200 # Node ID f464cbcdf2f2e7350d3986869840d92dcbfa5926 # Parent 6095e95db454d333c3383961698a72b1ab221195 BN import: More clean-ups. Typo fixes. Code simplifications. Calling conventions ... diff -r 6095e95db454 -r f464cbcdf2f2 pkg/imports/agm.go --- a/pkg/imports/agm.go Fri Aug 02 11:11:32 2019 +0200 +++ b/pkg/imports/agm.go Fri Aug 02 11:34:16 2019 +0200 @@ -41,7 +41,7 @@ Originator string `json:"originator"` } -// GMAPJobKind is the unique name of an approved gauge measurements import job. +// AGMJobKind is the unique name of an approved gauge measurements import job. const AGMJobKind JobKind = "agm" type agmJobCreator struct{} diff -r 6095e95db454 -r f464cbcdf2f2 pkg/imports/bn.go --- a/pkg/imports/bn.go Fri Aug 02 11:11:32 2019 +0200 +++ b/pkg/imports/bn.go Fri Aug 02 11:34:16 2019 +0200 @@ -24,10 +24,11 @@ "strings" "time" + "github.com/jackc/pgx/pgtype" + "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/pgxutils" "gemma.intevation.de/gemma/pkg/soap/ifbn" - "github.com/jackc/pgx/pgtype" ) // Bottleneck is an import job to import @@ -134,9 +135,7 @@ INSERT INTO waterway.bottlenecks_riverbed_materials ( bottleneck_id, riverbed -) SELECT * -FROM unnest(CAST($1 AS int[])) AS bns, - unnest(CAST($2 AS varchar[])) AS materials +) VALUES ($1, $2) ON CONFLICT (bottleneck_id, riverbed) DO NOTHING ` @@ -303,7 +302,7 @@ } var nids []string - seenOldBnIds := make(map[int64]bool) + seenOldBnIDs := make(map[int64]bool) feedback.Info("Tolerance used to snap waterway axis: %g", tolerance) @@ -313,11 +312,15 @@ } defer tx.Rollback() + savepoint := Savepoint(ctx, tx, "insert_bottlenck") + for _, bn := range bns { if err := storeBottleneck( - tx, ctx, importID, feedback, bn, - &nids, seenOldBnIds, tolerance, + ctx, tx, + importID, feedback, bn, + &nids, seenOldBnIDs, tolerance, &bs, + savepoint, ); err != nil { return nil, err } @@ -341,20 +344,23 @@ } func storeBottleneck( + ctx context.Context, tx *sql.Tx, - ctx context.Context, importID int64, feedback Feedback, bn *ifbn.BottleNeckType, nids *[]string, - seenOldBnIds map[int64]bool, + seenOldBnIDs map[int64]bool, tolerance float64, bs *bnStmts, + savepoint func(func() error) error, ) error { feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id) - var tfrom, tto pgtype.Timestamptz - var uBound pgtype.BoundType + var ( + tfrom, tto pgtype.Timestamptz + uBound pgtype.BoundType + ) if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil { // This is a workaround for the fact that most BN data does not @@ -379,13 +385,12 @@ ) fromTo := make(map[string]time.Time) for _, kv := range bn.AdditionalData.KeyValuePair { - k := string(kv.Key) - if k == fromKey || k == toKey { - if t, err := time.Parse(time.RFC3339, kv.Value); err != nil { + if kv.Key == fromKey || kv.Key == toKey { + t, err := time.Parse(time.RFC3339, kv.Value) + if err != nil { return err - } else { - fromTo[k] = t } + fromTo[kv.Key] = t } } @@ -451,7 +456,7 @@ } } - // Check if an bottleneck identical to the one we would insert already + // Check if a bottleneck identical to the one we would insert already // exists: var old int64 err := tx.StmtContext(ctx, bs.findExactMatch).QueryRowContext( @@ -473,7 +478,7 @@ default: // We could check if the materials are also matching -- but per // specification the Date_Info would hvae to change on that kind of - // change anyway. So actualy we ar alreayd checking more in dpth than + // change anyway. So actually we are already checking more in depth than // required. feedback.Info("unchanged") return nil @@ -496,14 +501,14 @@ // Mark old intersecting bottleneck data for deletion. Don't worry about // materials, they will be deleted via cascading. - var oldBnIds []int64 + var oldBnIDs []int64 for bns.Next() { var oldID int64 err := bns.Scan(&oldID) if err != nil { return err } - oldBnIds = append(oldBnIds, oldID) + oldBnIDs = append(oldBnIDs, oldID) } if err := bns.Err(); err != nil { @@ -511,11 +516,11 @@ } switch { - case len(oldBnIds) == 1: - feedback.Info("Bottelneck '%s' "+ + case len(oldBnIDs) == 1: + feedback.Info("Bottleneck '%s' "+ "with intersecting validity already exists: "+ "UPDATING", bn.Bottleneck_id) - case len(oldBnIds) > 1: + case len(oldBnIDs) > 1: // This case is unexpected and should only happen when historic // data in the bottleneck service was changed subsequently... // We handle it gracefully anyway, but warn. @@ -524,16 +529,15 @@ "REPLACING all of them!", bn.Bottleneck_id) } // We write the actual tracking information for deletion of superseded - // bottlenecks later to the databs -- AFTER the new bottleneck was - // created successfully. That way, we don't change the database, when + // bottlenecks later to the database -- AFTER the new bottleneck was + // created successfully. That way, we don't change the database, when // an error arises during inserting the new data. - var bnIds []int64 + var bnID int64 // Add new BN data: - savepoint := Savepoint(ctx, tx, "insert_bottlenck") - - err = savepoint(func() error { - bns, err := tx.StmtContext(ctx, bs.insert).QueryContext(ctx, + if err := savepoint(func() error { + if err := tx.StmtContext(ctx, bs.insert).QueryRowContext( + ctx, bn.Bottleneck_id, &validity, bn.Fk_g_fid, @@ -547,68 +551,47 @@ limiting, bn.Date_Info, bn.Source, - tolerance) - if err != nil { - return err - } - defer bns.Close() - for bns.Next() { - var nid int64 - if err := bns.Scan(&nid); err != nil { - return err - } - bnIds = append(bnIds, nid) - } - if err := bns.Err(); err != nil { + tolerance, + ).Scan(&bnID); err != nil { return err } // Add new materials - if len(bnIds) > 0 && materials != nil { - var ( - pgBnIds pgtype.Int8Array - pgMaterials pgtype.VarcharArray - ) - pgBnIds.Set(bnIds) - pgMaterials.Set(materials) - - // Insert riverbed materials - if _, err := tx.StmtContext(ctx, bs.insertMaterial).ExecContext(ctx, - - &pgBnIds, - &pgMaterials, + for _, material := range materials { + if _, err := tx.StmtContext(ctx, bs.insertMaterial).ExecContext( + ctx, + bnID, + material, ); err != nil { return err } } return nil - }) - if err != nil { + }); err != nil { feedback.Warn(pgxutils.ReadableError{err}.Error()) return nil } // Now that adding BNs to staging was successful, write import tracking // information to database: - for _, oldID := range oldBnIds { + for _, oldID := range oldBnIDs { // It is possible, that two new bottlenecks intersect with the - // same old noe, therefor we have to handle duplicates in + // same old one, therefore we have to handle duplicates in // oldBnIds. - if !seenOldBnIds[oldID] { + if !seenOldBnIDs[oldID] { if _, err := tx.StmtContext(ctx, bs.track).ExecContext( ctx, importID, "waterway.bottlenecks", oldID, true, ); err != nil { return err } - seenOldBnIds[oldID] = true + seenOldBnIDs[oldID] = true } } - for _, nid := range bnIds { - if _, err := tx.StmtContext(ctx, bs.track).ExecContext( - ctx, importID, "waterway.bottlenecks", nid, false, - ); err != nil { - return err - } + + if _, err := tx.StmtContext(ctx, bs.track).ExecContext( + ctx, importID, "waterway.bottlenecks", bnID, false, + ); err != nil { + return err } *nids = append(*nids, bn.Bottleneck_id) diff -r 6095e95db454 -r f464cbcdf2f2 pkg/imports/misc.go --- a/pkg/imports/misc.go Fri Aug 02 11:11:32 2019 +0200 +++ b/pkg/imports/misc.go Fri Aug 02 11:34:16 2019 +0200 @@ -55,9 +55,7 @@ } } }() - err = fn() - - if err == nil { + if err = fn(); err == nil { done = true _, err = tx.ExecContext(ctx, release) }