comparison pkg/imports/fa.go @ 4079:7fb75deff16b

Allow fairway availability data to be updated
author Tom Gottfried <tom@intevation.de>
date Thu, 25 Jul 2019 16:26:01 +0200
parents db87f34805fb
children c9bef8c27685
comparison legend:
equal | deleted | inserted | replaced
4075:cb74aa69954e 4079:7fb75deff16b
35 // Insecure indicates if HTTPS traffic 35 // Insecure indicates if HTTPS traffic
36 // should validate certificates or not. 36 // should validate certificates or not.
37 Insecure bool `json:"insecure"` 37 Insecure bool `json:"insecure"`
38 } 38 }
39 39
40 type uniqueFairwayAvailability struct {
41 BottleneckId string
42 Surdat time.Time
43 }
44
45 // FAJobKind is import queue type identifier. 40 // FAJobKind is import queue type identifier.
46 const FAJobKind JobKind = "fa" 41 const FAJobKind JobKind = "fa"
47 42
48 const ( 43 const (
49 listBottlenecksSQL = ` 44 listBottlenecksSQL = `
59 SELECT 54 SELECT
60 measure_date 55 measure_date
61 FROM waterway.effective_fairway_availability 56 FROM waterway.effective_fairway_availability
62 ORDER BY measure_date DESC LIMIT 1 57 ORDER BY measure_date DESC LIMIT 1
63 ` 58 `
64 listFairwayAvailabilitySQL = ` 59
65 SELECT
66 fa.id,
67 bn.bottleneck_id,
68 fa.surdat
69 FROM waterway.fairway_availability fa
70 JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id
71 `
72 insertFASQL = ` 60 insertFASQL = `
73 INSERT INTO waterway.fairway_availability ( 61 INSERT INTO waterway.fairway_availability (
74 position_code, 62 position_code,
75 bottleneck_id, 63 bottleneck_id,
76 surdat, 64 surdat,
85 ORDER BY validity DESC FETCH FIRST ROW ONLY), 73 ORDER BY validity DESC FETCH FIRST ROW ONLY),
86 $3, 74 $3,
87 $4, 75 $4,
88 $5, 76 $5,
89 $6 77 $6
90 ) 78 ) ON CONFLICT (bottleneck_id, surdat) DO UPDATE SET
79 position_code = EXCLUDED.position_code,
80 critical = EXCLUDED.critical,
81 date_info = EXCLUDED.date_info,
82 source_organization = EXCLUDED.source_organization
91 RETURNING id` 83 RETURNING id`
92 84
93 insertBnPdfsSQL = ` 85 insertBnPdfsSQL = `
94 INSERT INTO waterway.bottleneck_pdfs ( 86 INSERT INTO waterway.bottleneck_pdfs (
95 fairway_availability_id, 87 fairway_availability_id,
102 $2, 94 $2,
103 $3, 95 $3,
104 $4, 96 $4,
105 $5 97 $5
106 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING` 98 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`
99
107 insertEFASQL = ` 100 insertEFASQL = `
108 INSERT INTO waterway.effective_fairway_availability ( 101 INSERT INTO waterway.effective_fairway_availability (
109 fairway_availability_id, 102 fairway_availability_id,
110 measure_date, 103 measure_date,
111 level_of_service, 104 level_of_service,
129 $7, 122 $7,
130 $8, 123 $8,
131 $9, 124 $9,
132 $10 125 $10
133 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING` 126 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`
127
134 insertFAVSQL = ` 128 insertFAVSQL = `
135 INSERT INTO waterway.fa_reference_values ( 129 INSERT INTO waterway.fa_reference_values (
136 fairway_availability_id, 130 fairway_availability_id,
137 level_of_service, 131 level_of_service,
138 fairway_depth, 132 fairway_depth,
213 } 207 }
214 208
215 return bns, nil 209 return bns, nil
216 } 210 }
217 211
218 func loadFairwayAvailabilities(ctx context.Context, tx *sql.Tx) (map[uniqueFairwayAvailability]int64, error) {
219 rows, err := tx.QueryContext(ctx, listFairwayAvailabilitySQL)
220 if err != nil {
221 return nil, err
222 }
223 defer rows.Close()
224 fairwayAvailabilities := map[uniqueFairwayAvailability]int64{}
225 for rows.Next() {
226 var id int64
227 var bnId string
228 var sd time.Time
229 if err = rows.Scan(
230 &id,
231 &bnId,
232 &sd,
233 ); err != nil {
234 return nil, err
235 }
236 key := uniqueFairwayAvailability{
237 BottleneckId: bnId,
238 Surdat: sd,
239 }
240 fairwayAvailabilities[key] = id
241 }
242 if err = rows.Err(); err != nil {
243 return nil, err
244 }
245 return fairwayAvailabilities, nil
246 }
247
248 func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) { 212 func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) {
249 var date pgtype.Timestamp 213 var date pgtype.Timestamp
250 err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date) 214 err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date)
251 switch { 215 switch {
252 case err == sql.ErrNoRows: 216 case err == sql.ErrNoRows:
283 fas, err := fetch(ctx, tx, bns) 247 fas, err := fetch(ctx, tx, bns)
284 if err != nil { 248 if err != nil {
285 return nil, err 249 return nil, err
286 } 250 }
287 251
288 fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, tx) 252 faids, err := doForFAs(ctx, bns, fas, tx, feedback)
289 if err != nil {
290 return nil, err
291 }
292
293 faids, err := doForFAs(ctx, bns, fairwayAvailabilities, fas, tx, feedback)
294 if err != nil { 253 if err != nil {
295 return nil, fmt.Errorf("Error processing data: %v", err) 254 return nil, fmt.Errorf("Error processing data: %v", err)
296 } 255 }
297 if len(faids) == 0 { 256 if len(faids) == 0 {
298 feedback.Info("No new fairway availablity data found") 257 feedback.Info("No new fairway availablity data found")
318 } 277 }
319 278
320 func doForFAs( 279 func doForFAs(
321 ctx context.Context, 280 ctx context.Context,
322 bnIds bottlenecks, 281 bnIds bottlenecks,
323 fairwayAvailabilities map[uniqueFairwayAvailability]int64,
324 fas []*ifaf.FairwayAvailability, 282 fas []*ifaf.FairwayAvailability,
325 tx *sql.Tx, 283 tx *sql.Tx,
326 feedback Feedback, 284 feedback Feedback,
327 ) ([]string, error) { 285 ) ([]string, error) {
328 286
353 for _, faRes := range fas { 311 for _, faRes := range fas {
354 if !bnIds.contains(faRes.Bottleneck_id) { 312 if !bnIds.contains(faRes.Bottleneck_id) {
355 feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id) 313 feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id)
356 continue 314 continue
357 } 315 }
358 uniqueFa := uniqueFairwayAvailability{ 316 err = insertFAStmt.QueryRowContext(
359 BottleneckId: faRes.Bottleneck_id, 317 ctx,
360 Surdat: faRes.SURDAT, 318 faRes.POSITION,
361 } 319 faRes.Bottleneck_id,
362 var found bool 320 faRes.SURDAT,
363 if faID, found = fairwayAvailabilities[uniqueFa]; !found { 321 faRes.Critical,
364 err = insertFAStmt.QueryRowContext( 322 faRes.Date_Info,
365 ctx, 323 faRes.Source,
366 faRes.POSITION, 324 ).Scan(&faID)
367 faRes.Bottleneck_id, 325 if err != nil {
368 faRes.SURDAT, 326 return nil, err
369 faRes.Critical,
370 faRes.Date_Info,
371 faRes.Source,
372 ).Scan(&faID)
373 if err != nil {
374 return nil, err
375 }
376 fairwayAvailabilities[uniqueFa] = faID
377 } 327 }
378 feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) 328 feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
379 faIDs = append(faIDs, faRes.Bottleneck_id) 329 faIDs = append(faIDs, faRes.Bottleneck_id)
380 if faRes.Bottleneck_PDFs != nil { 330 if faRes.Bottleneck_PDFs != nil {
381 bnPdfCount := 0 331 bnPdfCount := 0