comparison pkg/imports/fa.go @ 4655:1e96ff2da1f3

Fixed the time range for which data from FA service is requested.
author Sascha Wilde <wilde@intevation.de>
date Mon, 14 Oct 2019 13:17:02 +0200
parents 7465b6244d5e
children ca6a5f722471
comparison
equal deleted inserted replaced
4651:e9a99e81f723 4655:1e96ff2da1f3
46 const ( 46 const (
47 // maxHistoryDays is the number of days to look back. 47 // maxHistoryDays is the number of days to look back.
48 maxHistoryDays = 7 48 maxHistoryDays = 7
49 ) 49 )
50 50
51 // Find bottlenecks of the current user's country that were valid within a given time range
51 const ( 52 const (
52 listBottlenecksSQL = ` 53 listBottlenecksSQL = `
53 SELECT DISTINCT 54 SELECT DISTINCT
54 bottleneck_id 55 bottleneck_id
55 FROM waterway.bottlenecks 56 FROM waterway.bottlenecks
56 WHERE responsible_country = ( 57 WHERE responsible_country = (
57 SELECT country FROM users.list_users WHERE username = current_user) 58 SELECT country FROM users.list_users WHERE username = current_user)
58 AND staging_done = true 59 AND staging_done = true
60 AND validity && $1
59 ORDER BY bottleneck_id 61 ORDER BY bottleneck_id
60 ` 62 `
61 63
62 latestMeasureDateSQL = ` 64 latestMeasureDateSQL = `
63 SELECT 65 SELECT CASE WHEN (SELECT array_agg(DISTINCT bottleneck_id) @> $1::varchar[]
64 measure_date 66 FROM waterway.fairway_availability)
65 FROM waterway.effective_fairway_availability 67 THEN (SELECT min(x.m) FROM
66 ORDER BY measure_date DESC LIMIT 1 68 (SELECT max(efa.measure_date) AS m
69 FROM waterway.fairway_availability fa,
70 waterway.effective_fairway_availability efa
71 WHERE fa.bottleneck_id = ANY($1)
72 AND efa.fairway_availability_id = fa.id
73 GROUP BY fa.bottleneck_id) AS x)
74 END
67 ` 75 `
68 76
69 insertFASQL = ` 77 insertFASQL = `
70 INSERT INTO waterway.fairway_availability ( 78 INSERT INTO waterway.fairway_availability (
71 position, 79 position,
191 } 199 }
192 200
193 func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) { 201 func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) {
194 202
195 // Get available bottlenecks from database for use as filter in SOAP request 203 // Get available bottlenecks from database for use as filter in SOAP request
196 rows, err := tx.QueryContext(ctx, listBottlenecksSQL) 204 // We are only interested in bottlenecks which were valid in the last
205 // maxHistoryDays days.
206 var tfrom pgtype.Timestamptz
207 tfrom.Set(time.Now().AddDate(0, 0, -maxHistoryDays))
208 trange := pgtype.Tstzrange{
209 Lower: tfrom,
210 LowerType: pgtype.Inclusive,
211 UpperType: pgtype.Unbounded,
212 Status: pgtype.Present,
213 }
214
215 rows, err := tx.QueryContext(ctx, listBottlenecksSQL, trange)
197 if err != nil { 216 if err != nil {
198 return nil, err 217 return nil, err
199 } 218 }
200 defer rows.Close() 219 defer rows.Close()
201 220
213 } 232 }
214 233
215 return bns, nil 234 return bns, nil
216 } 235 }
217 236
218 func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) { 237 // Get the earliest of all the latest measure_dates for a list of bottlenecks.
219 var date pgtype.Timestamp 238 // We do not pick the latest of all dates, so that we don't miss data if one of
220 err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date) 239 // the bottlenecks we are taking into account was not active for some time...
240 func latestDate(
241 ctx context.Context,
242 tx *sql.Tx,
243 bns bottlenecks,
244 ) (pgtype.Timestamp, error) {
245 var (
246 date pgtype.Timestamp
247 pgbns pgtype.TextArray
248 )
249 pgbns.Set(bns)
250 err := tx.QueryRowContext(ctx, latestMeasureDateSQL, &pgbns).Scan(&date)
221 switch { 251 switch {
222 case err == sql.ErrNoRows: 252 case err == sql.ErrNoRows:
223 date = pgtype.Timestamp{ 253 date = pgtype.Timestamp{
224 Time: time.Now().AddDate(0, 0, -maxHistoryDays), 254 Time: time.Now().AddDate(0, 0, -maxHistoryDays),
225 } 255 }
450 480
451 fetch := func( 481 fetch := func(
452 ctx context.Context, 482 ctx context.Context,
453 tx *sql.Tx, bns bottlenecks, 483 tx *sql.Tx, bns bottlenecks,
454 ) ([]*ifaf.FairwayAvailability, error) { 484 ) ([]*ifaf.FairwayAvailability, error) {
455 485 feedback.Info("Requesting data for: %v", bns)
456 latest, err := latestDate(ctx, tx) 486
487 latest, err := latestDate(ctx, tx, bns)
457 if err != nil { 488 if err != nil {
458 return nil, err 489 return nil, err
459 } 490 }
491 feedback.Info("Requesting data starting from %s", latest.Time)
460 492
461 client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) 493 client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)
462 494
463 period := ifaf.RequestedPeriod{ 495 period := ifaf.RequestedPeriod{
464 Date_start: latest.Time, 496 Date_start: latest.Time,