comparison pkg/imports/wg.go @ 3163:d9903cb34842

Handle failing INSERTs gracefully during gauges import. Using the special table EXCLUDED in INSERT statements makes functionally no difference, but makes editing of the statements easier. Since reference water levels are not deleted all at once before (re-)importing anymore, take the chance to report those that were deleted.
author Tom Gottfried <tom@intevation.de>
date Mon, 06 May 2019 13:25:49 +0200
parents eb1d119f253f
children 4acbee65275d
comparison
equal deleted inserted replaced
3162:659549608644 3163:d9903cb34842
15 package imports 15 package imports
16 16
17 import ( 17 import (
18 "context" 18 "context"
19 "database/sql" 19 "database/sql"
20 "errors"
21 "time" 20 "time"
22 21
23 "github.com/jackc/pgx/pgtype" 22 "github.com/jackc/pgx/pgtype"
24 23
25 "gemma.intevation.de/gemma/pkg/models" 24 "gemma.intevation.de/gemma/pkg/models"
64 63
65 // CleanUp does nothing as there is nothing to cleanup with gauges. 64 // CleanUp does nothing as there is nothing to cleanup with gauges.
66 func (*WaterwayGauge) CleanUp() error { return nil } 65 func (*WaterwayGauge) CleanUp() error { return nil }
67 66
68 const ( 67 const (
69 hasGaugeSQL = `
70 SELECT true
71 FROM waterway.gauges
72 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`
73
74 deleteReferenceWaterLevelsSQL = ` 68 deleteReferenceWaterLevelsSQL = `
75 DELETE FROM waterway.gauges_reference_water_levels 69 DELETE FROM waterway.gauges_reference_water_levels
76 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` 70 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
71 AND depth_reference <> ALL($6)
72 RETURNING depth_reference
73 `
77 74
78 insertGaugeSQL = ` 75 insertGaugeSQL = `
79 INSERT INTO waterway.gauges ( 76 INSERT INTO waterway.gauges (
80 location, 77 location,
81 objname, 78 objname,
97 $12, 94 $12,
98 $13, 95 $13,
99 $14, 96 $14,
100 $15 97 $15
101 ) ON CONFLICT (location) DO UPDATE SET 98 ) ON CONFLICT (location) DO UPDATE SET
102 objname = $6, 99 objname = EXCLUDED.objname,
103 geom = ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, 100 geom = EXCLUDED.geom,
104 applicability_from_km = $9, 101 applicability_from_km = EXCLUDED.applicability_from_km,
105 applicability_to_km = $10, 102 applicability_to_km = EXCLUDED.applicability_to_km,
106 validity = $11, 103 validity = EXCLUDED.validity,
107 zero_point = $12, 104 zero_point = EXCLUDED.zero_point,
108 geodref = $13, 105 geodref = EXCLUDED.geodref,
109 date_info = $14, 106 date_info = EXCLUDED.date_info,
110 source_organization = $15 107 source_organization = EXCLUDED.source_organization
111 ` 108 `
109
112 isNtSDepthRefSQL = ` 110 isNtSDepthRefSQL = `
113 SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` 111 SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)`
114 112
115 insertReferenceWaterLevelsSQL = ` 113 insertReferenceWaterLevelsSQL = `
116 INSERT INTO waterway.gauges_reference_water_levels ( 114 INSERT INTO waterway.gauges_reference_water_levels (
119 value 117 value
120 ) VALUES ( 118 ) VALUES (
121 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), 119 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
122 $6, 120 $6,
123 $7 121 $7
124 ) 122 ) ON CONFLICT (gauge_id, depth_reference) DO UPDATE SET
123 value = EXCLUDED.value
125 ` 124 `
126 ) 125 )
127 126
128 func (wg *WaterwayGauge) Do( 127 func (wg *WaterwayGauge) Do(
129 ctx context.Context, 128 ctx context.Context,
132 feedback Feedback, 131 feedback Feedback,
133 ) (interface{}, error) { 132 ) (interface{}, error) {
134 133
135 start := time.Now() 134 start := time.Now()
136 135
137 tx, err := conn.BeginTx(ctx, nil)
138 if err != nil {
139 return nil, err
140 }
141 defer tx.Rollback()
142
143 responseData, err := getRisData( 136 responseData, err := getRisData(
144 tx,
145 ctx, 137 ctx,
138 conn,
146 feedback, 139 feedback,
147 wg.Username, 140 wg.Username,
148 wg.Password, 141 wg.Password,
149 wg.URL, 142 wg.URL,
150 wg.Insecure, 143 wg.Insecure,
151 "wtwgag") 144 "wtwgag")
152 if err != nil { 145 if err != nil {
153 return nil, err 146 return nil, err
154 } 147 }
155 148
156 hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL)
157 if err != nil {
158 return nil, err
159 }
160 defer hasGaugeStmt.Close()
161
162 var ignored int 149 var ignored int
163 150
164 type idxCode struct { 151 type idxCode struct {
165 jdx int 152 jdx int
166 idx int 153 idx int
167 code *models.Isrs 154 code *models.Isrs
168 } 155 }
169 156
170 var news, olds []idxCode 157 var gauges []idxCode
171 158
172 for j, data := range responseData { 159 for j, data := range responseData {
173 for i, dr := range data.RisdataReturn { 160 for i, dr := range data.RisdataReturn {
174 if dr.RisidxCode == nil { 161 if dr.RisidxCode == nil {
175 ignored++ 162 ignored++
198 feedback.Warn("missing zeropoint: %s", code) 185 feedback.Warn("missing zeropoint: %s", code)
199 ignored++ 186 ignored++
200 continue 187 continue
201 } 188 }
202 189
203 var dummy bool 190 gauges = append(gauges, idxCode{jdx: j, idx: i, code: code})
204 err = hasGaugeStmt.QueryRowContext(ctx,
205 code.CountryCode,
206 code.LoCode,
207 code.FairwaySection,
208 code.Orc,
209 code.Hectometre,
210 ).Scan(&dummy)
211 switch {
212 case err == sql.ErrNoRows:
213 news = append(news, idxCode{jdx: j, idx: i, code: code})
214 case err != nil:
215 return nil, err
216 case !dummy:
217 return nil, errors.New("Unexpected result")
218 default:
219 olds = append(olds, idxCode{jdx: j, idx: i, code: code})
220 }
221 } 191 }
222 } 192 }
223 feedback.Info("ignored gauges: %d", ignored) 193 feedback.Info("ignored gauges: %d", ignored)
224 feedback.Info("new gauges: %d", len(news)) 194 feedback.Info("insert/update gauges: %d", len(gauges))
225 feedback.Info("update gauges: %d", len(olds)) 195
226 196 if len(gauges) == 0 {
227 if len(news) == 0 && len(olds) == 0 {
228 return nil, UnchangedError("nothing to do") 197 return nil, UnchangedError("nothing to do")
229 } 198 }
230 199
231 // delete reference water leves of the old. 200 // insert/update the gauges
232 if len(olds) > 0 { 201 var insertStmt, deleteReferenceWaterLevelsStmt,
233 deleteReferenceWaterLevelsStmt, err := tx.PrepareContext( 202 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
234 ctx, deleteReferenceWaterLevelsSQL) 203 for _, x := range []struct {
235 if err != nil { 204 sql string
205 stmt **sql.Stmt
206 }{
207 {insertGaugeSQL, &insertStmt},
208 {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
209 {isNtSDepthRefSQL, &isNtSDepthRefStmt},
210 {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
211 } {
212 var err error
213 if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
236 return nil, err 214 return nil, err
237 } 215 }
238 defer deleteReferenceWaterLevelsStmt.Close() 216 defer (*x.stmt).Close()
239 for i := range olds { 217 }
240 code := olds[i].code 218
241 if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx, 219 for i := range gauges {
242 code.CountryCode, 220 ic := &gauges[i]
243 code.LoCode,
244 code.FairwaySection,
245 code.Orc,
246 code.Hectometre,
247 ); err != nil {
248 return nil, err
249 }
250 }
251 // treat them as new
252 news = append(news, olds...)
253 }
254
255 insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL)
256 if err != nil {
257 return nil, err
258 }
259 defer insertStmt.Close()
260
261 insertWaterLevelStmt, err := tx.PrepareContext(
262 ctx, insertReferenceWaterLevelsSQL)
263 if err != nil {
264 return nil, err
265 }
266 defer insertWaterLevelStmt.Close()
267
268 isNtSDepthRefStmt, err := tx.PrepareContext(ctx, isNtSDepthRefSQL)
269 if err != nil {
270 return nil, err
271 }
272 defer isNtSDepthRefStmt.Close()
273
274 // insert/update the gauges
275 for i := range news {
276 ic := &news[i]
277 dr := responseData[ic.jdx].RisdataReturn[ic.idx] 221 dr := responseData[ic.jdx].RisdataReturn[ic.idx]
278 222
279 feedback.Info("insert/update %s", ic.code) 223 feedback.Info("insert/update %s", ic.code)
280 224
281 var from, to sql.NullInt64 225 var from, to sql.NullInt64
350 String: string(*dr.Source), 294 String: string(*dr.Source),
351 Valid: true, 295 Valid: true,
352 } 296 }
353 } 297 }
354 298
355 if _, err := insertStmt.ExecContext(ctx, 299 tx, err := conn.BeginTx(ctx, nil)
300 if err != nil {
301 return nil, err
302 }
303 defer tx.Rollback()
304
305 if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
356 ic.code.CountryCode, 306 ic.code.CountryCode,
357 ic.code.LoCode, 307 ic.code.LoCode,
358 ic.code.FairwaySection, 308 ic.code.FairwaySection,
359 ic.code.Orc, 309 ic.code.Orc,
360 ic.code.Hectometre, 310 ic.code.Hectometre,
366 float64(*dr.Zeropoint), 316 float64(*dr.Zeropoint),
367 geodref, 317 geodref,
368 &dateInfo, 318 &dateInfo,
369 source, 319 source,
370 ); err != nil { 320 ); err != nil {
321 feedback.Warn(handleError(err).Error())
322 tx.Rollback()
323 continue
324 }
325
326 // Remove obsolete reference water levels
327 var currLevels pgtype.VarcharArray
328 currLevels.Set([]string{
329 string(*dr.Reflevel1code),
330 string(*dr.Reflevel2code),
331 string(*dr.Reflevel3code),
332 })
333 var delRef string
334 err = tx.StmtContext(
335 ctx, deleteReferenceWaterLevelsStmt).QueryRowContext(ctx,
336 ic.code.CountryCode,
337 ic.code.LoCode,
338 ic.code.FairwaySection,
339 ic.code.Orc,
340 ic.code.Hectometre,
341 &currLevels,
342 ).Scan(&delRef)
343 switch {
344 case err == sql.ErrNoRows:
345 // There was nothing to delete
346 case err != nil:
371 return nil, err 347 return nil, err
372 } 348 default:
373 349 feedback.Info("Removed reference water level %s from %s",
350 delRef, ic.code)
351 }
352
353 // Insert/update reference water levels
374 for _, wl := range []struct { 354 for _, wl := range []struct {
375 level **erdms.RisreflevelcodeType 355 level **erdms.RisreflevelcodeType
376 value **erdms.RisreflevelvalueType 356 value **erdms.RisreflevelvalueType
377 }{ 357 }{
378 {&dr.Reflevel1code, &dr.Reflevel1value}, 358 {&dr.Reflevel1code, &dr.Reflevel1value},
382 if *wl.level == nil || *wl.value == nil { 362 if *wl.level == nil || *wl.value == nil {
383 continue 363 continue
384 } 364 }
385 365
386 var isNtSDepthRef bool 366 var isNtSDepthRef bool
387 if err := isNtSDepthRefStmt.QueryRowContext( 367 if err := tx.StmtContext(ctx, isNtSDepthRefStmt).QueryRowContext(
388 ctx, 368 ctx,
389 string(**wl.level), 369 string(**wl.level),
390 ).Scan( 370 ).Scan(
391 &isNtSDepthRef, 371 &isNtSDepthRef,
392 ); err != nil { 372 ); err != nil {
393 return nil, err 373 return nil, err
394 } 374 }
395 if !isNtSDepthRef { 375 if !isNtSDepthRef {
396 feedback.Warn( 376 feedback.Warn(
397 "Reference level code '%s' is not in line with the NtS reference_code table", 377 "Reference level code '%s' is not in line "+
378 "with the NtS reference_code table",
398 string(**wl.level)) 379 string(**wl.level))
399 } 380 }
400 381
401 if _, err := insertWaterLevelStmt.ExecContext( 382 if _, err := tx.StmtContext(ctx, insertWaterLevelStmt).ExecContext(
402 ctx, 383 ctx,
403 ic.code.CountryCode, 384 ic.code.CountryCode,
404 ic.code.LoCode, 385 ic.code.LoCode,
405 ic.code.FairwaySection, 386 ic.code.FairwaySection,
406 ic.code.Orc, 387 ic.code.Orc,
407 ic.code.Hectometre, 388 ic.code.Hectometre,
408 string(**wl.level), 389 string(**wl.level),
409 int64(**wl.value), 390 int64(**wl.value),
410 ); err != nil { 391 ); err != nil {
411 return nil, err 392 feedback.Warn(handleError(err).Error())
412 } 393 tx.Rollback()
413 } 394 continue
414 } 395 }
415 396 }
416 if err = tx.Commit(); err == nil { 397
417 feedback.Info("Refreshing gauges successfully took %s.", 398 if err = tx.Commit(); err != nil {
418 time.Since(start)) 399 return nil, err
419 } else { 400 }
420 feedback.Error("Refreshing gauges failed after %s.", 401 }
421 time.Since(start)) 402
422 } 403 feedback.Info("Refreshing gauges took %s.",
404 time.Since(start))
423 405
424 return nil, err 406 return nil, err
425 } 407 }