Mercurial > gemma
comparison pkg/imports/wg.go @ 3163:d9903cb34842
Handle failing INSERTs gracefully during gauges import
Using the special table EXCLUDED in INSERT statements makes
functionally no difference, but makes editing of the statements easier.
Since reference water levels are not deleted all at once before
(re-)importing anymore, take the chance to report those that were
deleted.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Mon, 06 May 2019 13:25:49 +0200 |
parents | eb1d119f253f |
children | 4acbee65275d |
comparison
equal
deleted
inserted
replaced
3162:659549608644 | 3163:d9903cb34842 |
---|---|
15 package imports | 15 package imports |
16 | 16 |
17 import ( | 17 import ( |
18 "context" | 18 "context" |
19 "database/sql" | 19 "database/sql" |
20 "errors" | |
21 "time" | 20 "time" |
22 | 21 |
23 "github.com/jackc/pgx/pgtype" | 22 "github.com/jackc/pgx/pgtype" |
24 | 23 |
25 "gemma.intevation.de/gemma/pkg/models" | 24 "gemma.intevation.de/gemma/pkg/models" |
64 | 63 |
65 // CleanUp does nothing as there is nothing to cleanup with gauges. | 64 // CleanUp does nothing as there is nothing to cleanup with gauges. |
66 func (*WaterwayGauge) CleanUp() error { return nil } | 65 func (*WaterwayGauge) CleanUp() error { return nil } |
67 | 66 |
68 const ( | 67 const ( |
69 hasGaugeSQL = ` | |
70 SELECT true | |
71 FROM waterway.gauges | |
72 WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` | |
73 | |
74 deleteReferenceWaterLevelsSQL = ` | 68 deleteReferenceWaterLevelsSQL = ` |
75 DELETE FROM waterway.gauges_reference_water_levels | 69 DELETE FROM waterway.gauges_reference_water_levels |
76 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)` | 70 WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) |
71 AND depth_reference <> ALL($6) | |
72 RETURNING depth_reference | |
73 ` | |
77 | 74 |
78 insertGaugeSQL = ` | 75 insertGaugeSQL = ` |
79 INSERT INTO waterway.gauges ( | 76 INSERT INTO waterway.gauges ( |
80 location, | 77 location, |
81 objname, | 78 objname, |
97 $12, | 94 $12, |
98 $13, | 95 $13, |
99 $14, | 96 $14, |
100 $15 | 97 $15 |
101 ) ON CONFLICT (location) DO UPDATE SET | 98 ) ON CONFLICT (location) DO UPDATE SET |
102 objname = $6, | 99 objname = EXCLUDED.objname, |
103 geom = ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography, | 100 geom = EXCLUDED.geom, |
104 applicability_from_km = $9, | 101 applicability_from_km = EXCLUDED.applicability_from_km, |
105 applicability_to_km = $10, | 102 applicability_to_km = EXCLUDED.applicability_to_km, |
106 validity = $11, | 103 validity = EXCLUDED.validity, |
107 zero_point = $12, | 104 zero_point = EXCLUDED.zero_point, |
108 geodref = $13, | 105 geodref = EXCLUDED.geodref, |
109 date_info = $14, | 106 date_info = EXCLUDED.date_info, |
110 source_organization = $15 | 107 source_organization = EXCLUDED.source_organization |
111 ` | 108 ` |
109 | |
112 isNtSDepthRefSQL = ` | 110 isNtSDepthRefSQL = ` |
113 SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` | 111 SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)` |
114 | 112 |
115 insertReferenceWaterLevelsSQL = ` | 113 insertReferenceWaterLevelsSQL = ` |
116 INSERT INTO waterway.gauges_reference_water_levels ( | 114 INSERT INTO waterway.gauges_reference_water_levels ( |
119 value | 117 value |
120 ) VALUES ( | 118 ) VALUES ( |
121 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), | 119 ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), |
122 $6, | 120 $6, |
123 $7 | 121 $7 |
124 ) | 122 ) ON CONFLICT (gauge_id, depth_reference) DO UPDATE SET |
123 value = EXCLUDED.value | |
125 ` | 124 ` |
126 ) | 125 ) |
127 | 126 |
128 func (wg *WaterwayGauge) Do( | 127 func (wg *WaterwayGauge) Do( |
129 ctx context.Context, | 128 ctx context.Context, |
132 feedback Feedback, | 131 feedback Feedback, |
133 ) (interface{}, error) { | 132 ) (interface{}, error) { |
134 | 133 |
135 start := time.Now() | 134 start := time.Now() |
136 | 135 |
137 tx, err := conn.BeginTx(ctx, nil) | |
138 if err != nil { | |
139 return nil, err | |
140 } | |
141 defer tx.Rollback() | |
142 | |
143 responseData, err := getRisData( | 136 responseData, err := getRisData( |
144 tx, | |
145 ctx, | 137 ctx, |
138 conn, | |
146 feedback, | 139 feedback, |
147 wg.Username, | 140 wg.Username, |
148 wg.Password, | 141 wg.Password, |
149 wg.URL, | 142 wg.URL, |
150 wg.Insecure, | 143 wg.Insecure, |
151 "wtwgag") | 144 "wtwgag") |
152 if err != nil { | 145 if err != nil { |
153 return nil, err | 146 return nil, err |
154 } | 147 } |
155 | 148 |
156 hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL) | |
157 if err != nil { | |
158 return nil, err | |
159 } | |
160 defer hasGaugeStmt.Close() | |
161 | |
162 var ignored int | 149 var ignored int |
163 | 150 |
164 type idxCode struct { | 151 type idxCode struct { |
165 jdx int | 152 jdx int |
166 idx int | 153 idx int |
167 code *models.Isrs | 154 code *models.Isrs |
168 } | 155 } |
169 | 156 |
170 var news, olds []idxCode | 157 var gauges []idxCode |
171 | 158 |
172 for j, data := range responseData { | 159 for j, data := range responseData { |
173 for i, dr := range data.RisdataReturn { | 160 for i, dr := range data.RisdataReturn { |
174 if dr.RisidxCode == nil { | 161 if dr.RisidxCode == nil { |
175 ignored++ | 162 ignored++ |
198 feedback.Warn("missing zeropoint: %s", code) | 185 feedback.Warn("missing zeropoint: %s", code) |
199 ignored++ | 186 ignored++ |
200 continue | 187 continue |
201 } | 188 } |
202 | 189 |
203 var dummy bool | 190 gauges = append(gauges, idxCode{jdx: j, idx: i, code: code}) |
204 err = hasGaugeStmt.QueryRowContext(ctx, | |
205 code.CountryCode, | |
206 code.LoCode, | |
207 code.FairwaySection, | |
208 code.Orc, | |
209 code.Hectometre, | |
210 ).Scan(&dummy) | |
211 switch { | |
212 case err == sql.ErrNoRows: | |
213 news = append(news, idxCode{jdx: j, idx: i, code: code}) | |
214 case err != nil: | |
215 return nil, err | |
216 case !dummy: | |
217 return nil, errors.New("Unexpected result") | |
218 default: | |
219 olds = append(olds, idxCode{jdx: j, idx: i, code: code}) | |
220 } | |
221 } | 191 } |
222 } | 192 } |
223 feedback.Info("ignored gauges: %d", ignored) | 193 feedback.Info("ignored gauges: %d", ignored) |
224 feedback.Info("new gauges: %d", len(news)) | 194 feedback.Info("insert/update gauges: %d", len(gauges)) |
225 feedback.Info("update gauges: %d", len(olds)) | 195 |
226 | 196 if len(gauges) == 0 { |
227 if len(news) == 0 && len(olds) == 0 { | |
228 return nil, UnchangedError("nothing to do") | 197 return nil, UnchangedError("nothing to do") |
229 } | 198 } |
230 | 199 |
231 // delete reference water leves of the old. | 200 // insert/update the gauges |
232 if len(olds) > 0 { | 201 var insertStmt, deleteReferenceWaterLevelsStmt, |
233 deleteReferenceWaterLevelsStmt, err := tx.PrepareContext( | 202 isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt |
234 ctx, deleteReferenceWaterLevelsSQL) | 203 for _, x := range []struct { |
235 if err != nil { | 204 sql string |
205 stmt **sql.Stmt | |
206 }{ | |
207 {insertGaugeSQL, &insertStmt}, | |
208 {deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt}, | |
209 {isNtSDepthRefSQL, &isNtSDepthRefStmt}, | |
210 {insertReferenceWaterLevelsSQL, &insertWaterLevelStmt}, | |
211 } { | |
212 var err error | |
213 if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil { | |
236 return nil, err | 214 return nil, err |
237 } | 215 } |
238 defer deleteReferenceWaterLevelsStmt.Close() | 216 defer (*x.stmt).Close() |
239 for i := range olds { | 217 } |
240 code := olds[i].code | 218 |
241 if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx, | 219 for i := range gauges { |
242 code.CountryCode, | 220 ic := &gauges[i] |
243 code.LoCode, | |
244 code.FairwaySection, | |
245 code.Orc, | |
246 code.Hectometre, | |
247 ); err != nil { | |
248 return nil, err | |
249 } | |
250 } | |
251 // treat them as new | |
252 news = append(news, olds...) | |
253 } | |
254 | |
255 insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL) | |
256 if err != nil { | |
257 return nil, err | |
258 } | |
259 defer insertStmt.Close() | |
260 | |
261 insertWaterLevelStmt, err := tx.PrepareContext( | |
262 ctx, insertReferenceWaterLevelsSQL) | |
263 if err != nil { | |
264 return nil, err | |
265 } | |
266 defer insertWaterLevelStmt.Close() | |
267 | |
268 isNtSDepthRefStmt, err := tx.PrepareContext(ctx, isNtSDepthRefSQL) | |
269 if err != nil { | |
270 return nil, err | |
271 } | |
272 defer isNtSDepthRefStmt.Close() | |
273 | |
274 // insert/update the gauges | |
275 for i := range news { | |
276 ic := &news[i] | |
277 dr := responseData[ic.jdx].RisdataReturn[ic.idx] | 221 dr := responseData[ic.jdx].RisdataReturn[ic.idx] |
278 | 222 |
279 feedback.Info("insert/update %s", ic.code) | 223 feedback.Info("insert/update %s", ic.code) |
280 | 224 |
281 var from, to sql.NullInt64 | 225 var from, to sql.NullInt64 |
350 String: string(*dr.Source), | 294 String: string(*dr.Source), |
351 Valid: true, | 295 Valid: true, |
352 } | 296 } |
353 } | 297 } |
354 | 298 |
355 if _, err := insertStmt.ExecContext(ctx, | 299 tx, err := conn.BeginTx(ctx, nil) |
300 if err != nil { | |
301 return nil, err | |
302 } | |
303 defer tx.Rollback() | |
304 | |
305 if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx, | |
356 ic.code.CountryCode, | 306 ic.code.CountryCode, |
357 ic.code.LoCode, | 307 ic.code.LoCode, |
358 ic.code.FairwaySection, | 308 ic.code.FairwaySection, |
359 ic.code.Orc, | 309 ic.code.Orc, |
360 ic.code.Hectometre, | 310 ic.code.Hectometre, |
366 float64(*dr.Zeropoint), | 316 float64(*dr.Zeropoint), |
367 geodref, | 317 geodref, |
368 &dateInfo, | 318 &dateInfo, |
369 source, | 319 source, |
370 ); err != nil { | 320 ); err != nil { |
321 feedback.Warn(handleError(err).Error()) | |
322 tx.Rollback() | |
323 continue | |
324 } | |
325 | |
326 // Remove obsolete reference water levels | |
327 var currLevels pgtype.VarcharArray | |
328 currLevels.Set([]string{ | |
329 string(*dr.Reflevel1code), | |
330 string(*dr.Reflevel2code), | |
331 string(*dr.Reflevel3code), | |
332 }) | |
333 var delRef string | |
334 err = tx.StmtContext( | |
335 ctx, deleteReferenceWaterLevelsStmt).QueryRowContext(ctx, | |
336 ic.code.CountryCode, | |
337 ic.code.LoCode, | |
338 ic.code.FairwaySection, | |
339 ic.code.Orc, | |
340 ic.code.Hectometre, | |
341 &currLevels, | |
342 ).Scan(&delRef) | |
343 switch { | |
344 case err == sql.ErrNoRows: | |
345 // There was nothing to delete | |
346 case err != nil: | |
371 return nil, err | 347 return nil, err |
372 } | 348 default: |
373 | 349 feedback.Info("Removed reference water level %s from %s", |
350 delRef, ic.code) | |
351 } | |
352 | |
353 // Insert/update reference water levels | |
374 for _, wl := range []struct { | 354 for _, wl := range []struct { |
375 level **erdms.RisreflevelcodeType | 355 level **erdms.RisreflevelcodeType |
376 value **erdms.RisreflevelvalueType | 356 value **erdms.RisreflevelvalueType |
377 }{ | 357 }{ |
378 {&dr.Reflevel1code, &dr.Reflevel1value}, | 358 {&dr.Reflevel1code, &dr.Reflevel1value}, |
382 if *wl.level == nil || *wl.value == nil { | 362 if *wl.level == nil || *wl.value == nil { |
383 continue | 363 continue |
384 } | 364 } |
385 | 365 |
386 var isNtSDepthRef bool | 366 var isNtSDepthRef bool |
387 if err := isNtSDepthRefStmt.QueryRowContext( | 367 if err := tx.StmtContext(ctx, isNtSDepthRefStmt).QueryRowContext( |
388 ctx, | 368 ctx, |
389 string(**wl.level), | 369 string(**wl.level), |
390 ).Scan( | 370 ).Scan( |
391 &isNtSDepthRef, | 371 &isNtSDepthRef, |
392 ); err != nil { | 372 ); err != nil { |
393 return nil, err | 373 return nil, err |
394 } | 374 } |
395 if !isNtSDepthRef { | 375 if !isNtSDepthRef { |
396 feedback.Warn( | 376 feedback.Warn( |
397 "Reference level code '%s' is not in line with the NtS reference_code table", | 377 "Reference level code '%s' is not in line "+ |
378 "with the NtS reference_code table", | |
398 string(**wl.level)) | 379 string(**wl.level)) |
399 } | 380 } |
400 | 381 |
401 if _, err := insertWaterLevelStmt.ExecContext( | 382 if _, err := tx.StmtContext(ctx, insertWaterLevelStmt).ExecContext( |
402 ctx, | 383 ctx, |
403 ic.code.CountryCode, | 384 ic.code.CountryCode, |
404 ic.code.LoCode, | 385 ic.code.LoCode, |
405 ic.code.FairwaySection, | 386 ic.code.FairwaySection, |
406 ic.code.Orc, | 387 ic.code.Orc, |
407 ic.code.Hectometre, | 388 ic.code.Hectometre, |
408 string(**wl.level), | 389 string(**wl.level), |
409 int64(**wl.value), | 390 int64(**wl.value), |
410 ); err != nil { | 391 ); err != nil { |
411 return nil, err | 392 feedback.Warn(handleError(err).Error()) |
412 } | 393 tx.Rollback() |
413 } | 394 continue |
414 } | 395 } |
415 | 396 } |
416 if err = tx.Commit(); err == nil { | 397 |
417 feedback.Info("Refreshing gauges successfully took %s.", | 398 if err = tx.Commit(); err != nil { |
418 time.Since(start)) | 399 return nil, err |
419 } else { | 400 } |
420 feedback.Error("Refreshing gauges failed after %s.", | 401 } |
421 time.Since(start)) | 402 |
422 } | 403 feedback.Info("Refreshing gauges took %s.", |
404 time.Since(start)) | |
423 | 405 |
424 return nil, err | 406 return nil, err |
425 } | 407 } |