comparison pkg/imports/agm.go @ 3277:232fc90e6ee2

Disentangle gauge measurements and predictions. Representing both in one table has made it necessary to distinguish them in many places, such as statements, definitions of partial indexes and application code. In at least one place in the AGM import, the distinction in application code was made too late, and measurements matching an approved measurement could have been missed.
author Tom Gottfried <tom@intevation.de>
date Wed, 15 May 2019 19:08:49 +0200
parents 56b297592c0a
children 831193935739
comparison
equal deleted inserted replaced
3276:75db3199f76e 3277:232fc90e6ee2
76 FROM waterway.gauge_measurements o 76 FROM waterway.gauge_measurements o
77 JOIN waterway.gauge_measurements n 77 JOIN waterway.gauge_measurements n
78 ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date 78 ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date
79 WHERE n.id IN (SELECT key FROM staged) 79 WHERE n.id IN (SELECT key FROM staged)
80 AND o.id NOT IN (SELECT key FROM staged) 80 AND o.id NOT IN (SELECT key FROM staged)
81 AND NOT o.predicted
82 ) 81 )
83 DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)` 82 DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`
84 83
85 agmStageDoneSQL = ` 84 agmStageDoneSQL = `
86 UPDATE waterway.gauge_measurements SET staging_done = true 85 UPDATE waterway.gauge_measurements SET staging_done = true
118 func (ttz *timetz) MarshalJSON() ([]byte, error) { 117 func (ttz *timetz) MarshalJSON() ([]byte, error) {
119 return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00")) 118 return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00"))
120 } 119 }
121 120
122 type agmLine struct { 121 type agmLine struct {
123 CountryCode string `json:"country-code"` 122 CountryCode string `json:"country-code"`
124 Sender string `json:"sender"` 123 Sender string `json:"sender"`
125 LanguageCode string `json:"language-code"` 124 LanguageCode string `json:"language-code"`
126 DateIssue timetz `json:"date-issue"` 125 DateIssue timetz `json:"date-issue"`
127 ReferenceCode string `json:"reference-code"` 126 ReferenceCode string `json:"reference-code"`
128 WaterLevel float64 `json:"water-level"` 127 WaterLevel float64 `json:"water-level"`
129 Predicted bool `json:"predicted"` 128 DateInfo timetz `json:"date-info"`
130 ValueMin *float64 `json:"value-min"` 129 SourceOrganization string `json:"source-organization"`
131 ValueMax *float64 `json:"value-max"`
132 DateInfo timetz `json:"date-info"`
133 SourceOrganization string `json:"source-organization"`
134 } 130 }
135 131
136 func (a *agmLine) hasDiff(b *agmLine) bool { 132 func (a *agmLine) hasDiff(b *agmLine) bool {
137 const eps = 0.00001 133 const eps = 0.00001
138 fdiff := func(x, y *float64) bool {
139 if x == nil && y == nil {
140 return false
141 }
142 if (x == nil && y != nil) || (x != nil && y == nil) {
143 return true
144 }
145 return math.Abs(*x-*y) > eps
146 }
147 return a.CountryCode != b.CountryCode || 134 return a.CountryCode != b.CountryCode ||
148 a.Sender != b.Sender || 135 a.Sender != b.Sender ||
149 a.LanguageCode != b.LanguageCode || 136 a.LanguageCode != b.LanguageCode ||
150 a.ReferenceCode != b.ReferenceCode || 137 a.ReferenceCode != b.ReferenceCode ||
151 math.Abs(a.WaterLevel-b.WaterLevel) > eps || 138 math.Abs(a.WaterLevel-b.WaterLevel) > eps ||
152 a.Predicted != b.Predicted ||
153 fdiff(a.ValueMin, b.ValueMin) ||
154 fdiff(a.ValueMax, b.ValueMax) ||
155 a.SourceOrganization != b.SourceOrganization 139 a.SourceOrganization != b.SourceOrganization
156 } 140 }
157 141
158 type agmSummaryEntry struct { 142 type agmSummaryEntry struct {
159 FKGaugeID models.Isrs `json:"fk-gauge-id"` 143 FKGaugeID models.Isrs `json:"fk-gauge-id"`
169 sender, 153 sender,
170 language_code, 154 language_code,
171 date_issue, 155 date_issue,
172 reference_code, 156 reference_code,
173 water_level, 157 water_level,
174 predicted,
175 value_min,
176 value_max,
177 date_info, 158 date_info,
178 source_organization 159 source_organization
179 FROM waterway.gauge_measurements 160 FROM waterway.gauge_measurements
180 WHERE 161 WHERE
181 fk_gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND 162 fk_gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND
189 sender, 170 sender,
190 language_code, 171 language_code,
191 date_issue, 172 date_issue,
192 reference_code, 173 reference_code,
193 water_level, 174 water_level,
194 predicted,
195 value_min,
196 value_max,
197 date_info, 175 date_info,
198 source_organization, 176 source_organization,
199 is_waterlevel, 177 is_waterlevel,
200 staging_done 178 staging_done
201 ) VALUES( 179 ) VALUES(
207 $10, 185 $10,
208 $11, 186 $11,
209 $12, 187 $12,
210 $13, 188 $13,
211 $14, 189 $14,
212 $15,
213 $16,
214 $17,
215 true, 190 true,
216 false 191 false
217 ) 192 )
218 RETURNING id` 193 RETURNING id`
219 194
380 oldSender string 355 oldSender string
381 oldLanguageCode string 356 oldLanguageCode string
382 oldDateIssue time.Time 357 oldDateIssue time.Time
383 oldReferenceCode string 358 oldReferenceCode string
384 oldValue float64 359 oldValue float64
385 oldPredicted bool
386 oldValueMin sql.NullFloat64
387 oldValueMax sql.NullFloat64
388 oldDateInfo time.Time 360 oldDateInfo time.Time
389 oldSourceOrganization string 361 oldSourceOrganization string
390 ) 362 )
391 363
392 err = selectStmt.QueryRowContext( 364 err = selectStmt.QueryRowContext(
403 &oldSender, 375 &oldSender,
404 &oldLanguageCode, 376 &oldLanguageCode,
405 &oldDateIssue, 377 &oldDateIssue,
406 &oldReferenceCode, 378 &oldReferenceCode,
407 &oldValue, 379 &oldValue,
408 &oldPredicted,
409 &oldValueMin,
410 &oldValueMax,
411 &oldDateInfo, 380 &oldDateInfo,
412 &oldSourceOrganization, 381 &oldSourceOrganization,
413 ) 382 )
414 383
415 var newEntry bool 384 var newEntry bool
430 value, err := strconv.ParseFloat(row[valueIdx], 32) 399 value, err := strconv.ParseFloat(row[valueIdx], 32)
431 if err != nil { 400 if err != nil {
432 return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err) 401 return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
433 } 402 }
434 newValue := value 403 newValue := value
435
436 newPredicted := false
437
438 newValueMin := sql.NullFloat64{
439 Float64: 0,
440 Valid: true,
441 }
442 newValueMax := sql.NullFloat64{
443 Float64: 0,
444 Valid: true,
445 }
446 404
447 newDateInfo := newDateIssue 405 newDateInfo := newDateIssue
448 406
449 newSourceOrganization := newSender 407 newSourceOrganization := newSender
450 408
468 newSender, 426 newSender,
469 newLanguageCode, 427 newLanguageCode,
470 newDateIssue, 428 newDateIssue,
471 newReferenceCode, 429 newReferenceCode,
472 newValue, 430 newValue,
473 newPredicted,
474 newValueMin,
475 newValueMax,
476 newDateInfo, 431 newDateInfo,
477 newSourceOrganization, 432 newSourceOrganization,
478 ).Scan(&newID); err != nil { 433 ).Scan(&newID); err != nil {
479 feedback.Warn(handleError(err).Error()) 434 feedback.Warn(handleError(err).Error())
480 if err := tx.Rollback(); err != nil { 435 if err := tx.Rollback(); err != nil {
497 newSender, 452 newSender,
498 newLanguageCode, 453 newLanguageCode,
499 newDateIssue, 454 newDateIssue,
500 newReferenceCode, 455 newReferenceCode,
501 newValue, 456 newValue,
502 newPredicted,
503 newValueMin,
504 newValueMax,
505 newDateInfo, 457 newDateInfo,
506 newSourceOrganization, 458 newSourceOrganization,
507 ) 459 )
508 460
509 ase := &agmSummaryEntry{ 461 ase := &agmSummaryEntry{
519 oldSender, 471 oldSender,
520 oldLanguageCode, 472 oldLanguageCode,
521 oldDateIssue, 473 oldDateIssue,
522 oldReferenceCode, 474 oldReferenceCode,
523 oldValue, 475 oldValue,
524 oldPredicted,
525 oldValueMin,
526 oldValueMax,
527 oldDateInfo, 476 oldDateInfo,
528 oldSourceOrganization, 477 oldSourceOrganization,
529 ) 478 )
530 // Ignore if there is no diff. 479 // Ignore if there is no diff.
531 if o.Predicted || !n.hasDiff(o) { 480 if !n.hasDiff(o) {
532 continue 481 continue
533 } 482 }
534 ase.Versions = []*agmLine{o, n} 483 ase.Versions = []*agmLine{o, n}
535 } 484 }
536 entries = append(entries, ase) 485 entries = append(entries, ase)
547 sender string, 496 sender string,
548 languageCode string, 497 languageCode string,
549 dateIssue time.Time, 498 dateIssue time.Time,
550 referenceCode string, 499 referenceCode string,
551 waterLevel float64, 500 waterLevel float64,
552 predicted bool,
553 valueMin sql.NullFloat64,
554 valueMax sql.NullFloat64,
555 dateInfo time.Time, 501 dateInfo time.Time,
556 sourceOrganization string, 502 sourceOrganization string,
557 ) *agmLine { 503 ) *agmLine {
558 nilFloat := func(v sql.NullFloat64) *float64 {
559 var p *float64
560 if v.Valid {
561 p = &v.Float64
562 }
563 return p
564 }
565 return &agmLine{ 504 return &agmLine{
566 CountryCode: countryCode, 505 CountryCode: countryCode,
567 Sender: sender, 506 Sender: sender,
568 LanguageCode: languageCode, 507 LanguageCode: languageCode,
569 DateIssue: timetz{dateIssue}, 508 DateIssue: timetz{dateIssue},
570 ReferenceCode: referenceCode, 509 ReferenceCode: referenceCode,
571 WaterLevel: waterLevel, 510 WaterLevel: waterLevel,
572 Predicted: predicted,
573 ValueMin: nilFloat(valueMin),
574 ValueMax: nilFloat(valueMax),
575 DateInfo: timetz{dateInfo}, 511 DateInfo: timetz{dateInfo},
576 SourceOrganization: sourceOrganization, 512 SourceOrganization: sourceOrganization,
577 } 513 }
578 } 514 }