comparison pkg/imports/agm.go @ 2562:ce39e9954e85

Make upload of AGM require only "fk_gauge_id" "measure_date" and "value"
author Sascha Wilde <wilde@intevation.de>
date Fri, 08 Mar 2019 18:57:58 +0100
parents de4dc3d16647
children 6d4f361c36e8
comparison
equal deleted inserted replaced
2561:4d5f419a2318 2562:ce39e9954e85
8 // – Österreichische Wasserstraßen-Gesellschaft mbH 8 // – Österreichische Wasserstraßen-Gesellschaft mbH
9 // Software engineering by Intevation GmbH 9 // Software engineering by Intevation GmbH
10 // 10 //
11 // Author(s): 11 // Author(s):
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> 12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de>
13 // * Sascha Wilde <wilde@intevation.de>
13 14
14 package imports 15 package imports
15 16
16 import ( 17 import (
17 "bufio" 18 "bufio"
31 "gemma.intevation.de/gemma/pkg/misc" 32 "gemma.intevation.de/gemma/pkg/misc"
32 "gemma.intevation.de/gemma/pkg/models" 33 "gemma.intevation.de/gemma/pkg/models"
33 ) 34 )
34 35
35 type ApprovedGaugeMeasurements struct { 36 type ApprovedGaugeMeasurements struct {
36 Dir string `json:"dir"` 37 Dir string `json:"dir"`
38 Originator string `json:"originator"`
37 } 39 }
38 40
39 // GMAPJobKind is the unique name of an approved gauge measurements import job. 41 // GMAPJobKind is the unique name of an approved gauge measurements import job.
40 const AGMJobKind JobKind = "agm" 42 const AGMJobKind JobKind = "agm"
41 43
142 return math.Abs(*x-*y) > eps 144 return math.Abs(*x-*y) > eps
143 } 145 }
144 return a.CountryCode != b.CountryCode || 146 return a.CountryCode != b.CountryCode ||
145 a.Sender != b.Sender || 147 a.Sender != b.Sender ||
146 a.LanguageCode != b.LanguageCode || 148 a.LanguageCode != b.LanguageCode ||
147 !a.DateIssue.Time.Equal(b.DateIssue.Time) ||
148 a.ReferenceCode != b.ReferenceCode || 149 a.ReferenceCode != b.ReferenceCode ||
149 math.Abs(a.WaterLevel-b.WaterLevel) > eps || 150 math.Abs(a.WaterLevel-b.WaterLevel) > eps ||
150 a.Predicted != b.Predicted || 151 a.Predicted != b.Predicted ||
151 fdiff(a.ValueMin, b.ValueMin) || 152 fdiff(a.ValueMin, b.ValueMin) ||
152 fdiff(a.ValueMax, b.ValueMax) || 153 fdiff(a.ValueMax, b.ValueMax) ||
153 !a.DateInfo.Time.Equal(b.DateInfo.Time) ||
154 a.SourceOrganization != b.SourceOrganization 154 a.SourceOrganization != b.SourceOrganization
155 } 155 }
156 156
157 type agmSummaryEntry struct { 157 type agmSummaryEntry struct {
158 FKGaugeID models.Isrs `json:"fk-gauge-id"` 158 FKGaugeID models.Isrs `json:"fk-gauge-id"`
247 if err != nil { 247 if err != nil {
248 return nil, err 248 return nil, err
249 } 249 }
250 250
251 var ( 251 var (
252 fkGaugeIDIdx = -1 252 fkGaugeIDIdx = -1
253 measureDateIdx = -1 253 measureDateIdx = -1
254 fromIdx = -1 254 valueIdx = -1
255 languageCodeIdx = -1
256 countryCodeIdx = -1
257 dateIssueIdx = -1
258 referenceCodeIdx = -1
259 valueIdx = -1
260 predictedIdx = -1
261 valueMinIdx = -1
262 valueMaxIdx = -1
263 dateInfoIdx = -1
264 originatorIdx = -1
265 unitIdx = -1
266 ) 255 )
267 256
268 headerFields := []struct { 257 headerFields := []struct {
269 idx *int 258 idx *int
270 name string 259 name string
271 }{ 260 }{
272 {&fkGaugeIDIdx, "fk_gauge_id"}, 261 {&fkGaugeIDIdx, "fk_gauge_id"},
273 {&measureDateIdx, "measure_date"}, 262 {&measureDateIdx, "measure_date"},
274 {&fromIdx, "from"}, // "sender",
275 {&languageCodeIdx, "language_code"},
276 {&countryCodeIdx, "country_code"},
277 {&dateIssueIdx, "date_issue"},
278 {&referenceCodeIdx, "reference_code"},
279 {&valueIdx, "value"}, // "water_level", 263 {&valueIdx, "value"}, // "water_level",
280 {&predictedIdx, "predicted"},
281 // "is_waterlevel",
282 {&valueMinIdx, "value_min"},
283 {&valueMaxIdx, "value_max"},
284 {&dateInfoIdx, "date_info"},
285 {&originatorIdx, "originator"}, // "source_organization",
286 {&unitIdx, "unit"},
287 } 264 }
288 265
289 nextHeader: 266 nextHeader:
290 for i, f := range headers { 267 for i, f := range headers {
291 h := strings.Replace(strings.ToLower( 268 h := strings.Replace(strings.ToLower(
311 } 288 }
312 if len(missing) > 0 { 289 if len(missing) > 0 {
313 return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) 290 return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
314 } 291 }
315 292
316 inCm, _ := rescale("cm")
317 scaler := func(row []string) (func(float32) float32, error) {
318 if unitIdx == -1 {
319 return inCm, nil
320 }
321 unit := row[unitIdx]
322 if unit == "cm" {
323 return inCm, nil
324 }
325 s, err := rescale(unit)
326 return s, err
327 }
328
329 tx, err := conn.BeginTx(ctx, nil) 293 tx, err := conn.BeginTx(ctx, nil)
330 if err != nil { 294 if err != nil {
331 return nil, err 295 return nil, err
332 } 296 }
333 defer tx.Rollback() 297 defer tx.Rollback()
364 switch { 328 switch {
365 case err == io.EOF || len(row) == 0: 329 case err == io.EOF || len(row) == 0:
366 break lines 330 break lines
367 case err != nil: 331 case err != nil:
368 return nil, fmt.Errorf("CSV parsing failed: %v", err) 332 return nil, fmt.Errorf("CSV parsing failed: %v", err)
369 }
370 convert, err := scaler(row)
371 if err != nil {
372 return nil, fmt.Errorf("line %d: %v", line, err)
373 } 333 }
374 334
375 gids := row[fkGaugeIDIdx] 335 gids := row[fkGaugeIDIdx]
376 gid, err := models.IsrsFromString(gids) 336 gid, err := models.IsrsFromString(gids)
377 if err != nil { 337 if err != nil {
451 newEntry = true 411 newEntry = true
452 case err != nil: 412 case err != nil:
453 return nil, err 413 return nil, err
454 } 414 }
455 415
456 newSender := row[fromIdx] 416 newSender := agm.Originator
457 newLanguageCode := row[languageCodeIdx] 417 newCountryCode := gid.CountryCode
458 newCountryCode := row[countryCodeIdx] 418 newLanguageCode := misc.CCtoLang[gid.CountryCode]
459 419 newDateIssue := time.Now()
460 dis, err := guessDate(row[dateIssueIdx]) 420 newReferenceCode := "ZPG"
461 if err != nil {
462 return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err)
463 }
464 newDateIssue := dis
465
466 newReferenceCode := row[referenceCodeIdx]
467 421
468 value, err := strconv.ParseFloat(row[valueIdx], 32) 422 value, err := strconv.ParseFloat(row[valueIdx], 32)
469 if err != nil { 423 if err != nil {
470 return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err) 424 return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
471 } 425 }
472 newValue := float64(convert(float32(value))) 426 newValue := value
473 427
474 newPredicted := strings.ToLower(row[predictedIdx]) == "true" 428 newPredicted := false
475 429
476 var newValueMin sql.NullFloat64 430 newValueMin := sql.NullFloat64{
477 if vm := row[valueMinIdx]; vm != "" { 431 Float64: 0,
478 valueMin, err := strconv.ParseFloat(vm, 32) 432 Valid: true,
479 if err != nil { 433 }
480 return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err) 434 newValueMax := sql.NullFloat64{
481 } 435 Float64: 0,
482 newValueMin = sql.NullFloat64{ 436 Valid: true,
483 Float64: float64(convert(float32(valueMin))), 437 }
484 Valid: true, 438
485 } 439 newDateInfo := newDateIssue
486 } 440
487 441 newSourceOrganization := newSender
488 var newValueMax sql.NullFloat64
489 if vm := row[valueMaxIdx]; vm != "" {
490 valueMax, err := strconv.ParseFloat(vm, 32)
491 if err != nil {
492 return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err)
493 }
494 newValueMax = sql.NullFloat64{
495 Float64: float64(convert(float32(valueMax))),
496 Valid: true,
497 }
498 }
499
500 din, err := guessDate(row[dateInfoIdx])
501 if err != nil {
502 return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err)
503 }
504 newDateInfo := din
505
506 newSourceOrganization := row[originatorIdx]
507 442
508 var newID int64 443 var newID int64
509 444
510 if err := insertStmt.QueryRowContext( 445 if err := insertStmt.QueryRowContext(
511 ctx, 446 ctx,