Mercurial > gemma
changeset 2105:58a28715e386
Approved gauge measurement import: Added diff-summary. XXX: May be broken!
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 04 Feb 2019 15:39:58 +0100 |
parents | c9af355d4a2c |
children | 2b72f5e005aa |
files | pkg/imports/agm.go |
diffstat | 1 files changed, 265 insertions(+), 47 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/agm.go Mon Feb 04 14:35:47 2019 +0100 +++ b/pkg/imports/agm.go Mon Feb 04 15:39:58 2019 +0100 @@ -18,6 +18,7 @@ "context" "database/sql" "encoding/csv" + "encoding/json" "fmt" "io" "os" @@ -115,6 +116,89 @@ "2006-01-02T15:04:05-07:00", }).Guess +type timetz struct{ time.Time } + +func (ttz *timetz) MarshalJSON() ([]byte, error) { + return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00")) +} + +type agmLine struct { + CountryCode string `json:"country-code"` + Sender string `json:"sender"` + LanguageCode string `json:"language-code"` + DateIssue timetz `json:"date-issue"` + ReferenceCode string `json:"reference-code"` + WaterLevel float64 `json:"water-level"` + Predicted bool `json:"predicted"` + ValueMin *float64 `json:"value-min"` + ValueMax *float64 `json:"value-max"` + DateInfo timetz `json:"date-info"` + SourceOrganization string `json:"source-organization"` +} + +type agmSummaryEntry struct { + FKGaugeID models.Isrs `json:"fk-gauge-id"` + MeasureDate timetz `json:"measure-date"` + Versions []*agmLine `json:"versions"` +} + +const ( + agmSelectSQL = ` +SELECT + id, + country_code, + sender, + language_code, + date_issue, + reference_code, + water_level, + predicted, + value_min, + value_max, + date_info, + source_organization +FROM waterway.gauge_measurements +WHERE + fk_gauge_id = ($1, $2, $3, $4, $5) AND + measure_date = $6 AND staging_done` + + agmInsertSQL = ` +INSERT INTO waterway.gauge_measurements ( + fk_gauge_id, + measure_date, + country_code, + sender, + language_code, + date_issue, + reference_code, + water_level, + predicted, + value_min, + value_max, + date_info, + source_organization, + is_waterlevel, + staging_done +) VALUES( + ($1, $2, $3, $4, $5), + $6, + $7, + $8, + $9, + $10, + $11, + $12, + $13, + $14, + $15, + $16, + $17, + true, + false +) +RETURNING id` +) + // Do executes the actual approved gauge measurements import. func (agm *ApprovedGaugeMeasurements) Do( ctx context.Context, @@ -196,13 +280,11 @@ } var missing []string - for i := range headerFields { if headerFields[i].name != "unit" && *headerFields[i].idx == -1 { missing = append(missing, headerFields[i].name) } } - if len(missing) > 0 { return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) } @@ -226,7 +308,12 @@ } defer tx.Rollback() - insertStmt, err := tx.PrepareContext(ctx, insertGMSQL) + selectStmt, err := tx.PrepareContext(ctx, agmSelectSQL) + if err != nil { + return nil, err + } + defer selectStmt.Close() + insertStmt, err := tx.PrepareContext(ctx, agmInsertSQL) if err != nil { return nil, err } @@ -237,11 +324,7 @@ } defer trackStmt.Close() - ids := []int64{} - - args := make([]interface{}, 19) - - args[18] = false // staging_done + entries := []*agmSummaryEntry{} lines: for line := 1; ; line++ { @@ -264,74 +347,179 @@ return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err) } - args[0] = gid.CountryCode - args[1] = gid.LoCode - args[2] = gid.FairwaySection - args[3] = gid.Orc - args[4] = gid.Hectometre - md, err := guessDate(row[measureDateIdx]) if err != nil { return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err) } - args[5] = md + + var ( + oldID int64 + oldCountryCode string + oldSender string + oldLanguageCode string + oldDateIssue time.Time + oldReferenceCode string + oldValue float64 + oldPredicted bool + oldValueMin sql.NullFloat64 + oldValueMax sql.NullFloat64 + oldDateInfo time.Time + oldSourceOrganization string + ) - args[6] = row[fromIdx] - args[7] = row[languageCodeIdx] - args[8] = row[countryCodeIdx] + err = selectStmt.QueryRowContext( + ctx, + gid.CountryCode, + gid.LoCode, + gid.FairwaySection, + gid.Orc, + gid.Hectometre, + md, + ).Scan( + &oldID, + &oldCountryCode, + &oldSender, + &oldLanguageCode, + &oldDateIssue, + &oldReferenceCode, + &oldValue, + &oldPredicted, + &oldValueMin, + &oldValueMax, + &oldDateInfo, + &oldSourceOrganization, + ) + + var newEntry bool + switch { + case err == sql.ErrNoRows: + // Complete new one + newEntry = true + case err != nil: + return nil, err + } + + newSender := row[fromIdx] + newLanguageCode := row[languageCodeIdx] + newCountryCode := row[countryCodeIdx] dis, err := guessDate(row[dateIssueIdx]) if err != nil { return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err) } - args[9] = dis + newDateIssue := dis - args[10] = row[referenceCodeIdx] + newReferenceCode := row[referenceCodeIdx] value, err := strconv.ParseFloat(row[valueIdx], 32) if err != nil { return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err) } - args[11] = convert(float32(value)) + newValue := float64(convert(float32(value))) - predicted := strings.ToLower(row[predictedIdx]) == "true" - args[12] = predicted - - args[13] = true // is_waterlevel + newPredicted := strings.ToLower(row[predictedIdx]) == "true" - valueMin, err := strconv.ParseFloat(row[valueMinIdx], 32) - if err != nil { - return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err) + var newValueMin sql.NullFloat64 + if vm := row[valueMinIdx]; vm != "" { + valueMin, err := strconv.ParseFloat(vm, 32) + if err != nil { + return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err) + } + newValueMin = sql.NullFloat64{ + Float64: float64(convert(float32(valueMin))), + Valid: true, + } } - args[14] = convert(float32(valueMin)) - valueMax, err := strconv.ParseFloat(row[valueMaxIdx], 32) - if err != nil { - return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err) + var newValueMax sql.NullFloat64 + if vm := row[valueMaxIdx]; vm != "" { + valueMax, err := strconv.ParseFloat(vm, 32) + if err != nil { + return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err) + } + newValueMax = sql.NullFloat64{ + Float64: float64(convert(float32(valueMax))), + Valid: true, + } } - args[15] = convert(float32(valueMax)) din, err := guessDate(row[dateInfoIdx]) if err != nil { return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err) } - args[16] = din + newDateInfo := din - args[17] = row[originatorIdx] + newSourceOrganization := row[originatorIdx] - // args[18] (staging_done) is set to true outside the loop. + var newID int64 - var id int64 - if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil { - return nil, fmt.Errorf("Failed to insert line %d: %v", line, err) + if err := insertStmt.QueryRowContext( + ctx, + gid.CountryCode, + gid.LoCode, + gid.FairwaySection, + gid.Orc, + gid.Hectometre, + md, + newCountryCode, + newSender, + newLanguageCode, + newDateIssue, + newReferenceCode, + newValue, + newPredicted, + newValueMin, + newValueMax, + newDateInfo, + newSourceOrganization, + ).Scan(&newID); err != nil { + return nil, err } - ids = append(ids, id) - if _, err := trackStmt.ExecContext( - ctx, importID, "waterway.gauge_measurements", id, + ctx, importID, "waterway.gauge_measurements", newID, ); err != nil { return nil, err } + + ase := &agmSummaryEntry{ + FKGaugeID: *gid, + MeasureDate: timetz{md}, + Versions: []*agmLine{ + newAGMLine( + newCountryCode, + newSender, + newLanguageCode, + newDateIssue, + newReferenceCode, + newValue, + newPredicted, + newValueMin, + newValueMax, + newDateInfo, + newSourceOrganization, + ), + }, + } + + if !newEntry { + ase.Versions = []*agmLine{ + newAGMLine( + oldCountryCode, + oldSender, + oldLanguageCode, + oldDateIssue, + oldReferenceCode, + oldValue, + oldPredicted, + oldValueMin, + oldValueMax, + oldDateInfo, + oldSourceOrganization, + ), + ase.Versions[0], + } + } + entries = append(entries, ase) } if err := tx.Commit(); err != nil { @@ -341,10 +529,40 @@ feedback.Info("Importing approved gauge measurements took %s", time.Since(start)) - summary := struct { - IDs []int64 `json:"ids"` - }{ - IDs: ids, + return entries, nil +} + +func newAGMLine( + countryCode string, + sender string, + languageCode string, + dateIssue time.Time, + referenceCode string, + waterLevel float64, + predicted bool, + valueMin sql.NullFloat64, + valueMax sql.NullFloat64, + dateInfo time.Time, + sourceOrganization string, +) *agmLine { + nilFloat := func(v sql.NullFloat64) *float64 { + var p *float64 + if v.Valid { + p = &v.Float64 + } + return p } - return &summary, nil + return &agmLine{ + CountryCode: countryCode, + Sender: sender, + LanguageCode: languageCode, + DateIssue: timetz{dateIssue}, + ReferenceCode: referenceCode, + WaterLevel: waterLevel, + Predicted: predicted, + ValueMin: nilFloat(valueMin), + ValueMax: nilFloat(valueMax), + DateInfo: timetz{dateInfo}, + SourceOrganization: sourceOrganization, + } }