Mercurial > gemma
comparison pkg/imports/fa.go @ 4655:1e96ff2da1f3
Fixed the time range for which data from FA service is requested.
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Mon, 14 Oct 2019 13:17:02 +0200 |
parents | 7465b6244d5e |
children | ca6a5f722471 |
comparison
equal
deleted
inserted
replaced
4651:e9a99e81f723 | 4655:1e96ff2da1f3 |
---|---|
46 const ( | 46 const ( |
47 // maxHistoryDays is the numbers of days to look back. | 47 // maxHistoryDays is the numbers of days to look back. |
48 maxHistoryDays = 7 | 48 maxHistoryDays = 7 |
49 ) | 49 ) |
50 | 50 |
51 // Find bottlenecks of the current users country for a given time range | |
51 const ( | 52 const ( |
52 listBottlenecksSQL = ` | 53 listBottlenecksSQL = ` |
53 SELECT DISTINCT | 54 SELECT DISTINCT |
54 bottleneck_id | 55 bottleneck_id |
55 FROM waterway.bottlenecks | 56 FROM waterway.bottlenecks |
56 WHERE responsible_country = ( | 57 WHERE responsible_country = ( |
57 SELECT country FROM users.list_users WHERE username = current_user) | 58 SELECT country FROM users.list_users WHERE username = current_user) |
58 AND staging_done = true | 59 AND staging_done = true |
60 AND validity && $1 | |
59 ORDER BY bottleneck_id | 61 ORDER BY bottleneck_id |
60 ` | 62 ` |
61 | 63 |
62 latestMeasureDateSQL = ` | 64 latestMeasureDateSQL = ` |
63 SELECT | 65 SELECT CASE WHEN (SELECT array_agg(DISTINCT bottleneck_id) @> $1::varchar[] |
64 measure_date | 66 FROM waterway.fairway_availability) |
65 FROM waterway.effective_fairway_availability | 67 THEN (SELECT min(x.m) FROM |
66 ORDER BY measure_date DESC LIMIT 1 | 68 (SELECT max(efa.measure_date) AS m |
69 FROM waterway.fairway_availability fa, | |
70 waterway.effective_fairway_availability efa | |
71 WHERE fa.bottleneck_id = ANY($1) | |
72 AND efa.fairway_availability_id = fa.id | |
73 GROUP BY fa.bottleneck_id) AS x) | |
74 END | |
67 ` | 75 ` |
68 | 76 |
69 insertFASQL = ` | 77 insertFASQL = ` |
70 INSERT INTO waterway.fairway_availability ( | 78 INSERT INTO waterway.fairway_availability ( |
71 position, | 79 position, |
191 } | 199 } |
192 | 200 |
193 func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) { | 201 func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) { |
194 | 202 |
195 // Get available bottlenecks from database for use as filter in SOAP request | 203 // Get available bottlenecks from database for use as filter in SOAP request |
196 rows, err := tx.QueryContext(ctx, listBottlenecksSQL) | 204 // We are only interested in bottlenecks which were valid in the last |
205 // maxHistoryDays | |
206 var tfrom pgtype.Timestamptz | |
207 tfrom.Set(time.Now().AddDate(0, 0, -maxHistoryDays)) | |
208 trange := pgtype.Tstzrange{ | |
209 Lower: tfrom, | |
210 LowerType: pgtype.Inclusive, | |
211 UpperType: pgtype.Unbounded, | |
212 Status: pgtype.Present, | |
213 } | |
214 | |
215 rows, err := tx.QueryContext(ctx, listBottlenecksSQL, trange) | |
197 if err != nil { | 216 if err != nil { |
198 return nil, err | 217 return nil, err |
199 } | 218 } |
200 defer rows.Close() | 219 defer rows.Close() |
201 | 220 |
213 } | 232 } |
214 | 233 |
215 return bns, nil | 234 return bns, nil |
216 } | 235 } |
217 | 236 |
218 func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) { | 237 // Get the earliest of all the latest measure_dates for a list of bottlenecks. |
219 var date pgtype.Timestamp | 238 // We do not pick the latest of all dates, so that we don't miss data if one of |
220 err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date) | 239 // the bottlenecks we are taking into account was not active for some time... |
240 func latestDate( | |
241 ctx context.Context, | |
242 tx *sql.Tx, | |
243 bns bottlenecks, | |
244 ) (pgtype.Timestamp, error) { | |
245 var ( | |
246 date pgtype.Timestamp | |
247 pgbns pgtype.TextArray | |
248 ) | |
249 pgbns.Set(bns) | |
250 err := tx.QueryRowContext(ctx, latestMeasureDateSQL, &pgbns).Scan(&date) | |
221 switch { | 251 switch { |
222 case err == sql.ErrNoRows: | 252 case err == sql.ErrNoRows: |
223 date = pgtype.Timestamp{ | 253 date = pgtype.Timestamp{ |
224 Time: time.Now().AddDate(0, 0, -maxHistoryDays), | 254 Time: time.Now().AddDate(0, 0, -maxHistoryDays), |
225 } | 255 } |
450 | 480 |
451 fetch := func( | 481 fetch := func( |
452 ctx context.Context, | 482 ctx context.Context, |
453 tx *sql.Tx, bns bottlenecks, | 483 tx *sql.Tx, bns bottlenecks, |
454 ) ([]*ifaf.FairwayAvailability, error) { | 484 ) ([]*ifaf.FairwayAvailability, error) { |
455 | 485 feedback.Info("Requesting data for: %v", bns) |
456 latest, err := latestDate(ctx, tx) | 486 |
487 latest, err := latestDate(ctx, tx, bns) | |
457 if err != nil { | 488 if err != nil { |
458 return nil, err | 489 return nil, err |
459 } | 490 } |
491 feedback.Info("Requesting data starting from %s", latest.Time) | |
460 | 492 |
461 client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) | 493 client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) |
462 | 494 |
463 period := ifaf.RequestedPeriod{ | 495 period := ifaf.RequestedPeriod{ |
464 Date_start: latest.Time, | 496 Date_start: latest.Time, |