Mercurial > gemma
changeset 3310:e0dabe7b2fcf
Simplify gauges import
Instead of checking some NOT NULL constraints in an extra loop,
check all of them in the database.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Fri, 17 May 2019 12:49:28 +0200 |
parents | 80037790032d |
children | 0f6b156cff55 |
files | pkg/imports/errors.go pkg/imports/wg.go |
diffstat | 2 files changed, 238 insertions(+), 262 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/errors.go Fri May 17 12:45:13 2019 +0200 +++ b/pkg/imports/errors.go Fri May 17 12:49:28 2019 +0200 @@ -29,6 +29,7 @@ // Handle PostgreSQL error codes const ( + notNullViolation = "23502" foreignKeyViolation = "23503" noDataFound = "P0002" ) @@ -37,6 +38,21 @@ func (err dbError) Error() string { switch err.Code { + case notNullViolation: + switch err.SchemaName { + case "waterway": + switch err.TableName { + case "gauges": + switch err.ColumnName { + case "objname": + return "Missing objname" + case "geom": + return "Missing lat/lon" + case "zero_point": + return "Missing zeropoint" + } + } + } case foreignKeyViolation: switch err.SchemaName { case "waterway":
--- a/pkg/imports/wg.go Fri May 17 12:45:13 2019 +0200 +++ b/pkg/imports/wg.go Fri May 17 12:49:28 2019 +0200 @@ -172,58 +172,6 @@ return nil, err } - var ignored int - - type idxCode struct { - jdx int - idx int - code *models.Isrs - } - - var gauges []idxCode - - for j, data := range responseData { - for i, dr := range data.RisdataReturn { - if dr.RisidxCode == nil { - ignored++ - continue - } - code, err := models.IsrsFromString(string(*dr.RisidxCode)) - if err != nil { - feedback.Warn("invalid ISRS code %v", err) - ignored++ - continue - } - - if dr.Objname.Loc == nil { - feedback.Warn("missing objname: %s", code) - ignored++ - continue - } - - if dr.Lat == nil || dr.Lon == nil { - feedback.Warn("missing lat/lon: %s", code) - ignored++ - continue - } - - if dr.Zeropoint == nil { - feedback.Warn("missing zeropoint: %s", code) - ignored++ - continue - } - - gauges = append(gauges, idxCode{jdx: j, idx: i, code: code}) - } - } - feedback.Info("Ignored gauges: %d", ignored) - feedback.Info("Further process %d gauges", len(gauges)) - - if len(gauges) == 0 { - return nil, UnchangedError("Nothing to do") - } - - // insert/update the gauges var eraseGaugeStmt, insertStmt, updateStmt, deleteReferenceWaterLevelsStmt, isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt @@ -245,257 +193,269 @@ defer (*x.stmt).Close() } + var gauges []string var unchanged int - for i := range gauges { - ic := &gauges[i] - dr := responseData[ic.jdx].RisdataReturn[ic.idx] - - feedback.Info("Processing %s", ic.code) + for _, data := range responseData { + for _, dr := range data.RisdataReturn { - var from, to sql.NullInt64 - - if dr.Applicabilityfromkm != nil { - from = sql.NullInt64{ - Int64: int64(*dr.Applicabilityfromkm), - Valid: true, + isrs := string(*dr.RisidxCode) + code, err := models.IsrsFromString(isrs) + if err != nil { + feedback.Warn("Invalid ISRS code '%s': %v", isrs, err) + continue } - } - if dr.Applicabilitytokm != nil { - to = sql.NullInt64{ - Int64: int64(*dr.Applicabilitytokm), - Valid: true, - } - } + gauges = append(gauges, isrs) + feedback.Info("Processing %s", code) + + var from, to sql.NullInt64 - var tfrom, tto, dateInfo pgtype.Timestamptz - - if dr.Startdate != nil { - tfrom = pgtype.Timestamptz{ - Time: time.Time(*dr.Startdate), - Status: pgtype.Present, - } - } else { - tfrom = pgtype.Timestamptz{ - Status: pgtype.Null, + if dr.Applicabilityfromkm != nil { + from = sql.NullInt64{ + Int64: int64(*dr.Applicabilityfromkm), + Valid: true, + } } - } + if dr.Applicabilitytokm != nil { + to = sql.NullInt64{ + Int64: int64(*dr.Applicabilitytokm), + Valid: true, + } + } - if dr.Enddate != nil { - tto = pgtype.Timestamptz{ - Time: time.Time(*dr.Enddate), - Status: pgtype.Present, - } - } else { - tto = pgtype.Timestamptz{ - Status: pgtype.Null, - } - } + var tfrom, tto, dateInfo pgtype.Timestamptz - validity := pgtype.Tstzrange{ - Lower: tfrom, - Upper: tto, - LowerType: pgtype.Inclusive, - UpperType: pgtype.Exclusive, - Status: pgtype.Present, - } + if dr.Startdate != nil { + tfrom = pgtype.Timestamptz{ + Time: time.Time(*dr.Startdate), + Status: pgtype.Present, + } + } else { + tfrom = pgtype.Timestamptz{ + Status: pgtype.Null, + } + } - if dr.Infodate != nil { - dateInfo = pgtype.Timestamptz{ - Time: time.Time(*dr.Infodate), - Status: pgtype.Present, + if dr.Enddate != nil { + tto = pgtype.Timestamptz{ + Time: time.Time(*dr.Enddate), + Status: pgtype.Present, + } + } else { + tto = pgtype.Timestamptz{ + Status: pgtype.Null, + } } - } else { - dateInfo = pgtype.Timestamptz{ - Status: pgtype.Null, - } - } - var geodref sql.NullString - if dr.Geodref != nil { - geodref = sql.NullString{ - String: string(*dr.Geodref), - Valid: true, - } - } - - var source sql.NullString - if dr.Source != nil { - source = sql.NullString{ - String: string(*dr.Source), - Valid: true, + validity := pgtype.Tstzrange{ + Lower: tfrom, + Upper: tto, + LowerType: pgtype.Inclusive, + UpperType: pgtype.Exclusive, + Status: pgtype.Present, } - } - - tx, err := conn.BeginTx(ctx, nil) - if err != nil { - return nil, err - } - defer tx.Rollback() - // Mark old entries of gauge as erased, if applicable - if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx, - ic.code.String(), - validity, - ); err != nil { - feedback.Warn(handleError(err).Error()) - if err2 := tx.Rollback(); err2 != nil { - return nil, err2 + if dr.Infodate != nil { + dateInfo = pgtype.Timestamptz{ + Time: time.Time(*dr.Infodate), + Status: pgtype.Present, + } + } else { + dateInfo = pgtype.Timestamptz{ + Status: pgtype.Null, + } + } + + var geodref sql.NullString + if dr.Geodref != nil { + geodref = sql.NullString{ + String: string(*dr.Geodref), + Valid: true, + } } - unchanged++ - continue - } + + var source sql.NullString + if dr.Source != nil { + source = sql.NullString{ + String: string(*dr.Source), + Valid: true, + } + } + + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil, err + } + defer tx.Rollback() - // Try to insert gauge entry - var dummy int - err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx, - ic.code.CountryCode, - ic.code.LoCode, - ic.code.FairwaySection, - ic.code.Orc, - ic.code.Hectometre, - string(*dr.Objname.Loc), - float64(*dr.Lon), float64(*dr.Lat), - from, - to, - &validity, - float64(*dr.Zeropoint), - geodref, - &dateInfo, - source, - time.Time(*dr.Lastupdate), - ).Scan(&dummy) - switch { - case err == sql.ErrNoRows: - // Assume constraint conflict, try to update - err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, - ic.code.CountryCode, - ic.code.LoCode, - ic.code.FairwaySection, - ic.code.Orc, - ic.code.Hectometre, - string(*dr.Objname.Loc), - float64(*dr.Lon), float64(*dr.Lat), + // Mark old entries of gauge as erased, if applicable + if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx, + code.String(), + validity, + ); err != nil { + feedback.Warn(handleError(err).Error()) + if err2 := tx.Rollback(); err2 != nil { + return nil, err2 + } + unchanged++ + continue + } + + // Try to insert gauge entry + var dummy int + err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx, + code.CountryCode, + code.LoCode, + code.FairwaySection, + code.Orc, + code.Hectometre, + dr.Objname.Loc, + dr.Lon, dr.Lat, from, to, - float64(*dr.Zeropoint), + &validity, + dr.Zeropoint, geodref, &dateInfo, source, time.Time(*dr.Lastupdate), ).Scan(&dummy) switch { - case err2 == sql.ErrNoRows: - feedback.Info("unchanged") - if err3 := tx.Rollback(); err3 != nil { - return nil, err3 + case err == sql.ErrNoRows: + // Assume constraint conflict, try to update + err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, + code.CountryCode, + code.LoCode, + code.FairwaySection, + code.Orc, + code.Hectometre, + dr.Objname.Loc, + dr.Lon, dr.Lat, + from, + to, + dr.Zeropoint, + geodref, + &dateInfo, + source, + time.Time(*dr.Lastupdate), + ).Scan(&dummy) + switch { + case err2 == sql.ErrNoRows: + feedback.Info("unchanged") + if err3 := tx.Rollback(); err3 != nil { + return nil, err3 + } + unchanged++ + continue + case err2 != nil: + feedback.Warn(handleError(err2).Error()) + if err3 := tx.Rollback(); err3 != nil { + return nil, err3 + } + unchanged++ + continue + default: + feedback.Info("update") } - unchanged++ - continue - case err2 != nil: - feedback.Warn(handleError(err2).Error()) - if err3 := tx.Rollback(); err3 != nil { - return nil, err3 + + // Remove obsolete reference water levels + var currLevels pgtype.VarcharArray + currLevels.Set([]string{ + string(*dr.Reflevel1code), + string(*dr.Reflevel2code), + string(*dr.Reflevel3code), + }) + rwls, err := tx.StmtContext(ctx, + deleteReferenceWaterLevelsStmt).QueryContext(ctx, + code.String(), + &validity, + &currLevels, + ) + if err != nil { + return nil, err + } + defer rwls.Close() + for rwls.Next() { + var delRef string + if err := rwls.Scan(&delRef); err != nil { + return nil, err + } + feedback.Warn("Removed reference water level %s from %s", + delRef, code) + } + if err := rwls.Err(); err != nil { + return nil, err + } + case err != nil: + feedback.Warn(handleError(err).Error()) + if err2 := tx.Rollback(); err2 != nil { + return nil, err2 } unchanged++ continue default: - feedback.Info("update") + feedback.Info("insert new version") } - // Remove obsolete reference water levels - var currLevels pgtype.VarcharArray - currLevels.Set([]string{ - string(*dr.Reflevel1code), - string(*dr.Reflevel2code), - string(*dr.Reflevel3code), - }) - rwls, err := tx.StmtContext(ctx, - deleteReferenceWaterLevelsStmt).QueryContext(ctx, - ic.code.String(), - &validity, - &currLevels, - ) - if err != nil { - return nil, err - } - defer rwls.Close() - for rwls.Next() { - var delRef string - if err := rwls.Scan(&delRef); err != nil { + // "Upsert" reference water levels + for _, wl := range []struct { + level **erdms.RisreflevelcodeType + value **erdms.RisreflevelvalueType + }{ + {&dr.Reflevel1code, &dr.Reflevel1value}, + {&dr.Reflevel2code, &dr.Reflevel2value}, + {&dr.Reflevel3code, &dr.Reflevel3value}, + } { + if *wl.level == nil || *wl.value == nil { + continue + } + + var isNtSDepthRef bool + if err := tx.StmtContext( + ctx, isNtSDepthRefStmt).QueryRowContext(ctx, + string(**wl.level), + ).Scan( + &isNtSDepthRef, + ); err != nil { return nil, err } - feedback.Warn("Removed reference water level %s from %s", - delRef, ic.code) + if !isNtSDepthRef { + feedback.Warn( + "Reference level code '%s' is not in line "+ + "with the NtS reference_code table", + string(**wl.level)) + } + + if _, err := tx.StmtContext( + ctx, insertWaterLevelStmt).ExecContext(ctx, + code.CountryCode, + code.LoCode, + code.FairwaySection, + code.Orc, + code.Hectometre, + &validity, + string(**wl.level), + int64(**wl.value), + ); err != nil { + feedback.Warn(handleError(err).Error()) + tx.Rollback() + continue + } } - if err := rwls.Err(); err != nil { + + if err = tx.Commit(); err != nil { return nil, err } - case err != nil: - feedback.Warn(handleError(err).Error()) - if err2 := tx.Rollback(); err2 != nil { - return nil, err2 - } - unchanged++ - continue - default: - feedback.Info("insert new version") - } - - // "Upsert" reference water levels - for _, wl := range []struct { - level **erdms.RisreflevelcodeType - value **erdms.RisreflevelvalueType - }{ - {&dr.Reflevel1code, &dr.Reflevel1value}, - {&dr.Reflevel2code, &dr.Reflevel2value}, - {&dr.Reflevel3code, &dr.Reflevel3value}, - } { - if *wl.level == nil || *wl.value == nil { - continue - } - - var isNtSDepthRef bool - if err := tx.StmtContext(ctx, isNtSDepthRefStmt).QueryRowContext( - ctx, - string(**wl.level), - ).Scan( - &isNtSDepthRef, - ); err != nil { - return nil, err - } - if !isNtSDepthRef { - feedback.Warn( - "Reference level code '%s' is not in line "+ - "with the NtS reference_code table", - string(**wl.level)) - } - - if _, err := tx.StmtContext(ctx, insertWaterLevelStmt).ExecContext( - ctx, - ic.code.CountryCode, - ic.code.LoCode, - ic.code.FairwaySection, - ic.code.Orc, - ic.code.Hectometre, - &validity, - string(**wl.level), - int64(**wl.value), - ); err != nil { - feedback.Warn(handleError(err).Error()) - tx.Rollback() - continue - } - } - - if err = tx.Commit(); err != nil { - return nil, err } } feedback.Info("Importing gauges took %s", time.Since(start)) + if len(gauges) == 0 { + return nil, UnchangedError("No gauges returned from ERDMS") + } + if unchanged == len(gauges) { return nil, UnchangedError("All gauges unchanged") }