Mercurial > gemma
comparison pkg/imports/wg.go @ 5620:165e77c5736a erdms2
Simplified WG import logging.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 28 Nov 2022 18:17:11 +0100 |
parents | f6179b31e0a9 |
children | cf1e8ffe1ed5 |
comparison (legend: equal, deleted, inserted, replaced)
5619:f0413b20ad4d | 5620:165e77c5736a |
---|---|
15 package imports | 15 package imports |
16 | 16 |
17 import ( | 17 import ( |
18 "context" | 18 "context" |
19 "database/sql" | 19 "database/sql" |
20 "errors" | |
21 "strings" | |
20 "time" | 22 "time" |
21 | 23 |
22 "github.com/jackc/pgx/pgtype" | 24 "github.com/jackc/pgx/pgtype" |
23 | 25 |
24 "gemma.intevation.de/gemma/pkg/models" | 26 "gemma.intevation.de/gemma/pkg/models" |
175 ) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET | 177 ) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET |
176 value = EXCLUDED.value | 178 value = EXCLUDED.value |
177 ` | 179 ` |
178 ) | 180 ) |
179 | 181 |
182 var continueErr = errors.New("continue") | |
183 | |
180 // Do implements the actual import. | 184 // Do implements the actual import. |
181 func (wg *WaterwayGauge) Do( | 185 func (wg *WaterwayGauge) Do( |
182 ctx context.Context, | 186 ctx context.Context, |
183 importID int64, | 187 importID int64, |
184 conn *sql.Conn, | 188 conn *sql.Conn, |
224 } | 228 } |
225 | 229 |
226 var gauges []string | 230 var gauges []string |
227 var unchanged int | 231 var unchanged int |
228 | 232 |
233 var invalidISRS, startEndOrder, missingObjname, missingZeropoint []string | |
234 | |
235 databaseErrors := map[string][]string{} | |
236 | |
229 for _, data := range responseData { | 237 for _, data := range responseData { |
230 for _, dr := range data.RisdataReturn { | 238 for _, dr := range data.RisdataReturn { |
231 | 239 |
232 isrs := string(*dr.RisidxCode) | 240 isrs := string(*dr.RisidxCode) |
233 code, err := models.IsrsFromString(isrs) | 241 code, err := models.IsrsFromString(isrs) |
234 if err != nil { | 242 if err != nil { |
235 feedback.Warn("Invalid ISRS code '%s': %v", isrs, err) | 243 invalidISRS = append(invalidISRS, isrs) |
236 continue | 244 continue |
237 } | 245 } |
238 gauges = append(gauges, isrs) | 246 gauges = append(gauges, isrs) |
239 feedback.Info("Processing %s", code) | |
240 | 247 |
241 // We need a valid, non-empty time range to identify gauge versions | 248 // We need a valid, non-empty time range to identify gauge versions |
242 if dr.Enddate != nil && dr.Startdate != nil { | 249 if dr.Enddate != nil && dr.Startdate != nil { |
243 ed := dr.Enddate.ToGoTime() | 250 ed := dr.Enddate.ToGoTime() |
244 sd := dr.Startdate.ToGoTime() | 251 sd := dr.Startdate.ToGoTime() |
245 // log.Debugf("start date: %v end date: %v\n", sd, ed) | 252 // log.Debugf("start date: %v end date: %v\n", sd, ed) |
246 if !ed.After(sd) { | 253 if !ed.After(sd) { |
247 feedback.Error("End date not after start date") | 254 startEndOrder = append(startEndOrder, isrs) |
248 unchanged++ | 255 unchanged++ |
249 continue | 256 continue |
250 } | 257 } |
258 } | |
259 | |
260 if dr.Zeropoint == nil { | |
261 missingZeropoint = append(missingZeropoint, isrs) | |
262 unchanged++ | |
263 continue | |
264 } | |
265 | |
266 if dr.Objname.Loc == nil { | |
267 missingObjname = append(missingObjname, isrs) | |
268 unchanged++ | |
269 continue | |
251 } | 270 } |
252 | 271 |
253 var from, to sql.NullInt64 | 272 var from, to sql.NullInt64 |
254 | 273 |
255 if dr.Applicabilityfromkm != nil { | 274 if dr.Applicabilityfromkm != nil { |
322 String: string(*dr.Source), | 341 String: string(*dr.Source), |
323 Valid: true, | 342 Valid: true, |
324 } | 343 } |
325 } | 344 } |
326 | 345 |
327 tx, err := conn.BeginTx(ctx, nil) | 346 err = func() error { |
328 if err != nil { | 347 tx, err := conn.BeginTx(ctx, nil) |
329 return nil, err | 348 if err != nil { |
330 } | 349 return err |
331 defer tx.Rollback() | 350 } |
332 | 351 defer tx.Rollback() |
333 // Mark old entry of gauge as erased, if applicable | 352 |
334 var isNew bool | 353 // Mark old entry of gauge as erased, if applicable |
335 err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, | 354 var isNew bool |
336 code.String(), | 355 err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx, |
337 validity, | 356 code.String(), |
338 ).Scan(&isNew) | 357 validity, |
339 switch { | 358 ).Scan(&isNew) |
340 case err != nil: | 359 switch { |
341 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) | 360 case err != nil: |
342 if err2 := tx.Rollback(); err2 != nil { | |
343 return nil, err2 | |
344 } | |
345 unchanged++ | |
346 continue | |
347 case isNew: | |
348 var lu *time.Time | |
349 if dr.Lastupdate != nil { | |
350 t := dr.Lastupdate.ToGoTime() | |
351 lu = &t | |
352 } | |
353 // insert gauge version entry | |
354 if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx, | |
355 code.CountryCode, | |
356 code.LoCode, | |
357 code.FairwaySection, | |
358 code.Orc, | |
359 code.Hectometre, | |
360 dr.Objname.Loc, | |
361 dr.Lon, dr.Lat, | |
362 from, | |
363 to, | |
364 &validity, | |
365 dr.Zeropoint, | |
366 geodref, | |
367 &dateInfo, | |
368 source, | |
369 lu, | |
370 ); err != nil { | |
371 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) | 361 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) |
372 if err2 := tx.Rollback(); err2 != nil { | |
373 return nil, err2 | |
374 } | |
375 unchanged++ | 362 unchanged++ |
376 continue | 363 return continueErr |
377 } | 364 case isNew: |
378 feedback.Info("insert new version") | 365 var lu *time.Time |
379 case !isNew: | 366 if dr.Lastupdate != nil { |
380 var lu *time.Time | 367 t := dr.Lastupdate.ToGoTime() |
381 if dr.Lastupdate != nil { | 368 lu = &t |
382 t := dr.Lastupdate.ToGoTime() | 369 } |
383 lu = &t | 370 // insert gauge version entry |
384 } | 371 if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx, |
385 // try to update | 372 code.CountryCode, |
386 var dummy int | 373 code.LoCode, |
387 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, | 374 code.FairwaySection, |
388 code.CountryCode, | 375 code.Orc, |
389 code.LoCode, | 376 code.Hectometre, |
390 code.FairwaySection, | 377 dr.Objname.Loc, |
391 code.Orc, | 378 dr.Lon, dr.Lat, |
392 code.Hectometre, | 379 from, |
393 dr.Objname.Loc, | 380 to, |
394 dr.Lon, dr.Lat, | 381 &validity, |
395 from, | 382 dr.Zeropoint, |
396 to, | 383 geodref, |
397 dr.Zeropoint, | 384 &dateInfo, |
398 geodref, | 385 source, |
399 &dateInfo, | 386 lu, |
400 source, | 387 ); err != nil { |
401 lu, | 388 key := pgxutils.ReadableError{Err: err}.Error() |
402 &validity, | 389 databaseErrors[key] = append(databaseErrors[key], isrs) |
403 ).Scan(&dummy) | 390 return continueErr |
404 switch { | 391 } |
405 case err2 == sql.ErrNoRows: | 392 //feedback.Info("insert new version") |
406 feedback.Info("unchanged") | 393 case !isNew: |
407 if err3 := tx.Rollback(); err3 != nil { | 394 var lu *time.Time |
408 return nil, err3 | 395 if dr.Lastupdate != nil { |
409 } | 396 t := dr.Lastupdate.ToGoTime() |
410 unchanged++ | 397 lu = &t |
411 continue | 398 } |
412 case err2 != nil: | 399 // try to update |
413 feedback.Error(pgxutils.ReadableError{Err: err2}.Error()) | 400 var dummy int |
414 if err3 := tx.Rollback(); err3 != nil { | 401 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, |
415 return nil, err3 | 402 code.CountryCode, |
416 } | 403 code.LoCode, |
417 unchanged++ | 404 code.FairwaySection, |
418 continue | 405 code.Orc, |
419 default: | 406 code.Hectometre, |
420 feedback.Info("update") | 407 dr.Objname.Loc, |
421 } | 408 dr.Lon, dr.Lat, |
422 | 409 from, |
423 // Remove obsolete reference water levels | 410 to, |
424 var currLevels pgtype.VarcharArray | 411 dr.Zeropoint, |
425 currLevels.Set([]string{ | 412 geodref, |
426 string(*dr.Reflevel1code), | 413 &dateInfo, |
427 string(*dr.Reflevel2code), | 414 source, |
428 string(*dr.Reflevel3code), | 415 lu, |
429 }) | 416 &validity, |
430 rwls, err := tx.StmtContext(ctx, | 417 ).Scan(&dummy) |
431 deleteReferenceWaterLevelsStmt).QueryContext(ctx, | 418 switch { |
419 case err2 == sql.ErrNoRows: | |
420 //feedback.Info("unchanged") | |
421 unchanged++ | |
422 return continueErr | |
423 case err2 != nil: | |
424 key := pgxutils.ReadableError{Err: err}.Error() | |
425 databaseErrors[key] = append(databaseErrors[key], isrs) | |
426 unchanged++ | |
427 return continueErr | |
428 default: | |
429 //feedback.Info("update") | |
430 } | |
431 | |
432 // Remove obsolete reference water levels | |
433 var currLevels pgtype.VarcharArray | |
434 currLevels.Set([]string{ | |
435 string(*dr.Reflevel1code), | |
436 string(*dr.Reflevel2code), | |
437 string(*dr.Reflevel3code), | |
438 }) | |
439 rwls, err := tx.StmtContext(ctx, | |
440 deleteReferenceWaterLevelsStmt).QueryContext(ctx, | |
441 code.String(), | |
442 &validity, | |
443 &currLevels, | |
444 ) | |
445 if err != nil { | |
446 return err | |
447 } | |
448 defer rwls.Close() | |
449 for rwls.Next() { | |
450 var delRef string | |
451 if err := rwls.Scan(&delRef); err != nil { | |
452 return err | |
453 } | |
454 feedback.Warn("Removed reference water level %s from %s", | |
455 delRef, code) | |
456 } | |
457 if err := rwls.Err(); err != nil { | |
458 return err | |
459 } | |
460 } | |
461 | |
462 // Set end of validity of old version to start of new version | |
463 // in case of overlap | |
464 if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext( | |
465 ctx, | |
432 code.String(), | 466 code.String(), |
433 &validity, | 467 &validity, |
434 &currLevels, | |
435 ) | |
436 if err != nil { | |
437 return nil, err | |
438 } | |
439 defer rwls.Close() | |
440 for rwls.Next() { | |
441 var delRef string | |
442 if err := rwls.Scan(&delRef); err != nil { | |
443 return nil, err | |
444 } | |
445 feedback.Warn("Removed reference water level %s from %s", | |
446 delRef, code) | |
447 } | |
448 if err := rwls.Err(); err != nil { | |
449 return nil, err | |
450 } | |
451 } | |
452 | |
453 // Set end of validity of old version to start of new version | |
454 // in case of overlap | |
455 if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext( | |
456 ctx, | |
457 code.String(), | |
458 &validity, | |
459 ); err != nil { | |
460 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) | |
461 if err2 := tx.Rollback(); err2 != nil { | |
462 return nil, err2 | |
463 } | |
464 unchanged++ | |
465 continue | |
466 } | |
467 | |
468 // "Upsert" reference water levels | |
469 for _, wl := range []struct { | |
470 level **erdms.RisreflevelcodeType | |
471 value **erdms.RisreflevelvalueType | |
472 }{ | |
473 {&dr.Reflevel1code, &dr.Reflevel1value}, | |
474 {&dr.Reflevel2code, &dr.Reflevel2value}, | |
475 {&dr.Reflevel3code, &dr.Reflevel3value}, | |
476 } { | |
477 if *wl.level == nil || *wl.value == nil { | |
478 continue | |
479 } | |
480 | |
481 var isNtSDepthRef bool | |
482 if err := tx.StmtContext( | |
483 ctx, isNtSDepthRefStmt).QueryRowContext(ctx, | |
484 string(**wl.level), | |
485 ).Scan( | |
486 &isNtSDepthRef, | |
487 ); err != nil { | 468 ); err != nil { |
488 return nil, err | 469 key := pgxutils.ReadableError{Err: err}.Error() |
489 } | 470 databaseErrors[key] = append(databaseErrors[key], isrs) |
490 if !isNtSDepthRef { | 471 unchanged++ |
491 feedback.Warn( | 472 return continueErr |
492 "Reference level code '%s' is not in line "+ | 473 } |
493 "with the NtS reference_code table", | 474 |
494 string(**wl.level)) | 475 // "Upsert" reference water levels |
495 } | 476 for _, wl := range []struct { |
496 | 477 level **erdms.RisreflevelcodeType |
497 if _, err := tx.StmtContext( | 478 value **erdms.RisreflevelvalueType |
498 ctx, insertWaterLevelStmt).ExecContext(ctx, | 479 }{ |
499 code.CountryCode, | 480 {&dr.Reflevel1code, &dr.Reflevel1value}, |
500 code.LoCode, | 481 {&dr.Reflevel2code, &dr.Reflevel2value}, |
501 code.FairwaySection, | 482 {&dr.Reflevel3code, &dr.Reflevel3value}, |
502 code.Orc, | 483 } { |
503 code.Hectometre, | 484 if *wl.level == nil || *wl.value == nil { |
504 &validity, | 485 continue |
505 string(**wl.level), | 486 } |
506 int64(**wl.value), | 487 |
507 ); err != nil { | 488 var isNtSDepthRef bool |
508 feedback.Error(pgxutils.ReadableError{Err: err}.Error()) | 489 if err := tx.StmtContext( |
509 tx.Rollback() | 490 ctx, isNtSDepthRefStmt).QueryRowContext(ctx, |
510 continue | 491 string(**wl.level), |
511 } | 492 ).Scan( |
512 } | 493 &isNtSDepthRef, |
513 | 494 ); err != nil { |
514 if err = tx.Commit(); err != nil { | 495 return err |
515 return nil, err | 496 } |
497 if !isNtSDepthRef { | |
498 feedback.Warn( | |
499 "Reference level code '%s' is not in line "+ | |
500 "with the NtS reference_code table", | |
501 string(**wl.level)) | |
502 } | |
503 | |
504 if _, err := tx.StmtContext( | |
505 ctx, insertWaterLevelStmt).ExecContext(ctx, | |
506 code.CountryCode, | |
507 code.LoCode, | |
508 code.FairwaySection, | |
509 code.Orc, | |
510 code.Hectometre, | |
511 &validity, | |
512 string(**wl.level), | |
513 int64(**wl.value), | |
514 ); err != nil { | |
515 key := pgxutils.ReadableError{Err: err}.Error() | |
516 databaseErrors[key] = append(databaseErrors[key], isrs) | |
517 continue | |
518 } | |
519 } | |
520 | |
521 return tx.Commit() | |
522 }() | |
523 | |
524 if err != nil && err != continueErr { | |
525 return err, nil | |
516 } | 526 } |
517 } | 527 } |
528 } | |
529 | |
530 if len(invalidISRS) > 0 { | |
531 feedback.Error("Invalid ISRS code: '%s'", strings.Join(invalidISRS, "', '")) | |
532 } | |
533 | |
534 if len(startEndOrder) > 0 { | |
535 feedback.Error("start date not before end date: %s", | |
536 strings.Join(startEndOrder, ", ")) | |
537 } | |
538 | |
539 if len(databaseErrors) > 0 { | |
540 for err, iris := range databaseErrors { | |
541 feedback.Error("%s: %s", err, strings.Join(iris, ", ")) | |
542 } | |
543 } | |
544 | |
545 if len(missingObjname) > 0 { | |
546 feedback.Error("Missing zeropoint: %s", strings.Join(missingZeropoint, ", ")) | |
547 } | |
548 if len(missingObjname) > 0 { | |
549 feedback.Error("Missing objname: %s", strings.Join(missingObjname, ", ")) | |
518 } | 550 } |
519 | 551 |
520 if len(gauges) == 0 { | 552 if len(gauges) == 0 { |
521 return nil, UnchangedError("No gauges returned from ERDMS") | 553 return nil, UnchangedError("No gauges returned from ERDMS") |
522 } | 554 } |