comparison pkg/imports/wg.go @ 5620:165e77c5736a erdms2

Simplified WG import logging.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 28 Nov 2022 18:17:11 +0100
parents f6179b31e0a9
children cf1e8ffe1ed5
comparison
equal deleted inserted replaced
5619:f0413b20ad4d 5620:165e77c5736a
15 package imports 15 package imports
16 16
17 import ( 17 import (
18 "context" 18 "context"
19 "database/sql" 19 "database/sql"
20 "errors"
21 "strings"
20 "time" 22 "time"
21 23
22 "github.com/jackc/pgx/pgtype" 24 "github.com/jackc/pgx/pgtype"
23 25
24 "gemma.intevation.de/gemma/pkg/models" 26 "gemma.intevation.de/gemma/pkg/models"
175 ) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET 177 ) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET
176 value = EXCLUDED.value 178 value = EXCLUDED.value
177 ` 179 `
178 ) 180 )
179 181
182 var continueErr = errors.New("continue")
183
180 // Do implements the actual import. 184 // Do implements the actual import.
181 func (wg *WaterwayGauge) Do( 185 func (wg *WaterwayGauge) Do(
182 ctx context.Context, 186 ctx context.Context,
183 importID int64, 187 importID int64,
184 conn *sql.Conn, 188 conn *sql.Conn,
224 } 228 }
225 229
226 var gauges []string 230 var gauges []string
227 var unchanged int 231 var unchanged int
228 232
233 var invalidISRS, startEndOrder, missingObjname, missingZeropoint []string
234
235 databaseErrors := map[string][]string{}
236
229 for _, data := range responseData { 237 for _, data := range responseData {
230 for _, dr := range data.RisdataReturn { 238 for _, dr := range data.RisdataReturn {
231 239
232 isrs := string(*dr.RisidxCode) 240 isrs := string(*dr.RisidxCode)
233 code, err := models.IsrsFromString(isrs) 241 code, err := models.IsrsFromString(isrs)
234 if err != nil { 242 if err != nil {
235 feedback.Warn("Invalid ISRS code '%s': %v", isrs, err) 243 invalidISRS = append(invalidISRS, isrs)
236 continue 244 continue
237 } 245 }
238 gauges = append(gauges, isrs) 246 gauges = append(gauges, isrs)
239 feedback.Info("Processing %s", code)
240 247
241 // We need a valid, non-empty time range to identify gauge versions 248 // We need a valid, non-empty time range to identify gauge versions
242 if dr.Enddate != nil && dr.Startdate != nil { 249 if dr.Enddate != nil && dr.Startdate != nil {
243 ed := dr.Enddate.ToGoTime() 250 ed := dr.Enddate.ToGoTime()
244 sd := dr.Startdate.ToGoTime() 251 sd := dr.Startdate.ToGoTime()
245 // log.Debugf("start date: %v end date: %v\n", sd, ed) 252 // log.Debugf("start date: %v end date: %v\n", sd, ed)
246 if !ed.After(sd) { 253 if !ed.After(sd) {
247 feedback.Error("End date not after start date") 254 startEndOrder = append(startEndOrder, isrs)
248 unchanged++ 255 unchanged++
249 continue 256 continue
250 } 257 }
258 }
259
260 if dr.Zeropoint == nil {
261 missingZeropoint = append(missingZeropoint, isrs)
262 unchanged++
263 continue
264 }
265
266 if dr.Objname.Loc == nil {
267 missingObjname = append(missingObjname, isrs)
268 unchanged++
269 continue
251 } 270 }
252 271
253 var from, to sql.NullInt64 272 var from, to sql.NullInt64
254 273
255 if dr.Applicabilityfromkm != nil { 274 if dr.Applicabilityfromkm != nil {
322 String: string(*dr.Source), 341 String: string(*dr.Source),
323 Valid: true, 342 Valid: true,
324 } 343 }
325 } 344 }
326 345
327 tx, err := conn.BeginTx(ctx, nil) 346 err = func() error {
328 if err != nil { 347 tx, err := conn.BeginTx(ctx, nil)
329 return nil, err 348 if err != nil {
330 } 349 return err
331 defer tx.Rollback() 350 }
332 351 defer tx.Rollback()
333 // Mark old entry of gauge as erased, if applicable 352
334 var isNew bool 353 // Mark old entry of gauge as erased, if applicable
335 err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, 354 var isNew bool
336 code.String(), 355 err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx,
337 validity, 356 code.String(),
338 ).Scan(&isNew) 357 validity,
339 switch { 358 ).Scan(&isNew)
340 case err != nil: 359 switch {
341 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) 360 case err != nil:
342 if err2 := tx.Rollback(); err2 != nil {
343 return nil, err2
344 }
345 unchanged++
346 continue
347 case isNew:
348 var lu *time.Time
349 if dr.Lastupdate != nil {
350 t := dr.Lastupdate.ToGoTime()
351 lu = &t
352 }
353 // insert gauge version entry
354 if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
355 code.CountryCode,
356 code.LoCode,
357 code.FairwaySection,
358 code.Orc,
359 code.Hectometre,
360 dr.Objname.Loc,
361 dr.Lon, dr.Lat,
362 from,
363 to,
364 &validity,
365 dr.Zeropoint,
366 geodref,
367 &dateInfo,
368 source,
369 lu,
370 ); err != nil {
371 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) 361 feedback.Error(pgxutils.ReadableError{Err: err}.Error())
372 if err2 := tx.Rollback(); err2 != nil {
373 return nil, err2
374 }
375 unchanged++ 362 unchanged++
376 continue 363 return continueErr
377 } 364 case isNew:
378 feedback.Info("insert new version") 365 var lu *time.Time
379 case !isNew: 366 if dr.Lastupdate != nil {
380 var lu *time.Time 367 t := dr.Lastupdate.ToGoTime()
381 if dr.Lastupdate != nil { 368 lu = &t
382 t := dr.Lastupdate.ToGoTime() 369 }
383 lu = &t 370 // insert gauge version entry
384 } 371 if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
385 // try to update 372 code.CountryCode,
386 var dummy int 373 code.LoCode,
387 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, 374 code.FairwaySection,
388 code.CountryCode, 375 code.Orc,
389 code.LoCode, 376 code.Hectometre,
390 code.FairwaySection, 377 dr.Objname.Loc,
391 code.Orc, 378 dr.Lon, dr.Lat,
392 code.Hectometre, 379 from,
393 dr.Objname.Loc, 380 to,
394 dr.Lon, dr.Lat, 381 &validity,
395 from, 382 dr.Zeropoint,
396 to, 383 geodref,
397 dr.Zeropoint, 384 &dateInfo,
398 geodref, 385 source,
399 &dateInfo, 386 lu,
400 source, 387 ); err != nil {
401 lu, 388 key := pgxutils.ReadableError{Err: err}.Error()
402 &validity, 389 databaseErrors[key] = append(databaseErrors[key], isrs)
403 ).Scan(&dummy) 390 return continueErr
404 switch { 391 }
405 case err2 == sql.ErrNoRows: 392 //feedback.Info("insert new version")
406 feedback.Info("unchanged") 393 case !isNew:
407 if err3 := tx.Rollback(); err3 != nil { 394 var lu *time.Time
408 return nil, err3 395 if dr.Lastupdate != nil {
409 } 396 t := dr.Lastupdate.ToGoTime()
410 unchanged++ 397 lu = &t
411 continue 398 }
412 case err2 != nil: 399 // try to update
413 feedback.Error(pgxutils.ReadableError{Err: err2}.Error()) 400 var dummy int
414 if err3 := tx.Rollback(); err3 != nil { 401 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
415 return nil, err3 402 code.CountryCode,
416 } 403 code.LoCode,
417 unchanged++ 404 code.FairwaySection,
418 continue 405 code.Orc,
419 default: 406 code.Hectometre,
420 feedback.Info("update") 407 dr.Objname.Loc,
421 } 408 dr.Lon, dr.Lat,
422 409 from,
423 // Remove obsolete reference water levels 410 to,
424 var currLevels pgtype.VarcharArray 411 dr.Zeropoint,
425 currLevels.Set([]string{ 412 geodref,
426 string(*dr.Reflevel1code), 413 &dateInfo,
427 string(*dr.Reflevel2code), 414 source,
428 string(*dr.Reflevel3code), 415 lu,
429 }) 416 &validity,
430 rwls, err := tx.StmtContext(ctx, 417 ).Scan(&dummy)
431 deleteReferenceWaterLevelsStmt).QueryContext(ctx, 418 switch {
419 case err2 == sql.ErrNoRows:
420 //feedback.Info("unchanged")
421 unchanged++
422 return continueErr
423 case err2 != nil:
424 key := pgxutils.ReadableError{Err: err}.Error()
425 databaseErrors[key] = append(databaseErrors[key], isrs)
426 unchanged++
427 return continueErr
428 default:
429 //feedback.Info("update")
430 }
431
432 // Remove obsolete reference water levels
433 var currLevels pgtype.VarcharArray
434 currLevels.Set([]string{
435 string(*dr.Reflevel1code),
436 string(*dr.Reflevel2code),
437 string(*dr.Reflevel3code),
438 })
439 rwls, err := tx.StmtContext(ctx,
440 deleteReferenceWaterLevelsStmt).QueryContext(ctx,
441 code.String(),
442 &validity,
443 &currLevels,
444 )
445 if err != nil {
446 return err
447 }
448 defer rwls.Close()
449 for rwls.Next() {
450 var delRef string
451 if err := rwls.Scan(&delRef); err != nil {
452 return err
453 }
454 feedback.Warn("Removed reference water level %s from %s",
455 delRef, code)
456 }
457 if err := rwls.Err(); err != nil {
458 return err
459 }
460 }
461
462 // Set end of validity of old version to start of new version
463 // in case of overlap
464 if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(
465 ctx,
432 code.String(), 466 code.String(),
433 &validity, 467 &validity,
434 &currLevels,
435 )
436 if err != nil {
437 return nil, err
438 }
439 defer rwls.Close()
440 for rwls.Next() {
441 var delRef string
442 if err := rwls.Scan(&delRef); err != nil {
443 return nil, err
444 }
445 feedback.Warn("Removed reference water level %s from %s",
446 delRef, code)
447 }
448 if err := rwls.Err(); err != nil {
449 return nil, err
450 }
451 }
452
453 // Set end of validity of old version to start of new version
454 // in case of overlap
455 if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(
456 ctx,
457 code.String(),
458 &validity,
459 ); err != nil {
460 feedback.Error(pgxutils.ReadableError{Err: err}.Error())
461 if err2 := tx.Rollback(); err2 != nil {
462 return nil, err2
463 }
464 unchanged++
465 continue
466 }
467
468 // "Upsert" reference water levels
469 for _, wl := range []struct {
470 level **erdms.RisreflevelcodeType
471 value **erdms.RisreflevelvalueType
472 }{
473 {&dr.Reflevel1code, &dr.Reflevel1value},
474 {&dr.Reflevel2code, &dr.Reflevel2value},
475 {&dr.Reflevel3code, &dr.Reflevel3value},
476 } {
477 if *wl.level == nil || *wl.value == nil {
478 continue
479 }
480
481 var isNtSDepthRef bool
482 if err := tx.StmtContext(
483 ctx, isNtSDepthRefStmt).QueryRowContext(ctx,
484 string(**wl.level),
485 ).Scan(
486 &isNtSDepthRef,
487 ); err != nil { 468 ); err != nil {
488 return nil, err 469 key := pgxutils.ReadableError{Err: err}.Error()
489 } 470 databaseErrors[key] = append(databaseErrors[key], isrs)
490 if !isNtSDepthRef { 471 unchanged++
491 feedback.Warn( 472 return continueErr
492 "Reference level code '%s' is not in line "+ 473 }
493 "with the NtS reference_code table", 474
494 string(**wl.level)) 475 // "Upsert" reference water levels
495 } 476 for _, wl := range []struct {
496 477 level **erdms.RisreflevelcodeType
497 if _, err := tx.StmtContext( 478 value **erdms.RisreflevelvalueType
498 ctx, insertWaterLevelStmt).ExecContext(ctx, 479 }{
499 code.CountryCode, 480 {&dr.Reflevel1code, &dr.Reflevel1value},
500 code.LoCode, 481 {&dr.Reflevel2code, &dr.Reflevel2value},
501 code.FairwaySection, 482 {&dr.Reflevel3code, &dr.Reflevel3value},
502 code.Orc, 483 } {
503 code.Hectometre, 484 if *wl.level == nil || *wl.value == nil {
504 &validity, 485 continue
505 string(**wl.level), 486 }
506 int64(**wl.value), 487
507 ); err != nil { 488 var isNtSDepthRef bool
508 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) 489 if err := tx.StmtContext(
509 tx.Rollback() 490 ctx, isNtSDepthRefStmt).QueryRowContext(ctx,
510 continue 491 string(**wl.level),
511 } 492 ).Scan(
512 } 493 &isNtSDepthRef,
513 494 ); err != nil {
514 if err = tx.Commit(); err != nil { 495 return err
515 return nil, err 496 }
497 if !isNtSDepthRef {
498 feedback.Warn(
499 "Reference level code '%s' is not in line "+
500 "with the NtS reference_code table",
501 string(**wl.level))
502 }
503
504 if _, err := tx.StmtContext(
505 ctx, insertWaterLevelStmt).ExecContext(ctx,
506 code.CountryCode,
507 code.LoCode,
508 code.FairwaySection,
509 code.Orc,
510 code.Hectometre,
511 &validity,
512 string(**wl.level),
513 int64(**wl.value),
514 ); err != nil {
515 key := pgxutils.ReadableError{Err: err}.Error()
516 databaseErrors[key] = append(databaseErrors[key], isrs)
517 continue
518 }
519 }
520
521 return tx.Commit()
522 }()
523
524 if err != nil && err != continueErr {
525 return err, nil
516 } 526 }
517 } 527 }
528 }
529
530 if len(invalidISRS) > 0 {
531 feedback.Error("Invalid ISRS code: '%s'", strings.Join(invalidISRS, "', '"))
532 }
533
534 if len(startEndOrder) > 0 {
535 feedback.Error("start date not before end date: %s",
536 strings.Join(startEndOrder, ", "))
537 }
538
539 if len(databaseErrors) > 0 {
540 for err, iris := range databaseErrors {
541 feedback.Error("%s: %s", err, strings.Join(iris, ", "))
542 }
543 }
544
545 if len(missingObjname) > 0 {
546 feedback.Error("Missing zeropoint: %s", strings.Join(missingZeropoint, ", "))
547 }
548 if len(missingObjname) > 0 {
549 feedback.Error("Missing objname: %s", strings.Join(missingObjname, ", "))
518 } 550 }
519 551
520 if len(gauges) == 0 { 552 if len(gauges) == 0 {
521 return nil, UnchangedError("No gauges returned from ERDMS") 553 return nil, UnchangedError("No gauges returned from ERDMS")
522 } 554 }