Mercurial > gemma
view pkg/imports/agm.go @ 2307:e1aa9bb65da6
import_schedule: send email when manually triggering import
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 18 Feb 2019 13:32:44 +0100 |
parents | 7c83b5277c1c |
children | de4dc3d16647 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "bufio" "context" "database/sql" "encoding/csv" "encoding/json" "fmt" "io" "math" "os" "path/filepath" "strconv" "strings" "time" "gemma.intevation.de/gemma/pkg/misc" "gemma.intevation.de/gemma/pkg/models" ) type ApprovedGaugeMeasurements struct { Dir string `json:"dir"` } // GMAPJobKind is the unique name of an approved gauge measurements import job. const AGMJobKind JobKind = "agm" type agmJobCreator struct{} func init() { RegisterJobCreator(AGMJobKind, agmJobCreator{}) } func (agmJobCreator) AutoAccept() bool { return false } func (agmJobCreator) Description() string { return "approved gauge measurements" } func (agmJobCreator) Create() Job { return new(ApprovedGaugeMeasurements) } func (agmJobCreator) Depends() []string { return []string{ "gauges", "gauge_measurements", } } const ( // delete the old and keep the new measures. agmStageDoneDeleteSQL = ` WITH staged AS ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass ), to_delete AS ( SELECT o.id AS id FROM waterway.gauge_measurements o JOIN waterway.gauge_measurements n ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date WHERE n.id IN (SELECT key FROM staged) AND o.id NOT IN (SELECT key FROM staged) ) DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)` agmStageDoneSQL = ` UPDATE waterway.gauge_measurements SET staging_done = true WHERE id IN ( SELECT key FROM import.track_imports WHERE import_id = $1 AND relation = 'waterway.gauge_measurements'::regclass)` ) func (agmJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id) if err == nil { _, err = tx.ExecContext(ctx, agmStageDoneSQL, id) } return err } // CleanUp removes the folder containing the CSV file with the // the approved gauge measurements. func (agm *ApprovedGaugeMeasurements) CleanUp() error { return os.RemoveAll(agm.Dir) } var guessDate = misc.TimeGuesser([]string{ "02.01.2006 15:04", "2006-01-02T15:04:05-07:00", }).Guess type timetz struct{ time.Time } func (ttz *timetz) MarshalJSON() ([]byte, error) { return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00")) } type agmLine struct { CountryCode string `json:"country-code"` Sender string `json:"sender"` LanguageCode string `json:"language-code"` DateIssue timetz `json:"date-issue"` ReferenceCode string `json:"reference-code"` WaterLevel float64 `json:"water-level"` Predicted bool `json:"predicted"` ValueMin *float64 `json:"value-min"` ValueMax *float64 `json:"value-max"` DateInfo timetz `json:"date-info"` SourceOrganization string `json:"source-organization"` } func (a *agmLine) hasDiff(b *agmLine) bool { const eps = 0.00001 fdiff := func(x, y *float64) bool { if x == nil && y == nil { return false } if (x == nil && y != nil) || (x != nil && y == nil) { return true } return math.Abs(*x-*y) > eps } return a.CountryCode != b.CountryCode || a.Sender != b.Sender || a.LanguageCode != b.LanguageCode || !a.DateIssue.Time.Equal(b.DateIssue.Time) || a.ReferenceCode != b.ReferenceCode || math.Abs(a.WaterLevel-b.WaterLevel) > eps || a.Predicted != b.Predicted || fdiff(a.ValueMin, b.ValueMin) || fdiff(a.ValueMax, b.ValueMax) || !a.DateInfo.Time.Equal(b.DateInfo.Time) || a.SourceOrganization != b.SourceOrganization } type agmSummaryEntry struct { FKGaugeID models.Isrs `json:"fk-gauge-id"` MeasureDate timetz `json:"measure-date"` Versions []*agmLine `json:"versions"` } const ( agmSelectSQL = ` SELECT id, country_code, sender, language_code, date_issue, reference_code, water_level, predicted, value_min, value_max, date_info, source_organization FROM waterway.gauge_measurements WHERE fk_gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND measure_date = $6 AND staging_done` agmInsertSQL = ` INSERT INTO waterway.gauge_measurements ( fk_gauge_id, measure_date, country_code, sender, language_code, date_issue, reference_code, water_level, predicted, value_min, value_max, date_info, source_organization, is_waterlevel, staging_done ) VALUES( ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int), $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, true, false ) RETURNING id` ) // Do executes the actual approved gauge measurements import. func (agm *ApprovedGaugeMeasurements) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() f, err := os.Open(filepath.Join(agm.Dir, "agm.csv")) if err != nil { return nil, err } defer f.Close() r := csv.NewReader(bufio.NewReader(f)) r.Comma = ';' r.ReuseRecord = true headers, err := r.Read() if err != nil { return nil, err } var ( fkGaugeIDIdx = -1 measureDateIdx = -1 fromIdx = -1 languageCodeIdx = -1 countryCodeIdx = -1 dateIssueIdx = -1 referenceCodeIdx = -1 valueIdx = -1 predictedIdx = -1 valueMinIdx = -1 valueMaxIdx = -1 dateInfoIdx = -1 originatorIdx = -1 unitIdx = -1 ) headerFields := []struct { idx *int name string }{ {&fkGaugeIDIdx, "fk_gauge_id"}, {&measureDateIdx, "measure_date"}, {&fromIdx, "from"}, // "sender", {&languageCodeIdx, "language_code"}, {&countryCodeIdx, "country_code"}, {&dateIssueIdx, "date_issue"}, {&referenceCodeIdx, "reference_code"}, {&valueIdx, "value"}, // "water_level", {&predictedIdx, "predicted"}, // "is_waterlevel", {&valueMinIdx, "value_min"}, {&valueMaxIdx, "value_max"}, {&dateInfoIdx, "date_info"}, {&originatorIdx, "originator"}, // "source_organization", {&unitIdx, "unit"}, } nextHeader: for i, f := range headers { h := strings.Replace(strings.ToLower( strings.TrimSpace(f)), " ", "_", -1) for j := range headerFields { if headerFields[j].name == h { if *headerFields[j].idx != -1 { return nil, fmt.Errorf( "There is more than one column namend '%s'", h) } *headerFields[j].idx = i continue nextHeader } } } var missing []string for i := range headerFields { if headerFields[i].name != "unit" && *headerFields[i].idx == -1 { missing = append(missing, headerFields[i].name) } } if len(missing) > 0 { return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", ")) } inCm, _ := rescale("cm") scaler := func(row []string) (func(float32) float32, error) { if unitIdx == -1 { return inCm, nil } unit := row[unitIdx] if unit == "cm" { return inCm, nil } s, err := rescale(unit) return s, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() selectStmt, err := tx.PrepareContext(ctx, agmSelectSQL) if err != nil { return nil, err } defer selectStmt.Close() insertStmt, err := tx.PrepareContext(ctx, agmInsertSQL) if err != nil { return nil, err } defer insertStmt.Close() trackStmt, err := tx.PrepareContext(ctx, trackImportSQL) if err != nil { return nil, err } defer trackStmt.Close() entries := []*agmSummaryEntry{} lines: for line := 1; ; line++ { row, err := r.Read() switch { case err == io.EOF || len(row) == 0: break lines case err != nil: return nil, fmt.Errorf("CSV parsing failed: %v", err) } convert, err := scaler(row) if err != nil { return nil, fmt.Errorf("line %d: %v", line, err) } gids := row[fkGaugeIDIdx] gid, err := models.IsrsFromString(gids) if err != nil { return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err) } md, err := guessDate(row[measureDateIdx]) if err != nil { return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err) } var ( oldID int64 oldCountryCode string oldSender string oldLanguageCode string oldDateIssue time.Time oldReferenceCode string oldValue float64 oldPredicted bool oldValueMin sql.NullFloat64 oldValueMax sql.NullFloat64 oldDateInfo time.Time oldSourceOrganization string ) err = selectStmt.QueryRowContext( ctx, gid.CountryCode, gid.LoCode, gid.FairwaySection, gid.Orc, gid.Hectometre, md, ).Scan( &oldID, &oldCountryCode, &oldSender, &oldLanguageCode, &oldDateIssue, &oldReferenceCode, &oldValue, &oldPredicted, &oldValueMin, &oldValueMax, &oldDateInfo, &oldSourceOrganization, ) var newEntry bool switch { case err == sql.ErrNoRows: // Complete new one newEntry = true case err != nil: return nil, err } newSender := row[fromIdx] newLanguageCode := row[languageCodeIdx] newCountryCode := row[countryCodeIdx] dis, err := guessDate(row[dateIssueIdx]) if err != nil { return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err) } newDateIssue := dis newReferenceCode := row[referenceCodeIdx] value, err := strconv.ParseFloat(row[valueIdx], 32) if err != nil { return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err) } newValue := float64(convert(float32(value))) newPredicted := strings.ToLower(row[predictedIdx]) == "true" var newValueMin sql.NullFloat64 if vm := row[valueMinIdx]; vm != "" { valueMin, err := strconv.ParseFloat(vm, 32) if err != nil { return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err) } newValueMin = sql.NullFloat64{ Float64: float64(convert(float32(valueMin))), Valid: true, } } var newValueMax sql.NullFloat64 if vm := row[valueMaxIdx]; vm != "" { valueMax, err := strconv.ParseFloat(vm, 32) if err != nil { return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err) } newValueMax = sql.NullFloat64{ Float64: float64(convert(float32(valueMax))), Valid: true, } } din, err := guessDate(row[dateInfoIdx]) if err != nil { return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err) } newDateInfo := din newSourceOrganization := row[originatorIdx] var newID int64 if err := insertStmt.QueryRowContext( ctx, gid.CountryCode, gid.LoCode, gid.FairwaySection, gid.Orc, gid.Hectometre, md, newCountryCode, newSender, newLanguageCode, newDateIssue, newReferenceCode, newValue, newPredicted, newValueMin, newValueMax, newDateInfo, newSourceOrganization, ).Scan(&newID); err != nil { return nil, err } if _, err := trackStmt.ExecContext( ctx, importID, "waterway.gauge_measurements", newID, ); err != nil { return nil, err } n := newAGMLine( newCountryCode, newSender, newLanguageCode, newDateIssue, newReferenceCode, newValue, newPredicted, newValueMin, newValueMax, newDateInfo, newSourceOrganization, ) ase := &agmSummaryEntry{ FKGaugeID: *gid, MeasureDate: timetz{md}, } if newEntry { ase.Versions = []*agmLine{n} } else { o := newAGMLine( oldCountryCode, oldSender, oldLanguageCode, oldDateIssue, oldReferenceCode, oldValue, oldPredicted, oldValueMin, oldValueMax, oldDateInfo, oldSourceOrganization, ) // Ignore if there is no diff. if !n.hasDiff(o) { continue } ase.Versions = []*agmLine{o, n} } entries = append(entries, ase) } if err := tx.Commit(); err != nil { return nil, fmt.Errorf("Commit failed: %v", err) } feedback.Info("Importing approved gauge measurements took %s", time.Since(start)) return entries, nil } func newAGMLine( countryCode string, sender string, languageCode string, dateIssue time.Time, referenceCode string, waterLevel float64, predicted bool, valueMin sql.NullFloat64, valueMax sql.NullFloat64, dateInfo time.Time, sourceOrganization string, ) *agmLine { nilFloat := func(v sql.NullFloat64) *float64 { var p *float64 if v.Valid { p = &v.Float64 } return p } return &agmLine{ CountryCode: countryCode, Sender: sender, LanguageCode: languageCode, DateIssue: timetz{dateIssue}, ReferenceCode: referenceCode, WaterLevel: waterLevel, Predicted: predicted, ValueMin: nilFloat(valueMin), ValueMax: nilFloat(valueMax), DateInfo: timetz{dateInfo}, SourceOrganization: sourceOrganization, } }