comparison pkg/imports/wg.go @ 1836:4dcdd8891770

Waterway gauges import: Fixed insert/update of gauges. TODO: Re-insert reference water levels.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 16 Jan 2019 22:39:24 +0100
parents f7b926440449
children 00d63eb9306a
comparison
equal deleted inserted replaced
1835:f7b926440449 1836:4dcdd8891770
84 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` 84 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
85 85
86 deleteReferenceWaterLevelsSQL = ` 86 deleteReferenceWaterLevelsSQL = `
87 DELETE FROM waterway.gauges_reference_water_levels 87 DELETE FROM waterway.gauges_reference_water_levels
88 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` 88 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
89
90 deleteGaugeSQL = `
91 DELETE FROM waterway.gauges
92 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
93 89
94 insertGaugeSQL = ` 90 insertGaugeSQL = `
95 INSERT INTO waterway.gauges ( 91 INSERT INTO waterway.gauges (
96 location, 92 location,
97 objname, 93 objname,
112 $11, 108 $11,
113 $12, 109 $12,
114 $13, 110 $13,
115 $14, 111 $14,
116 $15 112 $15
117 )` 113 ) ON CONFLICT (location) DO UPDATE SET
114 objname = $6,
115 geom = ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
116 applicability_from_km = $9,
117 applicability_to_km = $10,
118 validity = $11,
119 zero_point = $12,
120 geodref = $13,
121 date_info = $14,
122 source_organization = $15
123 `
118 ) 124 )
119 125
120 func (wg *WaterwayGauge) Do( 126 func (wg *WaterwayGauge) Do(
121 ctx context.Context, 127 ctx context.Context,
122 importID int64, 128 importID int64,
178 idx int 184 idx int
179 code *models.Isrs 185 code *models.Isrs
180 } 186 }
181 187
182 var news, olds []idxCode 188 var news, olds []idxCode
183
184 const layout = "2006-01-02T15:04:05.999-07:00"
185 189
186 for i, dr := range data.RisdataReturn { 190 for i, dr := range data.RisdataReturn {
187 if dr.RisidxCode == nil { 191 if dr.RisidxCode == nil {
188 ignored++ 192 ignored++
189 continue 193 continue
221 code.Orc, 225 code.Orc,
222 code.Hectometre, 226 code.Hectometre,
223 ).Scan(&dummy) 227 ).Scan(&dummy)
224 switch { 228 switch {
225 case err == sql.ErrNoRows: 229 case err == sql.ErrNoRows:
226 olds = append(olds, idxCode{idx: i, code: code}) 230 news = append(news, idxCode{idx: i, code: code})
227 case err != nil: 231 case err != nil:
228 return nil, err 232 return nil, err
229 case !dummy: 233 case !dummy:
230 return nil, errors.New("Unexpected result") 234 return nil, errors.New("Unexpected result")
231 default: 235 default:
232 news = append(news, idxCode{idx: i, code: code}) 236 olds = append(olds, idxCode{idx: i, code: code})
233 } 237 }
234 } 238 }
235 feedback.Info("ignored gauges: %d", ignored) 239 feedback.Info("ignored gauges: %d", ignored)
236 feedback.Info("new gauges: %d", len(news)) 240 feedback.Info("new gauges: %d", len(news))
237 feedback.Info("update gauges: %d", len(olds)) 241 feedback.Info("update gauges: %d", len(olds))
238 242
239 if len(news) == 0 && len(olds) == 0 { 243 if len(news) == 0 && len(olds) == 0 {
240 return nil, errors.New("nothing to do") 244 return nil, errors.New("nothing to do")
241 } 245 }
242 246
243 // delete the old 247 // delete reference water leves of the old.
244 if len(olds) > 0 { 248 if len(olds) > 0 {
245 deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(ctx, deleteReferenceWaterLevelsSQL) 249 deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(
250 ctx, deleteReferenceWaterLevelsSQL)
246 if err != nil { 251 if err != nil {
247 return nil, err 252 return nil, err
248 } 253 }
249 defer deleteReferenceWaterLevelsStmt.Close() 254 defer deleteReferenceWaterLevelsStmt.Close()
250 deleteGaugeStmt, err := tx.PrepareContext(ctx, deleteGaugeSQL)
251 if err != nil {
252 return nil, err
253 }
254 for i := range olds { 255 for i := range olds {
255 ic := &olds[i] 256 code := olds[i].code
256
257 if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx, 257 if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx,
258 ic.code.CountryCode, 258 code.CountryCode,
259 ic.code.LoCode, 259 code.LoCode,
260 ic.code.FairwaySection, 260 code.FairwaySection,
261 ic.code.Orc, 261 code.Orc,
262 ic.code.Hectometre, 262 code.Hectometre,
263 ); err != nil { 263 ); err != nil {
264 return nil, err 264 return nil, err
265 } 265 }
266 if _, err := deleteGaugeStmt.ExecContext(ctx,
267 ic.code.CountryCode,
268 ic.code.LoCode,
269 ic.code.FairwaySection,
270 ic.code.Orc,
271 ic.code.Hectometre,
272 ); err != nil {
273 return nil, err
274 }
275 } 266 }
276 // treat them as new 267 // treat them as new
277 news = append(news, olds...) 268 news = append(news, olds...)
278 } 269 }
279 270
280 if len(news) == 0 {
281 return nil, errors.New("nothing to do")
282 }
283
284 insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL) 271 insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL)
285 if err != nil { 272 if err != nil {
286 return nil, err 273 return nil, err
287 } 274 }
288 defer insertStmt.Close() 275 defer insertStmt.Close()
289 276
290 // (re)-insert the gauges 277 // insert/update the gauges
291 for i := range news { 278 for i := range news {
292 ic := &news[i] 279 ic := &news[i]
293 dr := data.RisdataReturn[ic.idx] 280 dr := data.RisdataReturn[ic.idx]
281
282 feedback.Info("insert/update %s", ic.code)
294 283
295 var from, to sql.NullInt64 284 var from, to sql.NullInt64
296 285
297 if dr.Applicabilityfromkm != nil { 286 if dr.Applicabilityfromkm != nil {
298 from = sql.NullInt64{ 287 from = sql.NullInt64{
308 } 297 }
309 298
310 var tfrom, tto, dateInfo pgtype.Timestamptz 299 var tfrom, tto, dateInfo pgtype.Timestamptz
311 300
312 if dr.Startdate != nil { 301 if dr.Startdate != nil {
313 tfrom = pgtype.Timestamptz{Time: time.Time(*dr.Startdate)} 302 tfrom = pgtype.Timestamptz{
303 Time: time.Time(*dr.Startdate),
304 Status: pgtype.Present,
305 }
314 } else { 306 } else {
315 tfrom = pgtype.Timestamptz{Status: pgtype.Null} 307 tfrom = pgtype.Timestamptz{
308 Status: pgtype.Null,
309 }
316 } 310 }
317 311
318 if dr.Enddate != nil { 312 if dr.Enddate != nil {
319 tto = pgtype.Timestamptz{Time: time.Time(*dr.Enddate)} 313 tto = pgtype.Timestamptz{
314 Time: time.Time(*dr.Enddate),
315 Status: pgtype.Present,
316 }
320 } else { 317 } else {
321 tto = pgtype.Timestamptz{Status: pgtype.Null} 318 tto = pgtype.Timestamptz{
319 Status: pgtype.Null,
320 }
322 } 321 }
323 322
324 validity := pgtype.Tstzrange{ 323 validity := pgtype.Tstzrange{
325 Lower: tfrom, 324 Lower: tfrom,
326 Upper: tto, 325 Upper: tto,
326 LowerType: pgtype.Inclusive,
327 UpperType: pgtype.Inclusive,
328 Status: pgtype.Present,
327 } 329 }
328 330
329 if dr.Infodate != nil { 331 if dr.Infodate != nil {
330 dateInfo = pgtype.Timestamptz{Time: time.Time(*dr.Infodate)} 332 dateInfo = pgtype.Timestamptz{
333 Time: time.Time(*dr.Infodate),
334 Status: pgtype.Present,
335 }
331 } else { 336 } else {
332 dateInfo = pgtype.Timestamptz{Status: pgtype.Null} 337 dateInfo = pgtype.Timestamptz{
338 Status: pgtype.Null,
339 }
333 } 340 }
334 341
335 var geodref sql.NullString 342 var geodref sql.NullString
336 if dr.Geodref != nil { 343 if dr.Geodref != nil {
337 geodref = sql.NullString{String: string(*dr.Geodref), Valid: true} 344 geodref = sql.NullString{
345 String: string(*dr.Geodref),
346 Valid: true,
347 }
338 } 348 }
339 349
340 var source sql.NullString 350 var source sql.NullString
341 if dr.Source != nil { 351 if dr.Source != nil {
342 source = sql.NullString{String: string(*dr.Source), Valid: true} 352 source = sql.NullString{
353 String: string(*dr.Source),
354 Valid: true,
355 }
343 } 356 }
344 357
345 if _, err := insertStmt.ExecContext(ctx, 358 if _, err := insertStmt.ExecContext(ctx,
346 ic.code.CountryCode, 359 ic.code.CountryCode,
347 ic.code.LoCode, 360 ic.code.LoCode,
350 ic.code.Hectometre, 363 ic.code.Hectometre,
351 string(*dr.Objname.Loc), 364 string(*dr.Objname.Loc),
352 int64(*dr.Lat), int64(*dr.Lon), 365 int64(*dr.Lat), int64(*dr.Lon),
353 from, 366 from,
354 to, 367 to,
355 validity, 368 &validity,
356 float64(*dr.Zeropoint), 369 float64(*dr.Zeropoint),
357 geodref, 370 geodref,
358 dateInfo, 371 &dateInfo,
359 source, 372 source,
360 ); err != nil { 373 ); err != nil {
361 return nil, err 374 return nil, err
362 } 375 }
363 376