Mercurial > gemma
view pkg/imports/agm.go @ 2130:f3aabc05f9b2
Fix constraints on waterway profiles
staging_done in the UNIQUE constraint had no effect, because the
exclusion constraint prevented two rows with equal location and
validity anyhow. Adding staging_done to the exclusion constraint
makes the UNIQUE constraint check only a corner case of what
the exclusion constraint checks. Thus, remove the UNIQUE constraint.
Casting staging_done to int is needed because there is no appropriate
operator class for booleans. Casting to smallint or even bit would have
been better (i.e. should result in smaller index size), but that would
have required creating such a CAST, in addition.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Wed, 06 Feb 2019 15:42:32 +0100 |
parents | 817cd8b89a86 |
children | b868cb653c4d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/models"
)

// ApprovedGaugeMeasurements is the configuration of an approved
// gauge measurements import job. Dir is the directory holding the
// agm.csv file to be imported.
type ApprovedGaugeMeasurements struct {
	Dir string `json:"dir"`
}

// AGMJobKind is the unique name of an approved gauge measurements import job.
const AGMJobKind JobKind = "agm"

// agmJobCreator implements the job creator interface for AGM imports.
type agmJobCreator struct{}

func init() {
	RegisterJobCreator(AGMJobKind, agmJobCreator{})
}

// AutoAccept reports that AGM imports always require manual review.
func (agmJobCreator) AutoAccept() bool { return false }

// Description returns a human readable name for this job kind.
func (agmJobCreator) Description() string { return "approved gauge measurements" }

// Create deserializes an ApprovedGaugeMeasurements job from its
// JSON configuration string.
func (agmJobCreator) Create(_ JobKind, data string) (Job, error) {
	agm := new(ApprovedGaugeMeasurements)
	if err := common.FromJSONString(data, agm); err != nil {
		return nil, err
	}
	return agm, nil
}

// Depends lists the relations a user needs access to in order to
// run this kind of import.
func (agmJobCreator) Depends() []string {
	return []string{
		"gauges",
		"gauge_measurements",
	}
}

const (
	// agmStageDoneDeleteSQL deletes old measurements that are superseded
	// by staged ones for the same gauge and measure date — i.e. delete
	// the old and keep the new measures.
	agmStageDoneDeleteSQL = ` WITH staged AS ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass ), to_delete AS ( SELECT o.id AS id FROM waterway.gauge_measurements o JOIN waterway.gauge_measurements n ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date WHERE n.id IN (SELECT key FROM staged) AND o.id NOT IN (SELECT key FROM staged) ) DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`

	// agmStageDoneSQL flags all measurements tracked by this import
	// as no longer staged.
	agmStageDoneSQL = ` UPDATE waterway.gauge_measurements SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass)`
)

// StageDone removes the measurements superseded by this import and then
// marks the imported rows as accepted, all within the given transaction.
func (agmJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	_, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id)
	if err == nil {
		_, err = tx.ExecContext(ctx, agmStageDoneSQL, id)
	}
	return err
}

// CleanUp removes the folder containing the CSV file with
// the approved gauge measurements.
func (agm *ApprovedGaugeMeasurements) CleanUp() error {
	return os.RemoveAll(agm.Dir)
}

// guessDate parses a timestamp trying the known AGM input formats in order.
var guessDate = misc.TimeGuesser([]string{
	"02.01.2006 15:04",
	"2006-01-02T15:04:05-07:00",
}).Guess

// timetz wraps time.Time to serialize it with an explicit numeric
// zone offset in JSON output.
type timetz struct{ time.Time }

// MarshalJSON renders the wrapped time in the fixed
// "2006-01-02T15:04:05-07:00" layout.
func (ttz *timetz) MarshalJSON() ([]byte, error) {
	return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00"))
}

// agmLine is one version of a gauge measurement as reported in the
// import summary.
type agmLine struct {
	CountryCode        string   `json:"country-code"`
	Sender             string   `json:"sender"`
	LanguageCode       string   `json:"language-code"`
	DateIssue          timetz   `json:"date-issue"`
	ReferenceCode      string   `json:"reference-code"`
	WaterLevel         float64  `json:"water-level"`
	Predicted          bool     `json:"predicted"`
	ValueMin           *float64 `json:"value-min"`
	ValueMax           *float64 `json:"value-max"`
	DateInfo           timetz   `json:"date-info"`
	SourceOrganization string   `json:"source-organization"`
}

// agmSummaryEntry groups the old and new versions of a measurement,
// keyed by gauge and measure date.
type agmSummaryEntry struct {
	FKGaugeID   models.Isrs `json:"fk-gauge-id"`
	MeasureDate timetz      `json:"measure-date"`
	Versions    []*agmLine  `json:"versions"`
}

const (
	// agmSelectSQL fetches the accepted (staging_done) measurement for
	// a given gauge (ISRS code as composite) and measure date.
	agmSelectSQL = ` SELECT id, country_code, sender, language_code, date_issue, reference_code, water_level, predicted, value_min, value_max, date_info, source_organization FROM waterway.gauge_measurements WHERE fk_gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND measure_date = $6 AND staging_done`

	// agmInsertSQL inserts a new measurement as waterlevel
	// (is_waterlevel = true) and still staged (staging_done = false),
	// returning its generated id.
	agmInsertSQL = ` INSERT INTO waterway.gauge_measurements ( fk_gauge_id, measure_date, country_code, sender, language_code, date_issue, reference_code, water_level, predicted, value_min, value_max, date_info, source_organization, is_waterlevel, staging_done ) VALUES( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, true, false ) RETURNING id`
)

// Do executes the actual approved gauge measurements import.
func (agm *ApprovedGaugeMeasurements) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() f, err := os.Open(filepath.Join(agm.Dir, "agm.csv")) if err != nil { return nil, err } defer f.Close() r := csv.NewReader(bufio.NewReader(f)) r.Comma = ';' r.ReuseRecord = true headers, err := r.Read() if err != nil { return nil, err } var ( fkGaugeIDIdx = -1 measureDateIdx = -1 fromIdx = -1 languageCodeIdx = -1 countryCodeIdx = -1 dateIssueIdx = -1 referenceCodeIdx = -1 valueIdx = -1 predictedIdx = -1 valueMinIdx = -1 valueMaxIdx = -1 dateInfoIdx = -1 originatorIdx = -1 unitIdx = -1 ) headerFields := []struct { idx *int name string }{ {&fkGaugeIDIdx, "fk_gauge_id"}, {&measureDateIdx, "measure_date"}, {&fromIdx, "from"}, // "sender", {&languageCodeIdx, "language_code"}, {&countryCodeIdx, "country_code"}, {&dateIssueIdx, "date_issue"}, {&referenceCodeIdx, "reference_code"}, {&valueIdx, "value"}, // "water_level", {&predictedIdx, "predicted"}, // "is_waterlevel", {&valueMinIdx, "value_min"}, {&valueMaxIdx, "value_max"}, {&dateInfoIdx, "date_info"}, {&originatorIdx, "originator"}, // "source_organization", {&unitIdx, "unit"}, } nextHeader: for i, f := range headers { h := strings.Replace(strings.ToLower( strings.TrimSpace(f)), " ", "_", -1) for j := range headerFields { if headerFields[j].name == h { if *headerFields[j].idx != -1 { return nil, fmt.Errorf( "There is more than one column namend '%s'", h) } *headerFields[j].idx = i continue nextHeader } } } var missing []string for i := range headerFields { if headerFields[i].name != "unit" && *headerFields[i].idx == -1 { missing = append(missing, headerFields[i].name) } } if len(missing) > 0 { return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) } inCm, _ := rescale("cm") scaler := func(row []string) (func(float32) float32, error) { if unitIdx == -1 { return inCm, nil } unit := row[unitIdx] if unit == "cm" { return inCm, nil } s, err := 
rescale(unit) return s, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() selectStmt, err := tx.PrepareContext(ctx, agmSelectSQL) if err != nil { return nil, err } defer selectStmt.Close() insertStmt, err := tx.PrepareContext(ctx, agmInsertSQL) if err != nil { return nil, err } defer insertStmt.Close() trackStmt, err := tx.PrepareContext(ctx, trackImportSQL) if err != nil { return nil, err } defer trackStmt.Close() entries := []*agmSummaryEntry{} lines: for line := 1; ; line++ { row, err := r.Read() switch { case err == io.EOF || len(row) == 0: break lines case err != nil: return nil, fmt.Errorf("CSV parsing failed: %v", err) } convert, err := scaler(row) if err != nil { return nil, fmt.Errorf("line %d: %v", line, err) } gids := row[fkGaugeIDIdx] gid, err := models.IsrsFromString(gids) if err != nil { return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err) } md, err := guessDate(row[measureDateIdx]) if err != nil { return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err) } var ( oldID int64 oldCountryCode string oldSender string oldLanguageCode string oldDateIssue time.Time oldReferenceCode string oldValue float64 oldPredicted bool oldValueMin sql.NullFloat64 oldValueMax sql.NullFloat64 oldDateInfo time.Time oldSourceOrganization string ) err = selectStmt.QueryRowContext( ctx, gid.CountryCode, gid.LoCode, gid.FairwaySection, gid.Orc, gid.Hectometre, md, ).Scan( &oldID, &oldCountryCode, &oldSender, &oldLanguageCode, &oldDateIssue, &oldReferenceCode, &oldValue, &oldPredicted, &oldValueMin, &oldValueMax, &oldDateInfo, &oldSourceOrganization, ) var newEntry bool switch { case err == sql.ErrNoRows: // Complete new one newEntry = true case err != nil: return nil, err } newSender := row[fromIdx] newLanguageCode := row[languageCodeIdx] newCountryCode := row[countryCodeIdx] dis, err := guessDate(row[dateIssueIdx]) if err != nil { return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err) } 
newDateIssue := dis newReferenceCode := row[referenceCodeIdx] value, err := strconv.ParseFloat(row[valueIdx], 32) if err != nil { return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err) } newValue := float64(convert(float32(value))) newPredicted := strings.ToLower(row[predictedIdx]) == "true" var newValueMin sql.NullFloat64 if vm := row[valueMinIdx]; vm != "" { valueMin, err := strconv.ParseFloat(vm, 32) if err != nil { return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err) } newValueMin = sql.NullFloat64{ Float64: float64(convert(float32(valueMin))), Valid: true, } } var newValueMax sql.NullFloat64 if vm := row[valueMaxIdx]; vm != "" { valueMax, err := strconv.ParseFloat(vm, 32) if err != nil { return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err) } newValueMax = sql.NullFloat64{ Float64: float64(convert(float32(valueMax))), Valid: true, } } din, err := guessDate(row[dateInfoIdx]) if err != nil { return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err) } newDateInfo := din newSourceOrganization := row[originatorIdx] var newID int64 if err := insertStmt.QueryRowContext( ctx, gid.CountryCode, gid.LoCode, gid.FairwaySection, gid.Orc, gid.Hectometre, md, newCountryCode, newSender, newLanguageCode, newDateIssue, newReferenceCode, newValue, newPredicted, newValueMin, newValueMax, newDateInfo, newSourceOrganization, ).Scan(&newID); err != nil { return nil, err } if _, err := trackStmt.ExecContext( ctx, importID, "waterway.gauge_measurements", newID, ); err != nil { return nil, err } ase := &agmSummaryEntry{ FKGaugeID: *gid, MeasureDate: timetz{md}, Versions: []*agmLine{ newAGMLine( newCountryCode, newSender, newLanguageCode, newDateIssue, newReferenceCode, newValue, newPredicted, newValueMin, newValueMax, newDateInfo, newSourceOrganization, ), }, } if !newEntry { ase.Versions = []*agmLine{ newAGMLine( oldCountryCode, oldSender, oldLanguageCode, oldDateIssue, oldReferenceCode, oldValue, oldPredicted, oldValueMin, 
oldValueMax, oldDateInfo, oldSourceOrganization, ), ase.Versions[0], } } entries = append(entries, ase) } if err := tx.Commit(); err != nil { return nil, fmt.Errorf("Commit failed: %v", err) } feedback.Info("Importing approved gauge measurements took %s", time.Since(start)) return entries, nil } func newAGMLine( countryCode string, sender string, languageCode string, dateIssue time.Time, referenceCode string, waterLevel float64, predicted bool, valueMin sql.NullFloat64, valueMax sql.NullFloat64, dateInfo time.Time, sourceOrganization string, ) *agmLine { nilFloat := func(v sql.NullFloat64) *float64 { var p *float64 if v.Valid { p = &v.Float64 } return p } return &agmLine{ CountryCode: countryCode, Sender: sender, LanguageCode: languageCode, DateIssue: timetz{dateIssue}, ReferenceCode: referenceCode, WaterLevel: waterLevel, Predicted: predicted, ValueMin: nilFloat(valueMin), ValueMax: nilFloat(valueMax), DateInfo: timetz{dateInfo}, SourceOrganization: sourceOrganization, } }