Mercurial > gemma
comparison pkg/imports/wg.go @ 3310:e0dabe7b2fcf
Simplify gauges import
Instead of checking some NOT NULL constraints in an extra loop,
check all of them in the database.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Fri, 17 May 2019 12:49:28 +0200 |
parents | 5932f9574493 |
children | 0f6b156cff55 |
comparison
equal
deleted
inserted
replaced
3309:80037790032d | 3310:e0dabe7b2fcf |
---|---|
170 "wtwgag") | 170 "wtwgag") |
171 if err != nil { | 171 if err != nil { |
172 return nil, err | 172 return nil, err |
173 } | 173 } |
174 | 174 |
175 var ignored int | |
176 | |
177 type idxCode struct { | |
178 jdx int | |
179 idx int | |
180 code *models.Isrs | |
181 } | |
182 | |
183 var gauges []idxCode | |
184 | |
185 for j, data := range responseData { | |
186 for i, dr := range data.RisdataReturn { | |
187 if dr.RisidxCode == nil { | |
188 ignored++ | |
189 continue | |
190 } | |
191 code, err := models.IsrsFromString(string(*dr.RisidxCode)) | |
192 if err != nil { | |
193 feedback.Warn("invalid ISRS code %v", err) | |
194 ignored++ | |
195 continue | |
196 } | |
197 | |
198 if dr.Objname.Loc == nil { | |
199 feedback.Warn("missing objname: %s", code) | |
200 ignored++ | |
201 continue | |
202 } | |
203 | |
204 if dr.Lat == nil || dr.Lon == nil { | |
205 feedback.Warn("missing lat/lon: %s", code) | |
206 ignored++ | |
207 continue | |
208 } | |
209 | |
210 if dr.Zeropoint == nil { | |
211 feedback.Warn("missing zeropoint: %s", code) | |
212 ignored++ | |
213 continue | |
214 } | |
215 | |
216 gauges = append(gauges, idxCode{jdx: j, idx: i, code: code}) | |
217 } | |
218 } | |
219 feedback.Info("Ignored gauges: %d", ignored) | |
220 feedback.Info("Further process %d gauges", len(gauges)) | |
221 | |
222 if len(gauges) == 0 { | |
223 return nil, UnchangedError("Nothing to do") | |
224 } | |
225 | |
226 // insert/update the gauges | |
227 var eraseGaugeStmt, insertStmt, updateStmt, | 175 var eraseGaugeStmt, insertStmt, updateStmt, |
228 deleteReferenceWaterLevelsStmt, | 176 deleteReferenceWaterLevelsStmt, |
229 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt | 177 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt |
230 for _, x := range []struct { | 178 for _, x := range []struct { |
231 sql string | 179 sql string |
243 return nil, err | 191 return nil, err |
244 } | 192 } |
245 defer (*x.stmt).Close() | 193 defer (*x.stmt).Close() |
246 } | 194 } |
247 | 195 |
196 var gauges []string | |
248 var unchanged int | 197 var unchanged int |
249 | 198 |
250 for i := range gauges { | 199 for _, data := range responseData { |
251 ic := &gauges[i] | 200 for _, dr := range data.RisdataReturn { |
252 dr := responseData[ic.jdx].RisdataReturn[ic.idx] | 201 |
253 | 202 isrs := string(*dr.RisidxCode) |
254 feedback.Info("Processing %s", ic.code) | 203 code, err := models.IsrsFromString(isrs) |
255 | 204 if err != nil { |
256 var from, to sql.NullInt64 | 205 feedback.Warn("Invalid ISRS code '%s': %v", isrs, err) |
257 | 206 continue |
258 if dr.Applicabilityfromkm != nil { | 207 } |
259 from = sql.NullInt64{ | 208 gauges = append(gauges, isrs) |
260 Int64: int64(*dr.Applicabilityfromkm), | 209 feedback.Info("Processing %s", code) |
261 Valid: true, | 210 |
262 } | 211 var from, to sql.NullInt64 |
263 } | 212 |
264 if dr.Applicabilitytokm != nil { | 213 if dr.Applicabilityfromkm != nil { |
265 to = sql.NullInt64{ | 214 from = sql.NullInt64{ |
266 Int64: int64(*dr.Applicabilitytokm), | 215 Int64: int64(*dr.Applicabilityfromkm), |
267 Valid: true, | 216 Valid: true, |
268 } | 217 } |
269 } | 218 } |
270 | 219 if dr.Applicabilitytokm != nil { |
271 var tfrom, tto, dateInfo pgtype.Timestamptz | 220 to = sql.NullInt64{ |
272 | 221 Int64: int64(*dr.Applicabilitytokm), |
273 if dr.Startdate != nil { | 222 Valid: true, |
274 tfrom = pgtype.Timestamptz{ | 223 } |
275 Time: time.Time(*dr.Startdate), | 224 } |
276 Status: pgtype.Present, | 225 |
277 } | 226 var tfrom, tto, dateInfo pgtype.Timestamptz |
278 } else { | 227 |
279 tfrom = pgtype.Timestamptz{ | 228 if dr.Startdate != nil { |
280 Status: pgtype.Null, | 229 tfrom = pgtype.Timestamptz{ |
281 } | 230 Time: time.Time(*dr.Startdate), |
282 } | 231 Status: pgtype.Present, |
283 | 232 } |
284 if dr.Enddate != nil { | 233 } else { |
285 tto = pgtype.Timestamptz{ | 234 tfrom = pgtype.Timestamptz{ |
286 Time: time.Time(*dr.Enddate), | 235 Status: pgtype.Null, |
287 Status: pgtype.Present, | 236 } |
288 } | 237 } |
289 } else { | 238 |
290 tto = pgtype.Timestamptz{ | 239 if dr.Enddate != nil { |
291 Status: pgtype.Null, | 240 tto = pgtype.Timestamptz{ |
292 } | 241 Time: time.Time(*dr.Enddate), |
293 } | 242 Status: pgtype.Present, |
294 | 243 } |
295 validity := pgtype.Tstzrange{ | 244 } else { |
296 Lower: tfrom, | 245 tto = pgtype.Timestamptz{ |
297 Upper: tto, | 246 Status: pgtype.Null, |
298 LowerType: pgtype.Inclusive, | 247 } |
299 UpperType: pgtype.Exclusive, | 248 } |
300 Status: pgtype.Present, | 249 |
301 } | 250 validity := pgtype.Tstzrange{ |
302 | 251 Lower: tfrom, |
303 if dr.Infodate != nil { | 252 Upper: tto, |
304 dateInfo = pgtype.Timestamptz{ | 253 LowerType: pgtype.Inclusive, |
305 Time: time.Time(*dr.Infodate), | 254 UpperType: pgtype.Exclusive, |
306 Status: pgtype.Present, | 255 Status: pgtype.Present, |
307 } | 256 } |
308 } else { | 257 |
309 dateInfo = pgtype.Timestamptz{ | 258 if dr.Infodate != nil { |
310 Status: pgtype.Null, | 259 dateInfo = pgtype.Timestamptz{ |
311 } | 260 Time: time.Time(*dr.Infodate), |
312 } | 261 Status: pgtype.Present, |
313 | 262 } |
314 var geodref sql.NullString | 263 } else { |
315 if dr.Geodref != nil { | 264 dateInfo = pgtype.Timestamptz{ |
316 geodref = sql.NullString{ | 265 Status: pgtype.Null, |
317 String: string(*dr.Geodref), | 266 } |
318 Valid: true, | 267 } |
319 } | 268 |
320 } | 269 var geodref sql.NullString |
321 | 270 if dr.Geodref != nil { |
322 var source sql.NullString | 271 geodref = sql.NullString{ |
323 if dr.Source != nil { | 272 String: string(*dr.Geodref), |
324 source = sql.NullString{ | 273 Valid: true, |
325 String: string(*dr.Source), | 274 } |
326 Valid: true, | 275 } |
327 } | 276 |
328 } | 277 var source sql.NullString |
329 | 278 if dr.Source != nil { |
330 tx, err := conn.BeginTx(ctx, nil) | 279 source = sql.NullString{ |
331 if err != nil { | 280 String: string(*dr.Source), |
332 return nil, err | 281 Valid: true, |
333 } | 282 } |
334 defer tx.Rollback() | 283 } |
335 | 284 |
336 // Mark old entries of gauge as erased, if applicable | 285 tx, err := conn.BeginTx(ctx, nil) |
337 if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx, | 286 if err != nil { |
338 ic.code.String(), | 287 return nil, err |
339 validity, | 288 } |
340 ); err != nil { | 289 defer tx.Rollback() |
341 feedback.Warn(handleError(err).Error()) | 290 |
342 if err2 := tx.Rollback(); err2 != nil { | 291 // Mark old entries of gauge as erased, if applicable |
343 return nil, err2 | 292 if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx, |
344 } | 293 code.String(), |
345 unchanged++ | 294 validity, |
346 continue | 295 ); err != nil { |
347 } | 296 feedback.Warn(handleError(err).Error()) |
348 | 297 if err2 := tx.Rollback(); err2 != nil { |
349 // Try to insert gauge entry | 298 return nil, err2 |
350 var dummy int | 299 } |
351 err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx, | 300 unchanged++ |
352 ic.code.CountryCode, | 301 continue |
353 ic.code.LoCode, | 302 } |
354 ic.code.FairwaySection, | 303 |
355 ic.code.Orc, | 304 // Try to insert gauge entry |
356 ic.code.Hectometre, | 305 var dummy int |
357 string(*dr.Objname.Loc), | 306 err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx, |
358 float64(*dr.Lon), float64(*dr.Lat), | 307 code.CountryCode, |
359 from, | 308 code.LoCode, |
360 to, | 309 code.FairwaySection, |
361 &validity, | 310 code.Orc, |
362 float64(*dr.Zeropoint), | 311 code.Hectometre, |
363 geodref, | 312 dr.Objname.Loc, |
364 &dateInfo, | 313 dr.Lon, dr.Lat, |
365 source, | |
366 time.Time(*dr.Lastupdate), | |
367 ).Scan(&dummy) | |
368 switch { | |
369 case err == sql.ErrNoRows: | |
370 // Assume constraint conflict, try to update | |
371 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, | |
372 ic.code.CountryCode, | |
373 ic.code.LoCode, | |
374 ic.code.FairwaySection, | |
375 ic.code.Orc, | |
376 ic.code.Hectometre, | |
377 string(*dr.Objname.Loc), | |
378 float64(*dr.Lon), float64(*dr.Lat), | |
379 from, | 314 from, |
380 to, | 315 to, |
381 float64(*dr.Zeropoint), | 316 &validity, |
317 dr.Zeropoint, | |
382 geodref, | 318 geodref, |
383 &dateInfo, | 319 &dateInfo, |
384 source, | 320 source, |
385 time.Time(*dr.Lastupdate), | 321 time.Time(*dr.Lastupdate), |
386 ).Scan(&dummy) | 322 ).Scan(&dummy) |
387 switch { | 323 switch { |
388 case err2 == sql.ErrNoRows: | 324 case err == sql.ErrNoRows: |
389 feedback.Info("unchanged") | 325 // Assume constraint conflict, try to update |
390 if err3 := tx.Rollback(); err3 != nil { | 326 err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx, |
391 return nil, err3 | 327 code.CountryCode, |
392 } | 328 code.LoCode, |
393 unchanged++ | 329 code.FairwaySection, |
394 continue | 330 code.Orc, |
395 case err2 != nil: | 331 code.Hectometre, |
396 feedback.Warn(handleError(err2).Error()) | 332 dr.Objname.Loc, |
397 if err3 := tx.Rollback(); err3 != nil { | 333 dr.Lon, dr.Lat, |
398 return nil, err3 | 334 from, |
335 to, | |
336 dr.Zeropoint, | |
337 geodref, | |
338 &dateInfo, | |
339 source, | |
340 time.Time(*dr.Lastupdate), | |
341 ).Scan(&dummy) | |
342 switch { | |
343 case err2 == sql.ErrNoRows: | |
344 feedback.Info("unchanged") | |
345 if err3 := tx.Rollback(); err3 != nil { | |
346 return nil, err3 | |
347 } | |
348 unchanged++ | |
349 continue | |
350 case err2 != nil: | |
351 feedback.Warn(handleError(err2).Error()) | |
352 if err3 := tx.Rollback(); err3 != nil { | |
353 return nil, err3 | |
354 } | |
355 unchanged++ | |
356 continue | |
357 default: | |
358 feedback.Info("update") | |
359 } | |
360 | |
361 // Remove obsolete reference water levels | |
362 var currLevels pgtype.VarcharArray | |
363 currLevels.Set([]string{ | |
364 string(*dr.Reflevel1code), | |
365 string(*dr.Reflevel2code), | |
366 string(*dr.Reflevel3code), | |
367 }) | |
368 rwls, err := tx.StmtContext(ctx, | |
369 deleteReferenceWaterLevelsStmt).QueryContext(ctx, | |
370 code.String(), | |
371 &validity, | |
372 &currLevels, | |
373 ) | |
374 if err != nil { | |
375 return nil, err | |
376 } | |
377 defer rwls.Close() | |
378 for rwls.Next() { | |
379 var delRef string | |
380 if err := rwls.Scan(&delRef); err != nil { | |
381 return nil, err | |
382 } | |
383 feedback.Warn("Removed reference water level %s from %s", | |
384 delRef, code) | |
385 } | |
386 if err := rwls.Err(); err != nil { | |
387 return nil, err | |
388 } | |
389 case err != nil: | |
390 feedback.Warn(handleError(err).Error()) | |
391 if err2 := tx.Rollback(); err2 != nil { | |
392 return nil, err2 | |
399 } | 393 } |
400 unchanged++ | 394 unchanged++ |
401 continue | 395 continue |
402 default: | 396 default: |
403 feedback.Info("update") | 397 feedback.Info("insert new version") |
404 } | 398 } |
405 | 399 |
406 // Remove obsolete reference water levels | 400 // "Upsert" reference water levels |
407 var currLevels pgtype.VarcharArray | 401 for _, wl := range []struct { |
408 currLevels.Set([]string{ | 402 level **erdms.RisreflevelcodeType |
409 string(*dr.Reflevel1code), | 403 value **erdms.RisreflevelvalueType |
410 string(*dr.Reflevel2code), | 404 }{ |
411 string(*dr.Reflevel3code), | 405 {&dr.Reflevel1code, &dr.Reflevel1value}, |
412 }) | 406 {&dr.Reflevel2code, &dr.Reflevel2value}, |
413 rwls, err := tx.StmtContext(ctx, | 407 {&dr.Reflevel3code, &dr.Reflevel3value}, |
414 deleteReferenceWaterLevelsStmt).QueryContext(ctx, | 408 } { |
415 ic.code.String(), | 409 if *wl.level == nil || *wl.value == nil { |
416 &validity, | 410 continue |
417 &currLevels, | 411 } |
418 ) | 412 |
419 if err != nil { | 413 var isNtSDepthRef bool |
414 if err := tx.StmtContext( | |
415 ctx, isNtSDepthRefStmt).QueryRowContext(ctx, | |
416 string(**wl.level), | |
417 ).Scan( | |
418 &isNtSDepthRef, | |
419 ); err != nil { | |
420 return nil, err | |
421 } | |
422 if !isNtSDepthRef { | |
423 feedback.Warn( | |
424 "Reference level code '%s' is not in line "+ | |
425 "with the NtS reference_code table", | |
426 string(**wl.level)) | |
427 } | |
428 | |
429 if _, err := tx.StmtContext( | |
430 ctx, insertWaterLevelStmt).ExecContext(ctx, | |
431 code.CountryCode, | |
432 code.LoCode, | |
433 code.FairwaySection, | |
434 code.Orc, | |
435 code.Hectometre, | |
436 &validity, | |
437 string(**wl.level), | |
438 int64(**wl.value), | |
439 ); err != nil { | |
440 feedback.Warn(handleError(err).Error()) | |
441 tx.Rollback() | |
442 continue | |
443 } | |
444 } | |
445 | |
446 if err = tx.Commit(); err != nil { | |
420 return nil, err | 447 return nil, err |
421 } | 448 } |
422 defer rwls.Close() | |
423 for rwls.Next() { | |
424 var delRef string | |
425 if err := rwls.Scan(&delRef); err != nil { | |
426 return nil, err | |
427 } | |
428 feedback.Warn("Removed reference water level %s from %s", | |
429 delRef, ic.code) | |
430 } | |
431 if err := rwls.Err(); err != nil { | |
432 return nil, err | |
433 } | |
434 case err != nil: | |
435 feedback.Warn(handleError(err).Error()) | |
436 if err2 := tx.Rollback(); err2 != nil { | |
437 return nil, err2 | |
438 } | |
439 unchanged++ | |
440 continue | |
441 default: | |
442 feedback.Info("insert new version") | |
443 } | |
444 | |
445 // "Upsert" reference water levels | |
446 for _, wl := range []struct { | |
447 level **erdms.RisreflevelcodeType | |
448 value **erdms.RisreflevelvalueType | |
449 }{ | |
450 {&dr.Reflevel1code, &dr.Reflevel1value}, | |
451 {&dr.Reflevel2code, &dr.Reflevel2value}, | |
452 {&dr.Reflevel3code, &dr.Reflevel3value}, | |
453 } { | |
454 if *wl.level == nil || *wl.value == nil { | |
455 continue | |
456 } | |
457 | |
458 var isNtSDepthRef bool | |
459 if err := tx.StmtContext(ctx, isNtSDepthRefStmt).QueryRowContext( | |
460 ctx, | |
461 string(**wl.level), | |
462 ).Scan( | |
463 &isNtSDepthRef, | |
464 ); err != nil { | |
465 return nil, err | |
466 } | |
467 if !isNtSDepthRef { | |
468 feedback.Warn( | |
469 "Reference level code '%s' is not in line "+ | |
470 "with the NtS reference_code table", | |
471 string(**wl.level)) | |
472 } | |
473 | |
474 if _, err := tx.StmtContext(ctx, insertWaterLevelStmt).ExecContext( | |
475 ctx, | |
476 ic.code.CountryCode, | |
477 ic.code.LoCode, | |
478 ic.code.FairwaySection, | |
479 ic.code.Orc, | |
480 ic.code.Hectometre, | |
481 &validity, | |
482 string(**wl.level), | |
483 int64(**wl.value), | |
484 ); err != nil { | |
485 feedback.Warn(handleError(err).Error()) | |
486 tx.Rollback() | |
487 continue | |
488 } | |
489 } | |
490 | |
491 if err = tx.Commit(); err != nil { | |
492 return nil, err | |
493 } | 449 } |
494 } | 450 } |
495 | 451 |
496 feedback.Info("Importing gauges took %s", | 452 feedback.Info("Importing gauges took %s", |
497 time.Since(start)) | 453 time.Since(start)) |
498 | 454 |
455 if len(gauges) == 0 { | |
456 return nil, UnchangedError("No gauges returned from ERDMS") | |
457 } | |
458 | |
499 if unchanged == len(gauges) { | 459 if unchanged == len(gauges) { |
500 return nil, UnchangedError("All gauges unchanged") | 460 return nil, UnchangedError("All gauges unchanged") |
501 } | 461 } |
502 | 462 |
503 return nil, err | 463 return nil, err |