comparison pkg/imports/wg.go @ 3310:e0dabe7b2fcf

Simplify gauges import. Instead of checking some NOT NULL constraints in an extra loop, check all of them in the database.
author Tom Gottfried <tom@intevation.de>
date Fri, 17 May 2019 12:49:28 +0200
parents 5932f9574493
children 0f6b156cff55
comparison
equal deleted inserted replaced
3309:80037790032d 3310:e0dabe7b2fcf
170 "wtwgag") 170 "wtwgag")
171 if err != nil { 171 if err != nil {
172 return nil, err 172 return nil, err
173 } 173 }
174 174
175 var ignored int
176
177 type idxCode struct {
178 jdx int
179 idx int
180 code *models.Isrs
181 }
182
183 var gauges []idxCode
184
185 for j, data := range responseData {
186 for i, dr := range data.RisdataReturn {
187 if dr.RisidxCode == nil {
188 ignored++
189 continue
190 }
191 code, err := models.IsrsFromString(string(*dr.RisidxCode))
192 if err != nil {
193 feedback.Warn("invalid ISRS code %v", err)
194 ignored++
195 continue
196 }
197
198 if dr.Objname.Loc == nil {
199 feedback.Warn("missing objname: %s", code)
200 ignored++
201 continue
202 }
203
204 if dr.Lat == nil || dr.Lon == nil {
205 feedback.Warn("missing lat/lon: %s", code)
206 ignored++
207 continue
208 }
209
210 if dr.Zeropoint == nil {
211 feedback.Warn("missing zeropoint: %s", code)
212 ignored++
213 continue
214 }
215
216 gauges = append(gauges, idxCode{jdx: j, idx: i, code: code})
217 }
218 }
219 feedback.Info("Ignored gauges: %d", ignored)
220 feedback.Info("Further process %d gauges", len(gauges))
221
222 if len(gauges) == 0 {
223 return nil, UnchangedError("Nothing to do")
224 }
225
226 // insert/update the gauges
227 var eraseGaugeStmt, insertStmt, updateStmt, 175 var eraseGaugeStmt, insertStmt, updateStmt,
228 deleteReferenceWaterLevelsStmt, 176 deleteReferenceWaterLevelsStmt,
229 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt 177 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
230 for _, x := range []struct { 178 for _, x := range []struct {
231 sql string 179 sql string
243 return nil, err 191 return nil, err
244 } 192 }
245 defer (*x.stmt).Close() 193 defer (*x.stmt).Close()
246 } 194 }
247 195
196 var gauges []string
248 var unchanged int 197 var unchanged int
249 198
250 for i := range gauges { 199 for _, data := range responseData {
251 ic := &gauges[i] 200 for _, dr := range data.RisdataReturn {
252 dr := responseData[ic.jdx].RisdataReturn[ic.idx] 201
253 202 isrs := string(*dr.RisidxCode)
254 feedback.Info("Processing %s", ic.code) 203 code, err := models.IsrsFromString(isrs)
255 204 if err != nil {
256 var from, to sql.NullInt64 205 feedback.Warn("Invalid ISRS code '%s': %v", isrs, err)
257 206 continue
258 if dr.Applicabilityfromkm != nil { 207 }
259 from = sql.NullInt64{ 208 gauges = append(gauges, isrs)
260 Int64: int64(*dr.Applicabilityfromkm), 209 feedback.Info("Processing %s", code)
261 Valid: true, 210
262 } 211 var from, to sql.NullInt64
263 } 212
264 if dr.Applicabilitytokm != nil { 213 if dr.Applicabilityfromkm != nil {
265 to = sql.NullInt64{ 214 from = sql.NullInt64{
266 Int64: int64(*dr.Applicabilitytokm), 215 Int64: int64(*dr.Applicabilityfromkm),
267 Valid: true, 216 Valid: true,
268 } 217 }
269 } 218 }
270 219 if dr.Applicabilitytokm != nil {
271 var tfrom, tto, dateInfo pgtype.Timestamptz 220 to = sql.NullInt64{
272 221 Int64: int64(*dr.Applicabilitytokm),
273 if dr.Startdate != nil { 222 Valid: true,
274 tfrom = pgtype.Timestamptz{ 223 }
275 Time: time.Time(*dr.Startdate), 224 }
276 Status: pgtype.Present, 225
277 } 226 var tfrom, tto, dateInfo pgtype.Timestamptz
278 } else { 227
279 tfrom = pgtype.Timestamptz{ 228 if dr.Startdate != nil {
280 Status: pgtype.Null, 229 tfrom = pgtype.Timestamptz{
281 } 230 Time: time.Time(*dr.Startdate),
282 } 231 Status: pgtype.Present,
283 232 }
284 if dr.Enddate != nil { 233 } else {
285 tto = pgtype.Timestamptz{ 234 tfrom = pgtype.Timestamptz{
286 Time: time.Time(*dr.Enddate), 235 Status: pgtype.Null,
287 Status: pgtype.Present, 236 }
288 } 237 }
289 } else { 238
290 tto = pgtype.Timestamptz{ 239 if dr.Enddate != nil {
291 Status: pgtype.Null, 240 tto = pgtype.Timestamptz{
292 } 241 Time: time.Time(*dr.Enddate),
293 } 242 Status: pgtype.Present,
294 243 }
295 validity := pgtype.Tstzrange{ 244 } else {
296 Lower: tfrom, 245 tto = pgtype.Timestamptz{
297 Upper: tto, 246 Status: pgtype.Null,
298 LowerType: pgtype.Inclusive, 247 }
299 UpperType: pgtype.Exclusive, 248 }
300 Status: pgtype.Present, 249
301 } 250 validity := pgtype.Tstzrange{
302 251 Lower: tfrom,
303 if dr.Infodate != nil { 252 Upper: tto,
304 dateInfo = pgtype.Timestamptz{ 253 LowerType: pgtype.Inclusive,
305 Time: time.Time(*dr.Infodate), 254 UpperType: pgtype.Exclusive,
306 Status: pgtype.Present, 255 Status: pgtype.Present,
307 } 256 }
308 } else { 257
309 dateInfo = pgtype.Timestamptz{ 258 if dr.Infodate != nil {
310 Status: pgtype.Null, 259 dateInfo = pgtype.Timestamptz{
311 } 260 Time: time.Time(*dr.Infodate),
312 } 261 Status: pgtype.Present,
313 262 }
314 var geodref sql.NullString 263 } else {
315 if dr.Geodref != nil { 264 dateInfo = pgtype.Timestamptz{
316 geodref = sql.NullString{ 265 Status: pgtype.Null,
317 String: string(*dr.Geodref), 266 }
318 Valid: true, 267 }
319 } 268
320 } 269 var geodref sql.NullString
321 270 if dr.Geodref != nil {
322 var source sql.NullString 271 geodref = sql.NullString{
323 if dr.Source != nil { 272 String: string(*dr.Geodref),
324 source = sql.NullString{ 273 Valid: true,
325 String: string(*dr.Source), 274 }
326 Valid: true, 275 }
327 } 276
328 } 277 var source sql.NullString
329 278 if dr.Source != nil {
330 tx, err := conn.BeginTx(ctx, nil) 279 source = sql.NullString{
331 if err != nil { 280 String: string(*dr.Source),
332 return nil, err 281 Valid: true,
333 } 282 }
334 defer tx.Rollback() 283 }
335 284
336 // Mark old entries of gauge as erased, if applicable 285 tx, err := conn.BeginTx(ctx, nil)
337 if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx, 286 if err != nil {
338 ic.code.String(), 287 return nil, err
339 validity, 288 }
340 ); err != nil { 289 defer tx.Rollback()
341 feedback.Warn(handleError(err).Error()) 290
342 if err2 := tx.Rollback(); err2 != nil { 291 // Mark old entries of gauge as erased, if applicable
343 return nil, err2 292 if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx,
344 } 293 code.String(),
345 unchanged++ 294 validity,
346 continue 295 ); err != nil {
347 } 296 feedback.Warn(handleError(err).Error())
348 297 if err2 := tx.Rollback(); err2 != nil {
349 // Try to insert gauge entry 298 return nil, err2
350 var dummy int 299 }
351 err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx, 300 unchanged++
352 ic.code.CountryCode, 301 continue
353 ic.code.LoCode, 302 }
354 ic.code.FairwaySection, 303
355 ic.code.Orc, 304 // Try to insert gauge entry
356 ic.code.Hectometre, 305 var dummy int
357 string(*dr.Objname.Loc), 306 err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx,
358 float64(*dr.Lon), float64(*dr.Lat), 307 code.CountryCode,
359 from, 308 code.LoCode,
360 to, 309 code.FairwaySection,
361 &validity, 310 code.Orc,
362 float64(*dr.Zeropoint), 311 code.Hectometre,
363 geodref, 312 dr.Objname.Loc,
364 &dateInfo, 313 dr.Lon, dr.Lat,
365 source,
366 time.Time(*dr.Lastupdate),
367 ).Scan(&dummy)
368 switch {
369 case err == sql.ErrNoRows:
370 // Assume constraint conflict, try to update
371 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
372 ic.code.CountryCode,
373 ic.code.LoCode,
374 ic.code.FairwaySection,
375 ic.code.Orc,
376 ic.code.Hectometre,
377 string(*dr.Objname.Loc),
378 float64(*dr.Lon), float64(*dr.Lat),
379 from, 314 from,
380 to, 315 to,
381 float64(*dr.Zeropoint), 316 &validity,
317 dr.Zeropoint,
382 geodref, 318 geodref,
383 &dateInfo, 319 &dateInfo,
384 source, 320 source,
385 time.Time(*dr.Lastupdate), 321 time.Time(*dr.Lastupdate),
386 ).Scan(&dummy) 322 ).Scan(&dummy)
387 switch { 323 switch {
388 case err2 == sql.ErrNoRows: 324 case err == sql.ErrNoRows:
389 feedback.Info("unchanged") 325 // Assume constraint conflict, try to update
390 if err3 := tx.Rollback(); err3 != nil { 326 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
391 return nil, err3 327 code.CountryCode,
392 } 328 code.LoCode,
393 unchanged++ 329 code.FairwaySection,
394 continue 330 code.Orc,
395 case err2 != nil: 331 code.Hectometre,
396 feedback.Warn(handleError(err2).Error()) 332 dr.Objname.Loc,
397 if err3 := tx.Rollback(); err3 != nil { 333 dr.Lon, dr.Lat,
398 return nil, err3 334 from,
335 to,
336 dr.Zeropoint,
337 geodref,
338 &dateInfo,
339 source,
340 time.Time(*dr.Lastupdate),
341 ).Scan(&dummy)
342 switch {
343 case err2 == sql.ErrNoRows:
344 feedback.Info("unchanged")
345 if err3 := tx.Rollback(); err3 != nil {
346 return nil, err3
347 }
348 unchanged++
349 continue
350 case err2 != nil:
351 feedback.Warn(handleError(err2).Error())
352 if err3 := tx.Rollback(); err3 != nil {
353 return nil, err3
354 }
355 unchanged++
356 continue
357 default:
358 feedback.Info("update")
359 }
360
361 // Remove obsolete reference water levels
362 var currLevels pgtype.VarcharArray
363 currLevels.Set([]string{
364 string(*dr.Reflevel1code),
365 string(*dr.Reflevel2code),
366 string(*dr.Reflevel3code),
367 })
368 rwls, err := tx.StmtContext(ctx,
369 deleteReferenceWaterLevelsStmt).QueryContext(ctx,
370 code.String(),
371 &validity,
372 &currLevels,
373 )
374 if err != nil {
375 return nil, err
376 }
377 defer rwls.Close()
378 for rwls.Next() {
379 var delRef string
380 if err := rwls.Scan(&delRef); err != nil {
381 return nil, err
382 }
383 feedback.Warn("Removed reference water level %s from %s",
384 delRef, code)
385 }
386 if err := rwls.Err(); err != nil {
387 return nil, err
388 }
389 case err != nil:
390 feedback.Warn(handleError(err).Error())
391 if err2 := tx.Rollback(); err2 != nil {
392 return nil, err2
399 } 393 }
400 unchanged++ 394 unchanged++
401 continue 395 continue
402 default: 396 default:
403 feedback.Info("update") 397 feedback.Info("insert new version")
404 } 398 }
405 399
406 // Remove obsolete reference water levels 400 // "Upsert" reference water levels
407 var currLevels pgtype.VarcharArray 401 for _, wl := range []struct {
408 currLevels.Set([]string{ 402 level **erdms.RisreflevelcodeType
409 string(*dr.Reflevel1code), 403 value **erdms.RisreflevelvalueType
410 string(*dr.Reflevel2code), 404 }{
411 string(*dr.Reflevel3code), 405 {&dr.Reflevel1code, &dr.Reflevel1value},
412 }) 406 {&dr.Reflevel2code, &dr.Reflevel2value},
413 rwls, err := tx.StmtContext(ctx, 407 {&dr.Reflevel3code, &dr.Reflevel3value},
414 deleteReferenceWaterLevelsStmt).QueryContext(ctx, 408 } {
415 ic.code.String(), 409 if *wl.level == nil || *wl.value == nil {
416 &validity, 410 continue
417 &currLevels, 411 }
418 ) 412
419 if err != nil { 413 var isNtSDepthRef bool
414 if err := tx.StmtContext(
415 ctx, isNtSDepthRefStmt).QueryRowContext(ctx,
416 string(**wl.level),
417 ).Scan(
418 &isNtSDepthRef,
419 ); err != nil {
420 return nil, err
421 }
422 if !isNtSDepthRef {
423 feedback.Warn(
424 "Reference level code '%s' is not in line "+
425 "with the NtS reference_code table",
426 string(**wl.level))
427 }
428
429 if _, err := tx.StmtContext(
430 ctx, insertWaterLevelStmt).ExecContext(ctx,
431 code.CountryCode,
432 code.LoCode,
433 code.FairwaySection,
434 code.Orc,
435 code.Hectometre,
436 &validity,
437 string(**wl.level),
438 int64(**wl.value),
439 ); err != nil {
440 feedback.Warn(handleError(err).Error())
441 tx.Rollback()
442 continue
443 }
444 }
445
446 if err = tx.Commit(); err != nil {
420 return nil, err 447 return nil, err
421 } 448 }
422 defer rwls.Close()
423 for rwls.Next() {
424 var delRef string
425 if err := rwls.Scan(&delRef); err != nil {
426 return nil, err
427 }
428 feedback.Warn("Removed reference water level %s from %s",
429 delRef, ic.code)
430 }
431 if err := rwls.Err(); err != nil {
432 return nil, err
433 }
434 case err != nil:
435 feedback.Warn(handleError(err).Error())
436 if err2 := tx.Rollback(); err2 != nil {
437 return nil, err2
438 }
439 unchanged++
440 continue
441 default:
442 feedback.Info("insert new version")
443 }
444
445 // "Upsert" reference water levels
446 for _, wl := range []struct {
447 level **erdms.RisreflevelcodeType
448 value **erdms.RisreflevelvalueType
449 }{
450 {&dr.Reflevel1code, &dr.Reflevel1value},
451 {&dr.Reflevel2code, &dr.Reflevel2value},
452 {&dr.Reflevel3code, &dr.Reflevel3value},
453 } {
454 if *wl.level == nil || *wl.value == nil {
455 continue
456 }
457
458 var isNtSDepthRef bool
459 if err := tx.StmtContext(ctx, isNtSDepthRefStmt).QueryRowContext(
460 ctx,
461 string(**wl.level),
462 ).Scan(
463 &isNtSDepthRef,
464 ); err != nil {
465 return nil, err
466 }
467 if !isNtSDepthRef {
468 feedback.Warn(
469 "Reference level code '%s' is not in line "+
470 "with the NtS reference_code table",
471 string(**wl.level))
472 }
473
474 if _, err := tx.StmtContext(ctx, insertWaterLevelStmt).ExecContext(
475 ctx,
476 ic.code.CountryCode,
477 ic.code.LoCode,
478 ic.code.FairwaySection,
479 ic.code.Orc,
480 ic.code.Hectometre,
481 &validity,
482 string(**wl.level),
483 int64(**wl.value),
484 ); err != nil {
485 feedback.Warn(handleError(err).Error())
486 tx.Rollback()
487 continue
488 }
489 }
490
491 if err = tx.Commit(); err != nil {
492 return nil, err
493 } 449 }
494 } 450 }
495 451
496 feedback.Info("Importing gauges took %s", 452 feedback.Info("Importing gauges took %s",
497 time.Since(start)) 453 time.Since(start))
498 454
455 if len(gauges) == 0 {
456 return nil, UnchangedError("No gauges returned from ERDMS")
457 }
458
499 if unchanged == len(gauges) { 459 if unchanged == len(gauges) {
500 return nil, UnchangedError("All gauges unchanged") 460 return nil, UnchangedError("All gauges unchanged")
501 } 461 }
502 462
503 return nil, err 463 return nil, err