comparison pkg/imports/fa.go @ 1755:d3fe20a13339

Import: Filter fairway availabilities by period and resolve insert conflicts.
author Raimund Renkert <raimund.renkert@intevation.de>
date Thu, 10 Jan 2019 16:43:42 +0100
parents 807569b08513
children 295c82c5bc3e
comparison
equal deleted inserted replaced
1754:807569b08513 1755:d3fe20a13339
45 bottleneck_id, 45 bottleneck_id,
46 responsible_country 46 responsible_country
47 FROM waterway.bottlenecks 47 FROM waterway.bottlenecks
48 WHERE responsible_country = users.current_user_country() 48 WHERE responsible_country = users.current_user_country()
49 AND staging_done = true 49 AND staging_done = true
50 `
51 latestMeasureDateSQL = `
52 SELECT
53 measure_date
54 FROM waterway.effective_fairway_availability
55 ORDER BY measure_date DESC LIMIT 1
56 `
57 listFairwayAvailabilitySQL = `
58 SELECT
59 fa.id,
60 bn.bottleneck_id,
61 fa.surdat
62 FROM waterway.fairway_availability fa
63 JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id
50 ` 64 `
51 insertFASQL = ` 65 insertFASQL = `
52 INSERT INTO waterway.fairway_availability ( 66 INSERT INTO waterway.fairway_availability (
53 position_code, 67 position_code,
54 bottleneck_id, 68 bottleneck_id,
77 $1, 91 $1,
78 $2, 92 $2,
79 $3, 93 $3,
80 $4, 94 $4,
81 $5 95 $5
82 )` 96 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`
83 insertEFASQL = ` 97 insertEFASQL = `
84 INSERT INTO waterway.effective_fairway_availability ( 98 INSERT INTO waterway.effective_fairway_availability (
85 fairway_availability_id, 99 fairway_availability_id,
86 measure_date, 100 measure_date,
87 level_of_service, 101 level_of_service,
104 $6, 118 $6,
105 $7, 119 $7,
106 $8, 120 $8,
107 $9, 121 $9,
108 $10 122 $10
109 )` 123 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`
110 insertFAVSQL = ` 124 insertFAVSQL = `
111 INSERT INTO waterway.fa_reference_values ( 125 INSERT INTO waterway.fa_reference_values (
112 fairway_availability_id, 126 fairway_availability_id,
113 level_of_service, 127 level_of_service,
114 fairway_depth, 128 fairway_depth,
123 WHERE name = $2), 137 WHERE name = $2),
124 $3, 138 $3,
125 $4, 139 $4,
126 $5, 140 $5,
127 ST_MakePoint($6, $7)::geography 141 ST_MakePoint($6, $7)::geography
128 )` 142 )ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING`
129 ) 143 )
130 144
131 type faJobCreator struct{} 145 type faJobCreator struct{}
132 146
133 func init() { 147 func init() {
195 ); err != nil { 209 ); err != nil {
196 return nil, err 210 return nil, err
197 } 211 }
198 bottlenecks = append(bottlenecks, bn) 212 bottlenecks = append(bottlenecks, bn)
199 } 213 }
200
201 if err = rows.Err(); err != nil { 214 if err = rows.Err(); err != nil {
202 return nil, err 215 return nil, err
203 } 216 }
204 217
205 faids, err := fa.doForFAs(ctx, bottlenecks, conn, feedback) 218 var faRows *sql.Rows
219 faRows, err = conn.QueryContext(ctx, listFairwayAvailabilitySQL)
220 if err != nil {
221 return nil, err
222 }
223 fairwayAvailabilities := map[models.UniqueFairwayAvailability]int64{}
224 for faRows.Next() {
225 var id int64
226 var bnId string
227 var sd time.Time
228 if err = faRows.Scan(
229 &id,
230 &bnId,
231 &sd,
232 ); err != nil {
233 return nil, err
234 }
235 key := models.UniqueFairwayAvailability{
236 BottleneckId: bnId,
237 Surdat: sd,
238 }
239 fairwayAvailabilities[key] = id
240 }
241 if err = faRows.Err(); err != nil {
242 return nil, err
243 }
244
245 latestMeasureDateRow := conn.QueryRowContext(ctx, latestMeasureDateSQL)
246 var latestDate pgtype.Timestamp
247 err = latestMeasureDateRow.Scan(&latestDate)
248 switch {
249 case err == sql.ErrNoRows:
250 latestDate = pgtype.Timestamp{
251 // Fill Database with data of the last 5 days. Change this to a more useful value.
252 Time: time.Now().AddDate(0, 0, -5),
253 }
254 case err != nil:
255 return nil, err
256 }
257
258 faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latestDate, conn, feedback)
206 if err != nil { 259 if err != nil {
207 feedback.Error("Error processing data: %s", err) 260 feedback.Error("Error processing data: %s", err)
208 } 261 }
209 if len(faids) == 0 { 262 if len(faids) == 0 {
210 feedback.Info("No new fairway availablity data found") 263 feedback.Info("No new fairway availablity data found")
211 return nil, nil 264 return nil, nil
212 } 265 }
213 feedback.Info("Processed %d of %d bottlenecks", len(faids), len(bottlenecks)) 266 feedback.Info("Processed %d fairway availabilities", len(faids))
214 // TODO: needs to be filled more useful. 267 // TODO: needs to be filled more useful.
215 summary := struct { 268 summary := struct {
216 FairwayAvailabilities []string `json:"fairwayAvailabilities"` 269 FairwayAvailabilities []string `json:"fairwayAvailabilities"`
217 }{ 270 }{
218 FairwayAvailabilities: faids, 271 FairwayAvailabilities: faids,
221 } 274 }
222 275
223 func (fa *FairwayAvailability) doForFAs( 276 func (fa *FairwayAvailability) doForFAs(
224 ctx context.Context, 277 ctx context.Context,
225 bottlenecks []models.Bottleneck, 278 bottlenecks []models.Bottleneck,
279 fairwayAvailabilities map[models.UniqueFairwayAvailability]int64,
280 latestDate pgtype.Timestamp,
226 conn *sql.Conn, 281 conn *sql.Conn,
227 feedback Feedback, 282 feedback Feedback,
228 ) ([]string, error) { 283 ) ([]string, error) {
229 start := time.Now() 284 start := time.Now()
230 285
232 287
233 var bnIds []string 288 var bnIds []string
234 for _, bn := range bottlenecks { 289 for _, bn := range bottlenecks {
235 bnIds = append(bnIds, bn.ID) 290 bnIds = append(bnIds, bn.ID)
236 } 291 }
292 var period ifaf.RequestedPeriod
293 period.Date_start = latestDate.Time
294 period.Date_end = time.Now()
237 295
238 ids := ifaf.ArrayOfString{ 296 ids := ifaf.ArrayOfString{
239 String: bnIds, 297 String: bnIds,
240 } 298 }
241 299
242 // TODO: Filter by period. Period should start after latest measurement date.
243 req := &ifaf.Get_bottleneck_fa{ 300 req := &ifaf.Get_bottleneck_fa{
244 Bottleneck_id: &ids, 301 Bottleneck_id: &ids,
302 Period: &period,
245 } 303 }
246 resp, err := client.Get_bottleneck_fa(req) 304 resp, err := client.Get_bottleneck_fa(req)
247 if err != nil { 305 if err != nil {
248 feedback.Error("%v", err) 306 feedback.Error("%v", err)
249 return nil, err 307 return nil, err
285 343
286 var faIDs []string 344 var faIDs []string
287 var faID int64 345 var faID int64
288 feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability)) 346 feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability))
289 for _, faRes := range result.FairwayAvailability { 347 for _, faRes := range result.FairwayAvailability {
290 // TODO: high frequent requests lead to "duplicate key value violates unique constraint "fairway_availability_bottleneck_id_surdat_key" 348 uniqueFa := models.UniqueFairwayAvailability{
291 // in the database. This has to be resolved. 349 BottleneckId: faRes.Bottleneck_id,
292 // All data subsets can also ocure as duplicates! 350 Surdat: faRes.SURDAT,
293 err = insertFAStmt.QueryRowContext( 351 }
294 ctx, 352 if _, ok := fairwayAvailabilities[uniqueFa]; !ok {
295 faRes.POSITION, 353 err = insertFAStmt.QueryRowContext(
296 faRes.Bottleneck_id,
297 faRes.SURDAT,
298 faRes.Critical,
299 faRes.Date_Info,
300 faRes.Source,
301 ).Scan(&faID)
302 if err != nil {
303 return nil, err
304 }
305 feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
306 faIDs = append(faIDs, faRes.Bottleneck_id)
307 for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo {
308 _, err = insertBnPdfsStmt.ExecContext(
309 ctx, 354 ctx,
310 faID, 355 faRes.POSITION,
311 bnPdfs.ProfilePdfFilename, 356 faRes.Bottleneck_id,
312 bnPdfs.ProfilePdfURL, 357 faRes.SURDAT,
313 bnPdfs.PDF_Generation_Date, 358 faRes.Critical,
314 bnPdfs.Source, 359 faRes.Date_Info,
315 ) 360 faRes.Source,
361 ).Scan(&faID)
316 if err != nil { 362 if err != nil {
317 return nil, err 363 return nil, err
318 } 364 }
319 feedback.Info("Add %d Pdfs", len(faRes.Bottleneck_PDFs.PdfInfo)) 365 fairwayAvailabilities[uniqueFa] = faID
320 } 366 } else {
321 for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { 367 faID, _ = fairwayAvailabilities[uniqueFa]
322 los := efa.Level_of_Service 368 }
323 fgt := efa.Forecast_generation_time 369 feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
324 if efa.Forecast_generation_time.Status == pgtype.Undefined { 370 faIDs = append(faIDs, faRes.Bottleneck_id)
325 fgt = pgtype.Timestamp{ 371 if faRes.Bottleneck_PDFs != nil {
326 Status: pgtype.Null, 372 bnPdfCount := 0
373 for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo {
374 res, err := insertBnPdfsStmt.ExecContext(
375 ctx,
376 faID,
377 bnPdfs.ProfilePdfFilename,
378 bnPdfs.ProfilePdfURL,
379 bnPdfs.PDF_Generation_Date,
380 bnPdfs.Source,
381 )
382 if err != nil {
383 return nil, err
384 }
385 affected, err := res.RowsAffected()
386 if err == nil {
387 bnPdfCount += int(affected)
388 } else {
389 bnPdfCount++
327 } 390 }
328 } 391 }
329 _, err = insertEFAStmt.ExecContext( 392 feedback.Info("Add %d Pdfs", bnPdfCount)
330 ctx, 393 }
331 faID, 394 if faRes.Effective_fairway_availability != nil {
332 efa.Measure_date, 395 efaCount := 0
333 string(*los), 396 for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability {
334 efa.Available_depth_value, 397 los := efa.Level_of_Service
335 efa.Available_width_value, 398 fgt := efa.Forecast_generation_time
336 efa.Water_level_value, 399 if efa.Forecast_generation_time.Status == pgtype.Undefined {
337 efa.Measure_type, 400 fgt = pgtype.Timestamp{
338 efa.Source, 401 Status: pgtype.Null,
339 fgt, 402 }
340 efa.Value_lifetime, 403 }
341 ) 404 res, err := insertEFAStmt.ExecContext(
342 if err != nil { 405 ctx,
343 return nil, err 406 faID,
407 efa.Measure_date,
408 string(*los),
409 efa.Available_depth_value,
410 efa.Available_width_value,
411 efa.Water_level_value,
412 efa.Measure_type,
413 efa.Source,
414 fgt.Get(),
415 efa.Value_lifetime,
416 )
417 if err != nil {
418 return nil, err
419 }
420 affected, err := res.RowsAffected()
421 if err == nil {
422 efaCount += int(affected)
423 } else {
424 efaCount++
425 }
344 } 426 }
345 feedback.Info("Add %d Effective Fairway Availability", len( 427 feedback.Info("Add %d Effective Fairway Availability", efaCount)
346 faRes.Effective_fairway_availability.EffectiveFairwayAvailability)) 428 }
347 } 429
348 for _, fav := range faRes.Reference_values.ReferenceValue { 430 if faRes.Reference_values != nil {
349 _, err = insertFAVStmt.ExecContext( 431 rvCount := 0
350 ctx, 432 for _, fav := range faRes.Reference_values.ReferenceValue {
351 faID, 433 res, err := insertFAVStmt.ExecContext(
352 fav.Level_of_Service, 434 ctx,
353 fav.Fairway_depth, 435 faID,
354 fav.Fairway_width, 436 fav.Level_of_Service,
355 fav.Fairway_radius, 437 fav.Fairway_depth,
356 fav.Shallowest_spot_Lat, 438 fav.Fairway_width,
357 fav.Shallowest_spot_Lon, 439 fav.Fairway_radius,
358 ) 440 fav.Shallowest_spot_Lat,
359 if err != nil { 441 fav.Shallowest_spot_Lon,
360 return nil, err 442 )
443 if err != nil {
444 return nil, err
445 }
446 affected, err := res.RowsAffected()
447 if err == nil {
448 rvCount += int(affected)
449 } else {
450 rvCount++
451 }
361 } 452 }
362 feedback.Info("Add %d Reference Values", 453 feedback.Info("Add %d Reference Values", rvCount)
363 len(faRes.Reference_values.ReferenceValue))
364 } 454 }
365 } 455 }
366 feedback.Info("Storing fairway availabilities took %s", time.Since(start)) 456 feedback.Info("Storing fairway availabilities took %s", time.Since(start))
367 if err = tx.Commit(); err == nil { 457 if err = tx.Commit(); err == nil {
368 feedback.Info("Import of fairway availabilities was successful") 458 feedback.Info("Import of fairway availabilities was successful")