Mercurial > gemma
comparison pkg/imports/fa.go @ 1755:d3fe20a13339
Import: Filter fairway availabilities by period and resolve insert confilcts.
author | Raimund Renkert <raimund.renkert@intevation.de> |
---|---|
date | Thu, 10 Jan 2019 16:43:42 +0100 |
parents | 807569b08513 |
children | 295c82c5bc3e |
comparison
equal
deleted
inserted
replaced
1754:807569b08513 | 1755:d3fe20a13339 |
---|---|
45 bottleneck_id, | 45 bottleneck_id, |
46 responsible_country | 46 responsible_country |
47 FROM waterway.bottlenecks | 47 FROM waterway.bottlenecks |
48 WHERE responsible_country = users.current_user_country() | 48 WHERE responsible_country = users.current_user_country() |
49 AND staging_done = true | 49 AND staging_done = true |
50 ` | |
51 latestMeasureDateSQL = ` | |
52 SELECT | |
53 measure_date | |
54 FROM waterway.effective_fairway_availability | |
55 ORDER BY measure_date DESC LIMIT 1 | |
56 ` | |
57 listFairwayAvailabilitySQL = ` | |
58 SELECT | |
59 fa.id, | |
60 bn.bottleneck_id, | |
61 fa.surdat | |
62 FROM waterway.fairway_availability fa | |
63 JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id | |
50 ` | 64 ` |
51 insertFASQL = ` | 65 insertFASQL = ` |
52 INSERT INTO waterway.fairway_availability ( | 66 INSERT INTO waterway.fairway_availability ( |
53 position_code, | 67 position_code, |
54 bottleneck_id, | 68 bottleneck_id, |
77 $1, | 91 $1, |
78 $2, | 92 $2, |
79 $3, | 93 $3, |
80 $4, | 94 $4, |
81 $5 | 95 $5 |
82 )` | 96 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING` |
83 insertEFASQL = ` | 97 insertEFASQL = ` |
84 INSERT INTO waterway.effective_fairway_availability ( | 98 INSERT INTO waterway.effective_fairway_availability ( |
85 fairway_availability_id, | 99 fairway_availability_id, |
86 measure_date, | 100 measure_date, |
87 level_of_service, | 101 level_of_service, |
104 $6, | 118 $6, |
105 $7, | 119 $7, |
106 $8, | 120 $8, |
107 $9, | 121 $9, |
108 $10 | 122 $10 |
109 )` | 123 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING` |
110 insertFAVSQL = ` | 124 insertFAVSQL = ` |
111 INSERT INTO waterway.fa_reference_values ( | 125 INSERT INTO waterway.fa_reference_values ( |
112 fairway_availability_id, | 126 fairway_availability_id, |
113 level_of_service, | 127 level_of_service, |
114 fairway_depth, | 128 fairway_depth, |
123 WHERE name = $2), | 137 WHERE name = $2), |
124 $3, | 138 $3, |
125 $4, | 139 $4, |
126 $5, | 140 $5, |
127 ST_MakePoint($6, $7)::geography | 141 ST_MakePoint($6, $7)::geography |
128 )` | 142 )ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING` |
129 ) | 143 ) |
130 | 144 |
131 type faJobCreator struct{} | 145 type faJobCreator struct{} |
132 | 146 |
133 func init() { | 147 func init() { |
195 ); err != nil { | 209 ); err != nil { |
196 return nil, err | 210 return nil, err |
197 } | 211 } |
198 bottlenecks = append(bottlenecks, bn) | 212 bottlenecks = append(bottlenecks, bn) |
199 } | 213 } |
200 | |
201 if err = rows.Err(); err != nil { | 214 if err = rows.Err(); err != nil { |
202 return nil, err | 215 return nil, err |
203 } | 216 } |
204 | 217 |
205 faids, err := fa.doForFAs(ctx, bottlenecks, conn, feedback) | 218 var faRows *sql.Rows |
219 faRows, err = conn.QueryContext(ctx, listFairwayAvailabilitySQL) | |
220 if err != nil { | |
221 return nil, err | |
222 } | |
223 fairwayAvailabilities := map[models.UniqueFairwayAvailability]int64{} | |
224 for faRows.Next() { | |
225 var id int64 | |
226 var bnId string | |
227 var sd time.Time | |
228 if err = faRows.Scan( | |
229 &id, | |
230 &bnId, | |
231 &sd, | |
232 ); err != nil { | |
233 return nil, err | |
234 } | |
235 key := models.UniqueFairwayAvailability{ | |
236 BottleneckId: bnId, | |
237 Surdat: sd, | |
238 } | |
239 fairwayAvailabilities[key] = id | |
240 } | |
241 if err = faRows.Err(); err != nil { | |
242 return nil, err | |
243 } | |
244 | |
245 latestMeasureDateRow := conn.QueryRowContext(ctx, latestMeasureDateSQL) | |
246 var latestDate pgtype.Timestamp | |
247 err = latestMeasureDateRow.Scan(&latestDate) | |
248 switch { | |
249 case err == sql.ErrNoRows: | |
250 latestDate = pgtype.Timestamp{ | |
251 // Fill Database with data of the last 5 days. Change this to a more useful value. | |
252 Time: time.Now().AddDate(0, 0, -5), | |
253 } | |
254 case err != nil: | |
255 return nil, err | |
256 } | |
257 | |
258 faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latestDate, conn, feedback) | |
206 if err != nil { | 259 if err != nil { |
207 feedback.Error("Error processing data: %s", err) | 260 feedback.Error("Error processing data: %s", err) |
208 } | 261 } |
209 if len(faids) == 0 { | 262 if len(faids) == 0 { |
210 feedback.Info("No new fairway availablity data found") | 263 feedback.Info("No new fairway availablity data found") |
211 return nil, nil | 264 return nil, nil |
212 } | 265 } |
213 feedback.Info("Processed %d of %d bottlenecks", len(faids), len(bottlenecks)) | 266 feedback.Info("Processed %d fairway availabilities", len(faids)) |
214 // TODO: needs to be filled more useful. | 267 // TODO: needs to be filled more useful. |
215 summary := struct { | 268 summary := struct { |
216 FairwayAvailabilities []string `json:"fairwayAvailabilities"` | 269 FairwayAvailabilities []string `json:"fairwayAvailabilities"` |
217 }{ | 270 }{ |
218 FairwayAvailabilities: faids, | 271 FairwayAvailabilities: faids, |
221 } | 274 } |
222 | 275 |
223 func (fa *FairwayAvailability) doForFAs( | 276 func (fa *FairwayAvailability) doForFAs( |
224 ctx context.Context, | 277 ctx context.Context, |
225 bottlenecks []models.Bottleneck, | 278 bottlenecks []models.Bottleneck, |
279 fairwayAvailabilities map[models.UniqueFairwayAvailability]int64, | |
280 latestDate pgtype.Timestamp, | |
226 conn *sql.Conn, | 281 conn *sql.Conn, |
227 feedback Feedback, | 282 feedback Feedback, |
228 ) ([]string, error) { | 283 ) ([]string, error) { |
229 start := time.Now() | 284 start := time.Now() |
230 | 285 |
232 | 287 |
233 var bnIds []string | 288 var bnIds []string |
234 for _, bn := range bottlenecks { | 289 for _, bn := range bottlenecks { |
235 bnIds = append(bnIds, bn.ID) | 290 bnIds = append(bnIds, bn.ID) |
236 } | 291 } |
292 var period ifaf.RequestedPeriod | |
293 period.Date_start = latestDate.Time | |
294 period.Date_end = time.Now() | |
237 | 295 |
238 ids := ifaf.ArrayOfString{ | 296 ids := ifaf.ArrayOfString{ |
239 String: bnIds, | 297 String: bnIds, |
240 } | 298 } |
241 | 299 |
242 // TODO: Filter by period. Period should start after latest measurement date. | |
243 req := &ifaf.Get_bottleneck_fa{ | 300 req := &ifaf.Get_bottleneck_fa{ |
244 Bottleneck_id: &ids, | 301 Bottleneck_id: &ids, |
302 Period: &period, | |
245 } | 303 } |
246 resp, err := client.Get_bottleneck_fa(req) | 304 resp, err := client.Get_bottleneck_fa(req) |
247 if err != nil { | 305 if err != nil { |
248 feedback.Error("%v", err) | 306 feedback.Error("%v", err) |
249 return nil, err | 307 return nil, err |
285 | 343 |
286 var faIDs []string | 344 var faIDs []string |
287 var faID int64 | 345 var faID int64 |
288 feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability)) | 346 feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability)) |
289 for _, faRes := range result.FairwayAvailability { | 347 for _, faRes := range result.FairwayAvailability { |
290 // TODO: high frequent requests lead to "duplicate key value violates unique constraint "fairway_availability_bottleneck_id_surdat_key" | 348 uniqueFa := models.UniqueFairwayAvailability{ |
291 // in the database. This has to be resolved. | 349 BottleneckId: faRes.Bottleneck_id, |
292 // All data subsets can also ocure as duplicates! | 350 Surdat: faRes.SURDAT, |
293 err = insertFAStmt.QueryRowContext( | 351 } |
294 ctx, | 352 if _, ok := fairwayAvailabilities[uniqueFa]; !ok { |
295 faRes.POSITION, | 353 err = insertFAStmt.QueryRowContext( |
296 faRes.Bottleneck_id, | |
297 faRes.SURDAT, | |
298 faRes.Critical, | |
299 faRes.Date_Info, | |
300 faRes.Source, | |
301 ).Scan(&faID) | |
302 if err != nil { | |
303 return nil, err | |
304 } | |
305 feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) | |
306 faIDs = append(faIDs, faRes.Bottleneck_id) | |
307 for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { | |
308 _, err = insertBnPdfsStmt.ExecContext( | |
309 ctx, | 354 ctx, |
310 faID, | 355 faRes.POSITION, |
311 bnPdfs.ProfilePdfFilename, | 356 faRes.Bottleneck_id, |
312 bnPdfs.ProfilePdfURL, | 357 faRes.SURDAT, |
313 bnPdfs.PDF_Generation_Date, | 358 faRes.Critical, |
314 bnPdfs.Source, | 359 faRes.Date_Info, |
315 ) | 360 faRes.Source, |
361 ).Scan(&faID) | |
316 if err != nil { | 362 if err != nil { |
317 return nil, err | 363 return nil, err |
318 } | 364 } |
319 feedback.Info("Add %d Pdfs", len(faRes.Bottleneck_PDFs.PdfInfo)) | 365 fairwayAvailabilities[uniqueFa] = faID |
320 } | 366 } else { |
321 for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { | 367 faID, _ = fairwayAvailabilities[uniqueFa] |
322 los := efa.Level_of_Service | 368 } |
323 fgt := efa.Forecast_generation_time | 369 feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) |
324 if efa.Forecast_generation_time.Status == pgtype.Undefined { | 370 faIDs = append(faIDs, faRes.Bottleneck_id) |
325 fgt = pgtype.Timestamp{ | 371 if faRes.Bottleneck_PDFs != nil { |
326 Status: pgtype.Null, | 372 bnPdfCount := 0 |
373 for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { | |
374 res, err := insertBnPdfsStmt.ExecContext( | |
375 ctx, | |
376 faID, | |
377 bnPdfs.ProfilePdfFilename, | |
378 bnPdfs.ProfilePdfURL, | |
379 bnPdfs.PDF_Generation_Date, | |
380 bnPdfs.Source, | |
381 ) | |
382 if err != nil { | |
383 return nil, err | |
384 } | |
385 affected, err := res.RowsAffected() | |
386 if err == nil { | |
387 bnPdfCount += int(affected) | |
388 } else { | |
389 bnPdfCount++ | |
327 } | 390 } |
328 } | 391 } |
329 _, err = insertEFAStmt.ExecContext( | 392 feedback.Info("Add %d Pdfs", bnPdfCount) |
330 ctx, | 393 } |
331 faID, | 394 if faRes.Effective_fairway_availability != nil { |
332 efa.Measure_date, | 395 efaCount := 0 |
333 string(*los), | 396 for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { |
334 efa.Available_depth_value, | 397 los := efa.Level_of_Service |
335 efa.Available_width_value, | 398 fgt := efa.Forecast_generation_time |
336 efa.Water_level_value, | 399 if efa.Forecast_generation_time.Status == pgtype.Undefined { |
337 efa.Measure_type, | 400 fgt = pgtype.Timestamp{ |
338 efa.Source, | 401 Status: pgtype.Null, |
339 fgt, | 402 } |
340 efa.Value_lifetime, | 403 } |
341 ) | 404 res, err := insertEFAStmt.ExecContext( |
342 if err != nil { | 405 ctx, |
343 return nil, err | 406 faID, |
407 efa.Measure_date, | |
408 string(*los), | |
409 efa.Available_depth_value, | |
410 efa.Available_width_value, | |
411 efa.Water_level_value, | |
412 efa.Measure_type, | |
413 efa.Source, | |
414 fgt.Get(), | |
415 efa.Value_lifetime, | |
416 ) | |
417 if err != nil { | |
418 return nil, err | |
419 } | |
420 affected, err := res.RowsAffected() | |
421 if err == nil { | |
422 efaCount += int(affected) | |
423 } else { | |
424 efaCount++ | |
425 } | |
344 } | 426 } |
345 feedback.Info("Add %d Effective Fairway Availability", len( | 427 feedback.Info("Add %d Effective Fairway Availability", efaCount) |
346 faRes.Effective_fairway_availability.EffectiveFairwayAvailability)) | 428 } |
347 } | 429 |
348 for _, fav := range faRes.Reference_values.ReferenceValue { | 430 if faRes.Reference_values != nil { |
349 _, err = insertFAVStmt.ExecContext( | 431 rvCount := 0 |
350 ctx, | 432 for _, fav := range faRes.Reference_values.ReferenceValue { |
351 faID, | 433 res, err := insertFAVStmt.ExecContext( |
352 fav.Level_of_Service, | 434 ctx, |
353 fav.Fairway_depth, | 435 faID, |
354 fav.Fairway_width, | 436 fav.Level_of_Service, |
355 fav.Fairway_radius, | 437 fav.Fairway_depth, |
356 fav.Shallowest_spot_Lat, | 438 fav.Fairway_width, |
357 fav.Shallowest_spot_Lon, | 439 fav.Fairway_radius, |
358 ) | 440 fav.Shallowest_spot_Lat, |
359 if err != nil { | 441 fav.Shallowest_spot_Lon, |
360 return nil, err | 442 ) |
443 if err != nil { | |
444 return nil, err | |
445 } | |
446 affected, err := res.RowsAffected() | |
447 if err == nil { | |
448 rvCount += int(affected) | |
449 } else { | |
450 rvCount++ | |
451 } | |
361 } | 452 } |
362 feedback.Info("Add %d Reference Values", | 453 feedback.Info("Add %d Reference Values", rvCount) |
363 len(faRes.Reference_values.ReferenceValue)) | |
364 } | 454 } |
365 } | 455 } |
366 feedback.Info("Storing fairway availabilities took %s", time.Since(start)) | 456 feedback.Info("Storing fairway availabilities took %s", time.Since(start)) |
367 if err = tx.Commit(); err == nil { | 457 if err = tx.Commit(); err == nil { |
368 feedback.Info("Import of fairway availabilities was successful") | 458 feedback.Info("Import of fairway availabilities was successful") |