Mercurial > gemma
comparison pkg/imports/wg.go @ 1836:4dcdd8891770
Waterway gauges import: Fixed insert/update of gauges. TODO: Re-insert reference water levels.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 16 Jan 2019 22:39:24 +0100 |
parents | f7b926440449 |
children | 00d63eb9306a |
comparison
equal
deleted
inserted
replaced
1835:f7b926440449 | 1836:4dcdd8891770 |
---|---|
84 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` | 84 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` |
85 | 85 |
86 deleteReferenceWaterLevelsSQL = ` | 86 deleteReferenceWaterLevelsSQL = ` |
87 DELETE FROM waterway.gauges_reference_water_levels | 87 DELETE FROM waterway.gauges_reference_water_levels |
88 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` | 88 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` |
89 | |
90 deleteGaugeSQL = ` | |
91 DELETE FROM waterway.gauges | |
92 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` | |
93 | 89 |
94 insertGaugeSQL = ` | 90 insertGaugeSQL = ` |
95 INSERT INTO waterway.gauges ( | 91 INSERT INTO waterway.gauges ( |
96 location, | 92 location, |
97 objname, | 93 objname, |
112 $11, | 108 $11, |
113 $12, | 109 $12, |
114 $13, | 110 $13, |
115 $14, | 111 $14, |
116 $15 | 112 $15 |
117 )` | 113 ) ON CONFLICT (location) DO UPDATE SET |
114 objname = $6, | |
115 geom = ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, | |
116 applicability_from_km = $9, | |
117 applicability_to_km = $10, | |
118 validity = $11, | |
119 zero_point = $12, | |
120 geodref = $13, | |
121 date_info = $14, | |
122 source_organization = $15 | |
123 ` | |
118 ) | 124 ) |
119 | 125 |
120 func (wg *WaterwayGauge) Do( | 126 func (wg *WaterwayGauge) Do( |
121 ctx context.Context, | 127 ctx context.Context, |
122 importID int64, | 128 importID int64, |
178 idx int | 184 idx int |
179 code *models.Isrs | 185 code *models.Isrs |
180 } | 186 } |
181 | 187 |
182 var news, olds []idxCode | 188 var news, olds []idxCode |
183 | |
184 const layout = "2006-01-02T15:04:05.999-07:00" | |
185 | 189 |
186 for i, dr := range data.RisdataReturn { | 190 for i, dr := range data.RisdataReturn { |
187 if dr.RisidxCode == nil { | 191 if dr.RisidxCode == nil { |
188 ignored++ | 192 ignored++ |
189 continue | 193 continue |
221 code.Orc, | 225 code.Orc, |
222 code.Hectometre, | 226 code.Hectometre, |
223 ).Scan(&dummy) | 227 ).Scan(&dummy) |
224 switch { | 228 switch { |
225 case err == sql.ErrNoRows: | 229 case err == sql.ErrNoRows: |
226 olds = append(olds, idxCode{idx: i, code: code}) | 230 news = append(news, idxCode{idx: i, code: code}) |
227 case err != nil: | 231 case err != nil: |
228 return nil, err | 232 return nil, err |
229 case !dummy: | 233 case !dummy: |
230 return nil, errors.New("Unexpected result") | 234 return nil, errors.New("Unexpected result") |
231 default: | 235 default: |
232 news = append(news, idxCode{idx: i, code: code}) | 236 olds = append(olds, idxCode{idx: i, code: code}) |
233 } | 237 } |
234 } | 238 } |
235 feedback.Info("ignored gauges: %d", ignored) | 239 feedback.Info("ignored gauges: %d", ignored) |
236 feedback.Info("new gauges: %d", len(news)) | 240 feedback.Info("new gauges: %d", len(news)) |
237 feedback.Info("update gauges: %d", len(olds)) | 241 feedback.Info("update gauges: %d", len(olds)) |
238 | 242 |
239 if len(news) == 0 && len(olds) == 0 { | 243 if len(news) == 0 && len(olds) == 0 { |
240 return nil, errors.New("nothing to do") | 244 return nil, errors.New("nothing to do") |
241 } | 245 } |
242 | 246 |
243 // delete the old | 247 // delete reference water leves of the old. |
244 if len(olds) > 0 { | 248 if len(olds) > 0 { |
245 deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(ctx, deleteReferenceWaterLevelsSQL) | 249 deleteReferenceWaterLevelsStmt, err := tx.PrepareContext( |
250 ctx, deleteReferenceWaterLevelsSQL) | |
246 if err != nil { | 251 if err != nil { |
247 return nil, err | 252 return nil, err |
248 } | 253 } |
249 defer deleteReferenceWaterLevelsStmt.Close() | 254 defer deleteReferenceWaterLevelsStmt.Close() |
250 deleteGaugeStmt, err := tx.PrepareContext(ctx, deleteGaugeSQL) | |
251 if err != nil { | |
252 return nil, err | |
253 } | |
254 for i := range olds { | 255 for i := range olds { |
255 ic := &olds[i] | 256 code := olds[i].code |
256 | |
257 if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx, | 257 if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx, |
258 ic.code.CountryCode, | 258 code.CountryCode, |
259 ic.code.LoCode, | 259 code.LoCode, |
260 ic.code.FairwaySection, | 260 code.FairwaySection, |
261 ic.code.Orc, | 261 code.Orc, |
262 ic.code.Hectometre, | 262 code.Hectometre, |
263 ); err != nil { | 263 ); err != nil { |
264 return nil, err | 264 return nil, err |
265 } | 265 } |
266 if _, err := deleteGaugeStmt.ExecContext(ctx, | |
267 ic.code.CountryCode, | |
268 ic.code.LoCode, | |
269 ic.code.FairwaySection, | |
270 ic.code.Orc, | |
271 ic.code.Hectometre, | |
272 ); err != nil { | |
273 return nil, err | |
274 } | |
275 } | 266 } |
276 // treat them as new | 267 // treat them as new |
277 news = append(news, olds...) | 268 news = append(news, olds...) |
278 } | 269 } |
279 | 270 |
280 if len(news) == 0 { | |
281 return nil, errors.New("nothing to do") | |
282 } | |
283 | |
284 insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL) | 271 insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL) |
285 if err != nil { | 272 if err != nil { |
286 return nil, err | 273 return nil, err |
287 } | 274 } |
288 defer insertStmt.Close() | 275 defer insertStmt.Close() |
289 | 276 |
290 // (re)-insert the gauges | 277 // insert/update the gauges |
291 for i := range news { | 278 for i := range news { |
292 ic := &news[i] | 279 ic := &news[i] |
293 dr := data.RisdataReturn[ic.idx] | 280 dr := data.RisdataReturn[ic.idx] |
281 | |
282 feedback.Info("insert/update %s", ic.code) | |
294 | 283 |
295 var from, to sql.NullInt64 | 284 var from, to sql.NullInt64 |
296 | 285 |
297 if dr.Applicabilityfromkm != nil { | 286 if dr.Applicabilityfromkm != nil { |
298 from = sql.NullInt64{ | 287 from = sql.NullInt64{ |
308 } | 297 } |
309 | 298 |
310 var tfrom, tto, dateInfo pgtype.Timestamptz | 299 var tfrom, tto, dateInfo pgtype.Timestamptz |
311 | 300 |
312 if dr.Startdate != nil { | 301 if dr.Startdate != nil { |
313 tfrom = pgtype.Timestamptz{Time: time.Time(*dr.Startdate)} | 302 tfrom = pgtype.Timestamptz{ |
303 Time: time.Time(*dr.Startdate), | |
304 Status: pgtype.Present, | |
305 } | |
314 } else { | 306 } else { |
315 tfrom = pgtype.Timestamptz{Status: pgtype.Null} | 307 tfrom = pgtype.Timestamptz{ |
308 Status: pgtype.Null, | |
309 } | |
316 } | 310 } |
317 | 311 |
318 if dr.Enddate != nil { | 312 if dr.Enddate != nil { |
319 tto = pgtype.Timestamptz{Time: time.Time(*dr.Enddate)} | 313 tto = pgtype.Timestamptz{ |
314 Time: time.Time(*dr.Enddate), | |
315 Status: pgtype.Present, | |
316 } | |
320 } else { | 317 } else { |
321 tto = pgtype.Timestamptz{Status: pgtype.Null} | 318 tto = pgtype.Timestamptz{ |
319 Status: pgtype.Null, | |
320 } | |
322 } | 321 } |
323 | 322 |
324 validity := pgtype.Tstzrange{ | 323 validity := pgtype.Tstzrange{ |
325 Lower: tfrom, | 324 Lower: tfrom, |
326 Upper: tto, | 325 Upper: tto, |
326 LowerType: pgtype.Inclusive, | |
327 UpperType: pgtype.Inclusive, | |
328 Status: pgtype.Present, | |
327 } | 329 } |
328 | 330 |
329 if dr.Infodate != nil { | 331 if dr.Infodate != nil { |
330 dateInfo = pgtype.Timestamptz{Time: time.Time(*dr.Infodate)} | 332 dateInfo = pgtype.Timestamptz{ |
333 Time: time.Time(*dr.Infodate), | |
334 Status: pgtype.Present, | |
335 } | |
331 } else { | 336 } else { |
332 dateInfo = pgtype.Timestamptz{Status: pgtype.Null} | 337 dateInfo = pgtype.Timestamptz{ |
338 Status: pgtype.Null, | |
339 } | |
333 } | 340 } |
334 | 341 |
335 var geodref sql.NullString | 342 var geodref sql.NullString |
336 if dr.Geodref != nil { | 343 if dr.Geodref != nil { |
337 geodref = sql.NullString{String: string(*dr.Geodref), Valid: true} | 344 geodref = sql.NullString{ |
345 String: string(*dr.Geodref), | |
346 Valid: true, | |
347 } | |
338 } | 348 } |
339 | 349 |
340 var source sql.NullString | 350 var source sql.NullString |
341 if dr.Source != nil { | 351 if dr.Source != nil { |
342 source = sql.NullString{String: string(*dr.Source), Valid: true} | 352 source = sql.NullString{ |
353 String: string(*dr.Source), | |
354 Valid: true, | |
355 } | |
343 } | 356 } |
344 | 357 |
345 if _, err := insertStmt.ExecContext(ctx, | 358 if _, err := insertStmt.ExecContext(ctx, |
346 ic.code.CountryCode, | 359 ic.code.CountryCode, |
347 ic.code.LoCode, | 360 ic.code.LoCode, |
350 ic.code.Hectometre, | 363 ic.code.Hectometre, |
351 string(*dr.Objname.Loc), | 364 string(*dr.Objname.Loc), |
352 int64(*dr.Lat), int64(*dr.Lon), | 365 int64(*dr.Lat), int64(*dr.Lon), |
353 from, | 366 from, |
354 to, | 367 to, |
355 validity, | 368 &validity, |
356 float64(*dr.Zeropoint), | 369 float64(*dr.Zeropoint), |
357 geodref, | 370 geodref, |
358 dateInfo, | 371 &dateInfo, |
359 source, | 372 source, |
360 ); err != nil { | 373 ); err != nil { |
361 return nil, err | 374 return nil, err |
362 } | 375 } |
363 | 376 |