Mercurial > gemma
comparison pkg/imports/agm.go @ 2562:ce39e9954e85
Make upload of AGM require only "fk_gauge_id" "measure_date" and "value"
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Fri, 08 Mar 2019 18:57:58 +0100 |
parents | de4dc3d16647 |
children | 6d4f361c36e8 |
comparison
equal
deleted
inserted
replaced
2561:4d5f419a2318 | 2562:ce39e9954e85 |
---|---|
8 // – Österreichische Wasserstraßen-Gesellschaft mbH | 8 // – Österreichische Wasserstraßen-Gesellschaft mbH |
9 // Software engineering by Intevation GmbH | 9 // Software engineering by Intevation GmbH |
10 // | 10 // |
11 // Author(s): | 11 // Author(s): |
12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> | 12 // * Sascha L. Teichmann <sascha.teichmann@intevation.de> |
13 // * Sascha Wilde <wilde@intevation.de> | |
13 | 14 |
14 package imports | 15 package imports |
15 | 16 |
16 import ( | 17 import ( |
17 "bufio" | 18 "bufio" |
31 "gemma.intevation.de/gemma/pkg/misc" | 32 "gemma.intevation.de/gemma/pkg/misc" |
32 "gemma.intevation.de/gemma/pkg/models" | 33 "gemma.intevation.de/gemma/pkg/models" |
33 ) | 34 ) |
34 | 35 |
35 type ApprovedGaugeMeasurements struct { | 36 type ApprovedGaugeMeasurements struct { |
36 Dir string `json:"dir"` | 37 Dir string `json:"dir"` |
38 Originator string `json:"originator"` | |
37 } | 39 } |
38 | 40 |
39 // GMAPJobKind is the unique name of an approved gauge measurements import job. | 41 // GMAPJobKind is the unique name of an approved gauge measurements import job. |
40 const AGMJobKind JobKind = "agm" | 42 const AGMJobKind JobKind = "agm" |
41 | 43 |
142 return math.Abs(*x-*y) > eps | 144 return math.Abs(*x-*y) > eps |
143 } | 145 } |
144 return a.CountryCode != b.CountryCode || | 146 return a.CountryCode != b.CountryCode || |
145 a.Sender != b.Sender || | 147 a.Sender != b.Sender || |
146 a.LanguageCode != b.LanguageCode || | 148 a.LanguageCode != b.LanguageCode || |
147 !a.DateIssue.Time.Equal(b.DateIssue.Time) || | |
148 a.ReferenceCode != b.ReferenceCode || | 149 a.ReferenceCode != b.ReferenceCode || |
149 math.Abs(a.WaterLevel-b.WaterLevel) > eps || | 150 math.Abs(a.WaterLevel-b.WaterLevel) > eps || |
150 a.Predicted != b.Predicted || | 151 a.Predicted != b.Predicted || |
151 fdiff(a.ValueMin, b.ValueMin) || | 152 fdiff(a.ValueMin, b.ValueMin) || |
152 fdiff(a.ValueMax, b.ValueMax) || | 153 fdiff(a.ValueMax, b.ValueMax) || |
153 !a.DateInfo.Time.Equal(b.DateInfo.Time) || | |
154 a.SourceOrganization != b.SourceOrganization | 154 a.SourceOrganization != b.SourceOrganization |
155 } | 155 } |
156 | 156 |
157 type agmSummaryEntry struct { | 157 type agmSummaryEntry struct { |
158 FKGaugeID models.Isrs `json:"fk-gauge-id"` | 158 FKGaugeID models.Isrs `json:"fk-gauge-id"` |
247 if err != nil { | 247 if err != nil { |
248 return nil, err | 248 return nil, err |
249 } | 249 } |
250 | 250 |
251 var ( | 251 var ( |
252 fkGaugeIDIdx = -1 | 252 fkGaugeIDIdx = -1 |
253 measureDateIdx = -1 | 253 measureDateIdx = -1 |
254 fromIdx = -1 | 254 valueIdx = -1 |
255 languageCodeIdx = -1 | |
256 countryCodeIdx = -1 | |
257 dateIssueIdx = -1 | |
258 referenceCodeIdx = -1 | |
259 valueIdx = -1 | |
260 predictedIdx = -1 | |
261 valueMinIdx = -1 | |
262 valueMaxIdx = -1 | |
263 dateInfoIdx = -1 | |
264 originatorIdx = -1 | |
265 unitIdx = -1 | |
266 ) | 255 ) |
267 | 256 |
268 headerFields := []struct { | 257 headerFields := []struct { |
269 idx *int | 258 idx *int |
270 name string | 259 name string |
271 }{ | 260 }{ |
272 {&fkGaugeIDIdx, "fk_gauge_id"}, | 261 {&fkGaugeIDIdx, "fk_gauge_id"}, |
273 {&measureDateIdx, "measure_date"}, | 262 {&measureDateIdx, "measure_date"}, |
274 {&fromIdx, "from"}, // "sender", | |
275 {&languageCodeIdx, "language_code"}, | |
276 {&countryCodeIdx, "country_code"}, | |
277 {&dateIssueIdx, "date_issue"}, | |
278 {&referenceCodeIdx, "reference_code"}, | |
279 {&valueIdx, "value"}, // "water_level", | 263 {&valueIdx, "value"}, // "water_level", |
280 {&predictedIdx, "predicted"}, | |
281 // "is_waterlevel", | |
282 {&valueMinIdx, "value_min"}, | |
283 {&valueMaxIdx, "value_max"}, | |
284 {&dateInfoIdx, "date_info"}, | |
285 {&originatorIdx, "originator"}, // "source_organization", | |
286 {&unitIdx, "unit"}, | |
287 } | 264 } |
288 | 265 |
289 nextHeader: | 266 nextHeader: |
290 for i, f := range headers { | 267 for i, f := range headers { |
291 h := strings.Replace(strings.ToLower( | 268 h := strings.Replace(strings.ToLower( |
311 } | 288 } |
312 if len(missing) > 0 { | 289 if len(missing) > 0 { |
313 return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) | 290 return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) |
314 } | 291 } |
315 | 292 |
316 inCm, _ := rescale("cm") | |
317 scaler := func(row []string) (func(float32) float32, error) { | |
318 if unitIdx == -1 { | |
319 return inCm, nil | |
320 } | |
321 unit := row[unitIdx] | |
322 if unit == "cm" { | |
323 return inCm, nil | |
324 } | |
325 s, err := rescale(unit) | |
326 return s, err | |
327 } | |
328 | |
329 tx, err := conn.BeginTx(ctx, nil) | 293 tx, err := conn.BeginTx(ctx, nil) |
330 if err != nil { | 294 if err != nil { |
331 return nil, err | 295 return nil, err |
332 } | 296 } |
333 defer tx.Rollback() | 297 defer tx.Rollback() |
364 switch { | 328 switch { |
365 case err == io.EOF || len(row) == 0: | 329 case err == io.EOF || len(row) == 0: |
366 break lines | 330 break lines |
367 case err != nil: | 331 case err != nil: |
368 return nil, fmt.Errorf("CSV parsing failed: %v", err) | 332 return nil, fmt.Errorf("CSV parsing failed: %v", err) |
369 } | |
370 convert, err := scaler(row) | |
371 if err != nil { | |
372 return nil, fmt.Errorf("line %d: %v", line, err) | |
373 } | 333 } |
374 | 334 |
375 gids := row[fkGaugeIDIdx] | 335 gids := row[fkGaugeIDIdx] |
376 gid, err := models.IsrsFromString(gids) | 336 gid, err := models.IsrsFromString(gids) |
377 if err != nil { | 337 if err != nil { |
451 newEntry = true | 411 newEntry = true |
452 case err != nil: | 412 case err != nil: |
453 return nil, err | 413 return nil, err |
454 } | 414 } |
455 | 415 |
456 newSender := row[fromIdx] | 416 newSender := agm.Originator |
457 newLanguageCode := row[languageCodeIdx] | 417 newCountryCode := gid.CountryCode |
458 newCountryCode := row[countryCodeIdx] | 418 newLanguageCode := misc.CCtoLang[gid.CountryCode] |
459 | 419 newDateIssue := time.Now() |
460 dis, err := guessDate(row[dateIssueIdx]) | 420 newReferenceCode := "ZPG" |
461 if err != nil { | |
462 return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err) | |
463 } | |
464 newDateIssue := dis | |
465 | |
466 newReferenceCode := row[referenceCodeIdx] | |
467 | 421 |
468 value, err := strconv.ParseFloat(row[valueIdx], 32) | 422 value, err := strconv.ParseFloat(row[valueIdx], 32) |
469 if err != nil { | 423 if err != nil { |
470 return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err) | 424 return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err) |
471 } | 425 } |
472 newValue := float64(convert(float32(value))) | 426 newValue := value |
473 | 427 |
474 newPredicted := strings.ToLower(row[predictedIdx]) == "true" | 428 newPredicted := false |
475 | 429 |
476 var newValueMin sql.NullFloat64 | 430 newValueMin := sql.NullFloat64{ |
477 if vm := row[valueMinIdx]; vm != "" { | 431 Float64: 0, |
478 valueMin, err := strconv.ParseFloat(vm, 32) | 432 Valid: true, |
479 if err != nil { | 433 } |
480 return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err) | 434 newValueMax := sql.NullFloat64{ |
481 } | 435 Float64: 0, |
482 newValueMin = sql.NullFloat64{ | 436 Valid: true, |
483 Float64: float64(convert(float32(valueMin))), | 437 } |
484 Valid: true, | 438 |
485 } | 439 newDateInfo := newDateIssue |
486 } | 440 |
487 | 441 newSourceOrganization := newSender |
488 var newValueMax sql.NullFloat64 | |
489 if vm := row[valueMaxIdx]; vm != "" { | |
490 valueMax, err := strconv.ParseFloat(vm, 32) | |
491 if err != nil { | |
492 return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err) | |
493 } | |
494 newValueMax = sql.NullFloat64{ | |
495 Float64: float64(convert(float32(valueMax))), | |
496 Valid: true, | |
497 } | |
498 } | |
499 | |
500 din, err := guessDate(row[dateInfoIdx]) | |
501 if err != nil { | |
502 return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err) | |
503 } | |
504 newDateInfo := din | |
505 | |
506 newSourceOrganization := row[originatorIdx] | |
507 | 442 |
508 var newID int64 | 443 var newID int64 |
509 | 444 |
510 if err := insertStmt.QueryRowContext( | 445 if err := insertStmt.QueryRowContext( |
511 ctx, | 446 ctx, |