Mercurial > gemma
comparison pkg/imports/fa.go @ 4079:7fb75deff16b
Allow fairway availability data to be updated
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 25 Jul 2019 16:26:01 +0200 |
parents | db87f34805fb |
children | c9bef8c27685 |
comparison
equal
deleted
inserted
replaced
4075:cb74aa69954e | 4079:7fb75deff16b |
---|---|
35 // Insecure indicates if HTTPS traffic | 35 // Insecure indicates if HTTPS traffic |
36 // should validate certificates or not. | 36 // should validate certificates or not. |
37 Insecure bool `json:"insecure"` | 37 Insecure bool `json:"insecure"` |
38 } | 38 } |
39 | 39 |
40 type uniqueFairwayAvailability struct { | |
41 BottleneckId string | |
42 Surdat time.Time | |
43 } | |
44 | |
45 // FAJobKind is import queue type identifier. | 40 // FAJobKind is import queue type identifier. |
46 const FAJobKind JobKind = "fa" | 41 const FAJobKind JobKind = "fa" |
47 | 42 |
48 const ( | 43 const ( |
49 listBottlenecksSQL = ` | 44 listBottlenecksSQL = ` |
59 SELECT | 54 SELECT |
60 measure_date | 55 measure_date |
61 FROM waterway.effective_fairway_availability | 56 FROM waterway.effective_fairway_availability |
62 ORDER BY measure_date DESC LIMIT 1 | 57 ORDER BY measure_date DESC LIMIT 1 |
63 ` | 58 ` |
64 listFairwayAvailabilitySQL = ` | 59 |
65 SELECT | |
66 fa.id, | |
67 bn.bottleneck_id, | |
68 fa.surdat | |
69 FROM waterway.fairway_availability fa | |
70 JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id | |
71 ` | |
72 insertFASQL = ` | 60 insertFASQL = ` |
73 INSERT INTO waterway.fairway_availability ( | 61 INSERT INTO waterway.fairway_availability ( |
74 position_code, | 62 position_code, |
75 bottleneck_id, | 63 bottleneck_id, |
76 surdat, | 64 surdat, |
85 ORDER BY validity DESC FETCH FIRST ROW ONLY), | 73 ORDER BY validity DESC FETCH FIRST ROW ONLY), |
86 $3, | 74 $3, |
87 $4, | 75 $4, |
88 $5, | 76 $5, |
89 $6 | 77 $6 |
90 ) | 78 ) ON CONFLICT (bottleneck_id, surdat) DO UPDATE SET |
79 position_code = EXCLUDED.position_code, | |
80 critical = EXCLUDED.critical, | |
81 date_info = EXCLUDED.date_info, | |
82 source_organization = EXCLUDED.source_organization | |
91 RETURNING id` | 83 RETURNING id` |
92 | 84 |
93 insertBnPdfsSQL = ` | 85 insertBnPdfsSQL = ` |
94 INSERT INTO waterway.bottleneck_pdfs ( | 86 INSERT INTO waterway.bottleneck_pdfs ( |
95 fairway_availability_id, | 87 fairway_availability_id, |
102 $2, | 94 $2, |
103 $3, | 95 $3, |
104 $4, | 96 $4, |
105 $5 | 97 $5 |
106 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING` | 98 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING` |
99 | |
107 insertEFASQL = ` | 100 insertEFASQL = ` |
108 INSERT INTO waterway.effective_fairway_availability ( | 101 INSERT INTO waterway.effective_fairway_availability ( |
109 fairway_availability_id, | 102 fairway_availability_id, |
110 measure_date, | 103 measure_date, |
111 level_of_service, | 104 level_of_service, |
129 $7, | 122 $7, |
130 $8, | 123 $8, |
131 $9, | 124 $9, |
132 $10 | 125 $10 |
133 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING` | 126 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING` |
127 | |
134 insertFAVSQL = ` | 128 insertFAVSQL = ` |
135 INSERT INTO waterway.fa_reference_values ( | 129 INSERT INTO waterway.fa_reference_values ( |
136 fairway_availability_id, | 130 fairway_availability_id, |
137 level_of_service, | 131 level_of_service, |
138 fairway_depth, | 132 fairway_depth, |
213 } | 207 } |
214 | 208 |
215 return bns, nil | 209 return bns, nil |
216 } | 210 } |
217 | 211 |
218 func loadFairwayAvailabilities(ctx context.Context, tx *sql.Tx) (map[uniqueFairwayAvailability]int64, error) { | |
219 rows, err := tx.QueryContext(ctx, listFairwayAvailabilitySQL) | |
220 if err != nil { | |
221 return nil, err | |
222 } | |
223 defer rows.Close() | |
224 fairwayAvailabilities := map[uniqueFairwayAvailability]int64{} | |
225 for rows.Next() { | |
226 var id int64 | |
227 var bnId string | |
228 var sd time.Time | |
229 if err = rows.Scan( | |
230 &id, | |
231 &bnId, | |
232 &sd, | |
233 ); err != nil { | |
234 return nil, err | |
235 } | |
236 key := uniqueFairwayAvailability{ | |
237 BottleneckId: bnId, | |
238 Surdat: sd, | |
239 } | |
240 fairwayAvailabilities[key] = id | |
241 } | |
242 if err = rows.Err(); err != nil { | |
243 return nil, err | |
244 } | |
245 return fairwayAvailabilities, nil | |
246 } | |
247 | |
248 func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) { | 212 func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) { |
249 var date pgtype.Timestamp | 213 var date pgtype.Timestamp |
250 err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date) | 214 err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date) |
251 switch { | 215 switch { |
252 case err == sql.ErrNoRows: | 216 case err == sql.ErrNoRows: |
283 fas, err := fetch(ctx, tx, bns) | 247 fas, err := fetch(ctx, tx, bns) |
284 if err != nil { | 248 if err != nil { |
285 return nil, err | 249 return nil, err |
286 } | 250 } |
287 | 251 |
288 fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, tx) | 252 faids, err := doForFAs(ctx, bns, fas, tx, feedback) |
289 if err != nil { | |
290 return nil, err | |
291 } | |
292 | |
293 faids, err := doForFAs(ctx, bns, fairwayAvailabilities, fas, tx, feedback) | |
294 if err != nil { | 253 if err != nil { |
295 return nil, fmt.Errorf("Error processing data: %v", err) | 254 return nil, fmt.Errorf("Error processing data: %v", err) |
296 } | 255 } |
297 if len(faids) == 0 { | 256 if len(faids) == 0 { |
298 feedback.Info("No new fairway availablity data found") | 257 feedback.Info("No new fairway availablity data found") |
318 } | 277 } |
319 | 278 |
320 func doForFAs( | 279 func doForFAs( |
321 ctx context.Context, | 280 ctx context.Context, |
322 bnIds bottlenecks, | 281 bnIds bottlenecks, |
323 fairwayAvailabilities map[uniqueFairwayAvailability]int64, | |
324 fas []*ifaf.FairwayAvailability, | 282 fas []*ifaf.FairwayAvailability, |
325 tx *sql.Tx, | 283 tx *sql.Tx, |
326 feedback Feedback, | 284 feedback Feedback, |
327 ) ([]string, error) { | 285 ) ([]string, error) { |
328 | 286 |
353 for _, faRes := range fas { | 311 for _, faRes := range fas { |
354 if !bnIds.contains(faRes.Bottleneck_id) { | 312 if !bnIds.contains(faRes.Bottleneck_id) { |
355 feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id) | 313 feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id) |
356 continue | 314 continue |
357 } | 315 } |
358 uniqueFa := uniqueFairwayAvailability{ | 316 err = insertFAStmt.QueryRowContext( |
359 BottleneckId: faRes.Bottleneck_id, | 317 ctx, |
360 Surdat: faRes.SURDAT, | 318 faRes.POSITION, |
361 } | 319 faRes.Bottleneck_id, |
362 var found bool | 320 faRes.SURDAT, |
363 if faID, found = fairwayAvailabilities[uniqueFa]; !found { | 321 faRes.Critical, |
364 err = insertFAStmt.QueryRowContext( | 322 faRes.Date_Info, |
365 ctx, | 323 faRes.Source, |
366 faRes.POSITION, | 324 ).Scan(&faID) |
367 faRes.Bottleneck_id, | 325 if err != nil { |
368 faRes.SURDAT, | 326 return nil, err |
369 faRes.Critical, | |
370 faRes.Date_Info, | |
371 faRes.Source, | |
372 ).Scan(&faID) | |
373 if err != nil { | |
374 return nil, err | |
375 } | |
376 fairwayAvailabilities[uniqueFa] = faID | |
377 } | 327 } |
378 feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) | 328 feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) |
379 faIDs = append(faIDs, faRes.Bottleneck_id) | 329 faIDs = append(faIDs, faRes.Bottleneck_id) |
380 if faRes.Bottleneck_PDFs != nil { | 330 if faRes.Bottleneck_PDFs != nil { |
381 bnPdfCount := 0 | 331 bnPdfCount := 0 |