# HG changeset patch # User Tom Gottfried # Date 1563799541 -7200 # Node ID baa51bb823647165f289da3b49b1e8b76dd311bd # Parent efe0904b1d45dd8bb1328e4745cf9733cd711045 AGM import: parse CSV to completion before querying the database This is in preparation for avoiding one SELECT per line. diff -r efe0904b1d45 -r baa51bb82364 pkg/imports/agm.go --- a/pkg/imports/agm.go Mon Jul 22 13:28:04 2019 +0200 +++ b/pkg/imports/agm.go Mon Jul 22 14:45:41 2019 +0200 @@ -4,13 +4,14 @@ // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // -// Copyright (C) 2018 by via donau +// Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann // * Sascha Wilde +// * Tom Gottfried package imports @@ -120,14 +121,16 @@ } type agmLine struct { - CountryCode string `json:"country-code"` - Sender string `json:"sender"` - LanguageCode string `json:"language-code"` - DateIssue timetz `json:"date-issue"` - ReferenceCode string `json:"reference-code"` - WaterLevel float64 `json:"water-level"` - DateInfo timetz `json:"date-info"` - SourceOrganization string `json:"source-organization"` + Location models.Isrs `json:"fk-gauge-id"` + CountryCode string `json:"country-code"` + Sender string `json:"sender"` + LanguageCode string `json:"language-code"` + DateIssue timetz `json:"date-issue"` + ReferenceCode string `json:"reference-code"` + MeasureDate timetz `json:"measure-date"` + WaterLevel float64 `json:"water-level"` + DateInfo timetz `json:"date-info"` + SourceOrganization string `json:"source-organization"` } func (a *agmLine) hasDiff(b *agmLine) bool { @@ -309,8 +312,11 @@ warn := warnLimiter.Warn defer warnLimiter.Close() + agmLines := []*agmLine{} + ignored := 0 + lines: - for line, ignored := 1, 0; ; line++ { + for line := 1; ; line++ { row, err := r.Read() switch { @@ -322,7 +328,6 @@ if ignored == line-1 { return nil, 
UnchangedError("No entries imported") } - feedback.Info("Imported %d entries with changes", len(entries)) break lines case err != nil: return nil, fmt.Errorf("CSV parsing failed: %v", err) @@ -364,6 +369,38 @@ return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err) } + newSender := agm.Originator + newCountryCode := gid.CountryCode + newLanguageCode := misc.CCtoLang[gid.CountryCode] + newDateIssue := time.Now() + newReferenceCode := "ZPG" + + value, err := strconv.ParseFloat(row[valueIdx], 32) + if err != nil { + return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err) + } + newValue := value + + newDateInfo := newDateIssue + + newSourceOrganization := newSender + + agmLines = append(agmLines, newAGMLine( + *gid, + newCountryCode, + newSender, + newLanguageCode, + newDateIssue, + newReferenceCode, + md, + newValue, + newDateInfo, + newSourceOrganization, + )) + } + +agmLines: + for _, line := range agmLines { var ( oldID int64 oldCountryCode string @@ -378,12 +415,12 @@ err = selectStmt.QueryRowContext( ctx, - gid.CountryCode, - gid.LoCode, - gid.FairwaySection, - gid.Orc, - gid.Hectometre, - md, + line.Location.CountryCode, + line.Location.LoCode, + line.Location.FairwaySection, + line.Location.Orc, + line.Location.Hectometre, + line.MeasureDate.Time, ).Scan( &oldID, &oldCountryCode, @@ -405,22 +442,6 @@ return nil, err } - newSender := agm.Originator - newCountryCode := gid.CountryCode - newLanguageCode := misc.CCtoLang[gid.CountryCode] - newDateIssue := time.Now() - newReferenceCode := "ZPG" - - value, err := strconv.ParseFloat(row[valueIdx], 32) - if err != nil { - return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err) - } - newValue := value - - newDateInfo := newDateIssue - - newSourceOrganization := newSender - switch err := func() error { tx, err := conn.BeginTx(ctx, nil) if err != nil { @@ -432,20 +453,20 @@ if err := tx.StmtContext(ctx, insertStmt).QueryRowContext( ctx, - gid.CountryCode, - gid.LoCode, - 
gid.FairwaySection, - gid.Orc, - gid.Hectometre, - md, - newCountryCode, - newSender, - newLanguageCode, - newDateIssue, - newReferenceCode, - newValue, - newDateInfo, - newSourceOrganization, + line.Location.CountryCode, + line.Location.LoCode, + line.Location.FairwaySection, + line.Location.Orc, + line.Location.Hectometre, + line.MeasureDate.Time, + line.CountryCode, + line.Sender, + line.LanguageCode, + line.DateIssue.Time, + line.ReferenceCode, + line.WaterLevel, + line.DateInfo.Time, + line.SourceOrganization, ).Scan(&newID); err != nil { warn(handleError(err).Error()) ignored++ @@ -464,49 +485,41 @@ return err }(); { case err == errContinue: - continue lines + continue agmLines case err != nil: return nil, err } - n := newAGMLine( - newCountryCode, - newSender, - newLanguageCode, - newDateIssue, - newReferenceCode, - newValue, - newDateInfo, - newSourceOrganization, - ) - ase := &agmSummaryEntry{ - FKGaugeID: *gid, - MeasureDate: timetz{md}, + FKGaugeID: line.Location, + MeasureDate: line.MeasureDate, } if newEntry { - ase.Versions = []*agmLine{n} + ase.Versions = []*agmLine{line} } else { o := newAGMLine( + line.Location, oldCountryCode, oldSender, oldLanguageCode, oldDateIssue, oldReferenceCode, + line.MeasureDate.Time, oldValue, oldDateInfo, oldSourceOrganization, ) // Ignore if there is no diff. 
- if !n.hasDiff(o) { + if !line.hasDiff(o) { continue } - ase.Versions = []*agmLine{o, n} + ase.Versions = []*agmLine{o, line} } entries = append(entries, ase) } + feedback.Info("Imported %d entries with changes", len(entries)) feedback.Info("Importing approved gauge measurements took %s", time.Since(start)) @@ -514,21 +527,25 @@ } func newAGMLine( + location models.Isrs, countryCode string, sender string, languageCode string, dateIssue time.Time, referenceCode string, + measureDate time.Time, waterLevel float64, dateInfo time.Time, sourceOrganization string, ) *agmLine { return &agmLine{ + Location: location, CountryCode: countryCode, Sender: sender, LanguageCode: languageCode, DateIssue: timetz{dateIssue}, ReferenceCode: referenceCode, + MeasureDate: timetz{measureDate}, WaterLevel: waterLevel, DateInfo: timetz{dateInfo}, SourceOrganization: sourceOrganization,