Mercurial > gemma
changeset 5620:165e77c5736a erdms2
Simplified WG import logging.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 28 Nov 2022 18:17:11 +0100 |
parents | f0413b20ad4d |
children | cf1e8ffe1ed5 |
files | pkg/imports/wg.go |
diffstat | 1 files changed, 210 insertions(+), 178 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/wg.go Mon Nov 28 17:40:44 2022 +0100 +++ b/pkg/imports/wg.go Mon Nov 28 18:17:11 2022 +0100 @@ -17,6 +17,8 @@ import ( "context" "database/sql" + "errors" + "strings" "time" "github.com/jackc/pgx/pgtype" @@ -177,6 +179,8 @@ ` ) +var continueErr = errors.New("continue") + // Do implements the actual import. func (wg *WaterwayGauge) Do( ctx context.Context, @@ -226,17 +230,20 @@ var gauges []string var unchanged int + var invalidISRS, startEndOrder, missingObjname, missingZeropoint []string + + databaseErrors := map[string][]string{} + for _, data := range responseData { for _, dr := range data.RisdataReturn { isrs := string(*dr.RisidxCode) code, err := models.IsrsFromString(isrs) if err != nil { - feedback.Warn("Invalid ISRS code '%s': %v", isrs, err) + invalidISRS = append(invalidISRS, isrs) continue } gauges = append(gauges, isrs) - feedback.Info("Processing %s", code) // We need a valid, non-empty time range to identify gauge versions if dr.Enddate != nil && dr.Startdate != nil { @@ -244,12 +251,24 @@ sd := dr.Startdate.ToGoTime() // log.Debugf("start date: %v end date: %v\n", sd, ed) if !ed.After(sd) { - feedback.Error("End date not after start date") + startEndOrder = append(startEndOrder, isrs) unchanged++ continue } } + if dr.Zeropoint == nil { + missingZeropoint = append(missingZeropoint, isrs) + unchanged++ + continue + } + + if dr.Objname.Loc == nil { + missingObjname = append(missingObjname, isrs) + unchanged++ + continue + } + var from, to sql.NullInt64 if dr.Applicabilityfromkm != nil { @@ -324,199 +343,212 @@ } } - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return nil, err - } - defer tx.Rollback() - - // Mark old entry of gauge as erased, if applicable - var isNew bool - err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, - code.String(), - validity, - ).Scan(&isNew) - switch { - case err != nil: - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - if err2 := tx.Rollback(); err2 != nil { - return nil, err2 + err = func() error { + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return err } - unchanged++ - continue - case isNew: - var lu *time.Time - if dr.Lastupdate != nil { - t := dr.Lastupdate.ToGoTime() - lu = &t - } - // insert gauge version entry - if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx, - code.CountryCode, - code.LoCode, - code.FairwaySection, - code.Orc, - code.Hectometre, - dr.Objname.Loc, - dr.Lon, dr.Lat, - from, - to, - &validity, - dr.Zeropoint, - geodref, - &dateInfo, - source, - lu, - ); err != nil { + defer tx.Rollback() + + // Mark old entry of gauge as erased, if applicable + var isNew bool + err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, + code.String(), + validity, + ).Scan(&isNew) + switch { + case err != nil: feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - if err2 := tx.Rollback(); err2 != nil { - return nil, err2 + unchanged++ + return continueErr + case isNew: + var lu *time.Time + if dr.Lastupdate != nil { + t := dr.Lastupdate.ToGoTime() + lu = &t + } + // insert gauge version entry + if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx, + code.CountryCode, + code.LoCode, + code.FairwaySection, + code.Orc, + code.Hectometre, + dr.Objname.Loc, + dr.Lon, dr.Lat, + from, + to, + &validity, + dr.Zeropoint, + geodref, + &dateInfo, + source, + lu, + ); err != nil { + key := pgxutils.ReadableError{Err: err}.Error() + databaseErrors[key] = append(databaseErrors[key], isrs) + return continueErr + } + //feedback.Info("insert new version") + case !isNew: + var lu *time.Time + if dr.Lastupdate != nil { + t := dr.Lastupdate.ToGoTime() + lu = &t } - unchanged++ - continue - } - feedback.Info("insert new version") - case !isNew: - var lu *time.Time - if dr.Lastupdate != nil { - t := dr.Lastupdate.ToGoTime() - lu = &t - } - // try to update - var dummy int - err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, - code.CountryCode, - code.LoCode, - code.FairwaySection, - code.Orc, - code.Hectometre, - dr.Objname.Loc, - dr.Lon, dr.Lat, - from, - to, - dr.Zeropoint, - geodref, - &dateInfo, - source, - lu, - &validity, - ).Scan(&dummy) - switch { - case err2 == sql.ErrNoRows: - feedback.Info("unchanged") - if err3 := tx.Rollback(); err3 != nil { - return nil, err3 + // try to update + var dummy int + err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, + code.CountryCode, + code.LoCode, + code.FairwaySection, + code.Orc, + code.Hectometre, + dr.Objname.Loc, + dr.Lon, dr.Lat, + from, + to, + dr.Zeropoint, + geodref, + &dateInfo, + source, + lu, + &validity, + ).Scan(&dummy) + switch { + case err2 == sql.ErrNoRows: + //feedback.Info("unchanged") + unchanged++ + return continueErr + case err2 != nil: + key := pgxutils.ReadableError{Err: err}.Error() + databaseErrors[key] = append(databaseErrors[key], isrs) + unchanged++ + return continueErr + default: + //feedback.Info("update") } - unchanged++ - continue - case err2 != nil: - feedback.Error(pgxutils.ReadableError{Err: err2}.Error()) - if err3 := tx.Rollback(); err3 != nil { - return nil, err3 + + // Remove obsolete reference water levels + var currLevels pgtype.VarcharArray + currLevels.Set([]string{ + string(*dr.Reflevel1code), + string(*dr.Reflevel2code), + string(*dr.Reflevel3code), + }) + rwls, err := tx.StmtContext(ctx, + deleteReferenceWaterLevelsStmt).QueryContext(ctx, + code.String(), + &validity, + &currLevels, + ) + if err != nil { + return err } - unchanged++ - continue - default: - feedback.Info("update") + defer rwls.Close() + for rwls.Next() { + var delRef string + if err := rwls.Scan(&delRef); err != nil { + return err + } + feedback.Warn("Removed reference water level %s from %s", + delRef, code) + } + if err := rwls.Err(); err != nil { + return err + } } - // Remove obsolete reference water levels - var currLevels pgtype.VarcharArray - currLevels.Set([]string{ - string(*dr.Reflevel1code), - string(*dr.Reflevel2code), - string(*dr.Reflevel3code), - }) - rwls, err := tx.StmtContext(ctx, - deleteReferenceWaterLevelsStmt).QueryContext(ctx, + // Set end of validity of old version to start of new version + // in case of overlap + if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext( + ctx, code.String(), &validity, - &currLevels, - ) - if err != nil { - return nil, err - } - defer rwls.Close() - for rwls.Next() { - var delRef string - if err := rwls.Scan(&delRef); err != nil { - return nil, err - } - feedback.Warn("Removed reference water level %s from %s", - delRef, code) - } - if err := rwls.Err(); err != nil { - return nil, err - } - } - - // Set end of validity of old version to start of new version - // in case of overlap - if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext( - ctx, - code.String(), - &validity, - ); err != nil { - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - if err2 := tx.Rollback(); err2 != nil { - return nil, err2 - } - unchanged++ - continue - } - - // "Upsert" reference water levels - for _, wl := range []struct { - level **erdms.RisreflevelcodeType - value **erdms.RisreflevelvalueType - }{ - {&dr.Reflevel1code, &dr.Reflevel1value}, - {&dr.Reflevel2code, &dr.Reflevel2value}, - {&dr.Reflevel3code, &dr.Reflevel3value}, - } { - if *wl.level == nil || *wl.value == nil { - continue + ); err != nil { + key := pgxutils.ReadableError{Err: err}.Error() + databaseErrors[key] = append(databaseErrors[key], isrs) + unchanged++ + return continueErr } - var isNtSDepthRef bool - if err := tx.StmtContext( - ctx, isNtSDepthRefStmt).QueryRowContext(ctx, - string(**wl.level), - ).Scan( - &isNtSDepthRef, - ); err != nil { - return nil, err - } - if !isNtSDepthRef { - feedback.Warn( - "Reference level code '%s' is not in line "+ - "with the NtS reference_code table", - string(**wl.level)) + // "Upsert" reference water levels + for _, wl := range []struct { + level **erdms.RisreflevelcodeType + value **erdms.RisreflevelvalueType + }{ + {&dr.Reflevel1code, &dr.Reflevel1value}, + {&dr.Reflevel2code, &dr.Reflevel2value}, + {&dr.Reflevel3code, &dr.Reflevel3value}, + } { + if *wl.level == nil || *wl.value == nil { + continue + } + + var isNtSDepthRef bool + if err := tx.StmtContext( + ctx, isNtSDepthRefStmt).QueryRowContext(ctx, + string(**wl.level), + ).Scan( + &isNtSDepthRef, + ); err != nil { + return err + } + if !isNtSDepthRef { + feedback.Warn( + "Reference level code '%s' is not in line "+ + "with the NtS reference_code table", + string(**wl.level)) + } + + if _, err := tx.StmtContext( + ctx, insertWaterLevelStmt).ExecContext(ctx, + code.CountryCode, + code.LoCode, + code.FairwaySection, + code.Orc, + code.Hectometre, + &validity, + string(**wl.level), + int64(**wl.value), + ); err != nil { + key := pgxutils.ReadableError{Err: err}.Error() + databaseErrors[key] = append(databaseErrors[key], isrs) + continue + } } - if _, err := tx.StmtContext( - ctx, insertWaterLevelStmt).ExecContext(ctx, - code.CountryCode, - code.LoCode, - code.FairwaySection, - code.Orc, - code.Hectometre, - &validity, - string(**wl.level), - int64(**wl.value), - ); err != nil { - feedback.Error(pgxutils.ReadableError{Err: err}.Error()) - tx.Rollback() - continue - } - } + return tx.Commit() + }() - if err = tx.Commit(); err != nil { - return nil, err + if err != nil && err != continueErr { + return err, nil } } } + if len(invalidISRS) > 0 { + feedback.Error("Invalid ISRS code: '%s'", strings.Join(invalidISRS, "', '")) + } + + if len(startEndOrder) > 0 { + feedback.Error("start date not before end date: %s", + strings.Join(startEndOrder, ", ")) + } + + if len(databaseErrors) > 0 { + for err, iris := range databaseErrors { + feedback.Error("%s: %s", err, strings.Join(iris, ", ")) + } + } + + if len(missingObjname) > 0 { + feedback.Error("Missing zeropoint: %s", strings.Join(missingZeropoint, ", ")) + } + if len(missingObjname) > 0 { + feedback.Error("Missing objname: %s", strings.Join(missingObjname, ", ")) + } + if len(gauges) == 0 { return nil, UnchangedError("No gauges returned from ERDMS") }