view pkg/imports/agm.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in SR export to extract information that was in the meta JSON but is now only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100
parents 56c589f7435d
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
)

// ApprovedGaugeMeasurements is a Job to import
// approved gauge measurements from a CSV file
// into the database.
type ApprovedGaugeMeasurements struct {
	// Dir is the directory containing the uploaded 'agm.csv' file.
	Dir string `json:"dir"`
	// Originator is used as sender and source organization
	// for the imported measurements.
	Originator string `json:"originator"`
}

// AGMJobKind is the unique name of an approved gauge measurements import job.
const AGMJobKind JobKind = "agm"

// agmJobCreator creates and describes AGM import jobs.
type agmJobCreator struct{}

func init() {
	// Make the AGM import kind known to the import machinery.
	RegisterJobCreator(AGMJobKind, agmJobCreator{})
}

// AutoAccept reports that AGM imports always need a manual review.
func (agmJobCreator) AutoAccept() bool { return false }

// Description returns a human readable name for this import kind.
func (agmJobCreator) Description() string {
	return "approved gauge measurements"
}

// Create builds a new empty AGM import job.
func (agmJobCreator) Create() Job { return new(ApprovedGaugeMeasurements) }

// Depends lists the relations this import interacts with
// (presumably written relations first, read ones second —
// NOTE(review): confirm against the JobCreator contract).
func (agmJobCreator) Depends() [2][]string {
	return [2][]string{
		{"gauge_measurements"},
		{"gauges"},
	}
}

const (
	// agmStageDoneDeleteSQL removes all measurements that were
	// tracked for deletion by this import.
	agmStageDoneDeleteSQL = `
DELETE FROM waterway.gauge_measurements WHERE id IN (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.gauge_measurements'::regclass AND
    deletion
)`

	// agmStageDoneSQL moves the measurements inserted by this
	// import out of the staging area.
	agmStageDoneSQL = `
UPDATE waterway.gauge_measurements SET staging_done = true
WHERE id IN (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.gauge_measurements'::regclass AND
  NOT deletion
)`
)

// StageDone replaces gauge measurements with those in the staging area
func (agmJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	_ Feedback,
) error {
	// First drop the measurements tracked for deletion ...
	if _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id); err != nil {
		return err
	}
	// ... then accept the newly staged ones.
	_, err := tx.ExecContext(ctx, agmStageDoneSQL, id)
	return err
}

// CleanUp removes the folder containing the CSV file with the
// approved gauge measurements.
func (agm *ApprovedGaugeMeasurements) CleanUp() error {
	return os.RemoveAll(agm.Dir)
}

// guessDate parses a date string in one of the accepted formats:
// German style 'dd.mm.yyyy HH:MM' or ISO 8601 with a numeric zone.
var guessDate = common.TimeParser([]string{
	"02.01.2006 15:04",
	"2006-01-02T15:04:05-07:00",
}).Parse

type timetz struct{ time.Time }

func (ttz *timetz) MarshalJSON() ([]byte, error) {
	return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00"))
}

// agmLine represents one approved gauge measurement, either
// parsed from the CSV input or loaded from the database.
type agmLine struct {
	id                 int64       // database id; only set for lines loaded from the DB
	Location           models.Isrs `json:"fk-gauge-id"`
	CountryCode        string      `json:"country-code"`
	Sender             string      `json:"sender"`
	LanguageCode       string      `json:"language-code"`
	DateIssue          timetz      `json:"date-issue"`
	ReferenceCode      string      `json:"reference-code"`
	MeasureDate        timetz      `json:"measure-date"`
	WaterLevel         float64     `json:"water-level"`
	DateInfo           timetz      `json:"date-info"`
	SourceOrganization string      `json:"source-organization"`
}

// hasDiff reports whether b differs from a in any of the compared
// fields. Water levels only count as different when they are more
// than a small epsilon apart. Location and measure date are not
// compared; callers match lines on those beforehand.
func (a *agmLine) hasDiff(b *agmLine) bool {
	const eps = 0.00001
	if a.CountryCode != b.CountryCode {
		return true
	}
	if a.Sender != b.Sender {
		return true
	}
	if a.LanguageCode != b.LanguageCode {
		return true
	}
	if a.ReferenceCode != b.ReferenceCode {
		return true
	}
	if math.Abs(a.WaterLevel-b.WaterLevel) > eps {
		return true
	}
	return a.SourceOrganization != b.SourceOrganization
}

// agmSummaryEntry describes the change to a single measurement
// for the import summary. Versions holds [old, new] for updates,
// [new] for inserts and [old, nil] for deletions.
type agmSummaryEntry struct {
	FKGaugeID   models.Isrs `json:"fk-gauge-id"`
	MeasureDate timetz      `json:"measure-date"`
	Versions    []*agmLine  `json:"versions"`
}

const (
	// agmSelectSQL loads the already accepted (staging_done)
	// measurements of a gauge inside a given time range.
	agmSelectSQL = `
SELECT
  id,
  country_code,
  sender,
  language_code,
  date_issue,
  reference_code,
  measure_date,
  water_level,
  date_info,
  source_organization
FROM waterway.gauge_measurements
WHERE
  location
    = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
  AND measure_date BETWEEN $6 AND $7
  AND staging_done
`

	// agmInsertSQL inserts a new measurement into the staging
	// area (staging_done = false) and returns its id.
	agmInsertSQL = `
INSERT INTO waterway.gauge_measurements (
  location,
  measure_date,
  country_code,
  sender,
  language_code,
  date_issue,
  reference_code,
  water_level,
  date_info,
  source_organization,
  staging_done
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  false
)
RETURNING id`

	// agmGaugeCheckSQL tests whether a gauge with the given
	// ISRS code exists.
	agmGaugeCheckSQL = `
SELECT EXISTS(
  SELECT 1 FROM waterway.gauges
  WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int))
`
)

// parseAGMHeaders scans the CSV header row for the columns
// 'fk_gauge_id', 'measure_date' and 'value' and stores their
// positions in the given index pointers. Header names are matched
// case-insensitively with surrounding whitespace trimmed and
// inner spaces treated as underscores. The index pointers must be
// initialized to -1 by the caller. An error is returned if a
// column appears more than once or a required column is missing;
// unknown columns are silently ignored.
func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error {

	headerFields := []struct {
		idx  *int
		name string
	}{
		{fkGaugeIDIdx, "fk_gauge_id"},
		{measureDateIdx, "measure_date"},
		{valueIdx, "value"}, // "water_level",
	}

nextHeader:
	for i, f := range headers {
		// Normalize: trim, lower-case, spaces to underscores.
		h := strings.Replace(strings.ToLower(
			strings.TrimSpace(f)), " ", "_", -1)

		for j := range headerFields {
			if headerFields[j].name == h {
				if *headerFields[j].idx != -1 {
					return fmt.Errorf(
						"there is more than one column named '%s'", h)
				}
				*headerFields[j].idx = i
				continue nextHeader
			}
		}
	}

	// All listed columns are mandatory.
	// (The former special case for a column named "unit" was dead
	// code: no such field is in the list above.)
	var missing []string
	for i := range headerFields {
		if *headerFields[i].idx == -1 {
			missing = append(missing, headerFields[i].name)
		}
	}
	if len(missing) > 0 {
		return fmt.Errorf("missing columns: %s", strings.Join(missing, ", "))
	}

	return nil
}

// Do executes the actual approved gauge measurements import.
// It reads 'agm.csv' from the job directory, checks the referenced
// gauges against the database, diffs the parsed lines against the
// already accepted measurements in the overlapping time range and
// stages inserts/updates and deletions accordingly. The returned
// summary lists all changed measurements.
func (agm *ApprovedGaugeMeasurements) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// The input is a semicolon separated CSV file.
	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	// Column positions; -1 means 'not found'.
	var (
		fkGaugeIDIdx   = -1
		measureDateIdx = -1
		valueIdx       = -1
	)

	if err := parseAGMHeaders(
		headers,
		&fkGaugeIDIdx, &measureDateIdx, &valueIdx,
	); err != nil {
		return nil, err
	}

	gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL)
	if err != nil {
		return nil, err
	}
	defer gaugeCheckStmt.Close()

	selectStmt, err := conn.PrepareContext(ctx, agmSelectSQL)
	if err != nil {
		return nil, err
	}
	defer selectStmt.Close()

	insertStmt, err := conn.PrepareContext(ctx, agmInsertSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	trackStmt, err := conn.PrepareContext(ctx, trackImportDeletionSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	entries := []*agmSummaryEntry{}

	// Cache of gauge existence checks keyed by ISRS code.
	checkedGauges := map[models.Isrs]bool{}

	// Cap the number of warnings to not flood the log.
	warnLimiter := common.WarningLimiter{Log: feedback.Warn, MaxWarnings: 100}
	warn := warnLimiter.Warn
	defer warnLimiter.Close()

	agmLines := []*agmLine{}
	ignored := 0
	// Per gauge min/max of the measure dates seen in the CSV.
	mdMinMax := map[models.Isrs]*[2]time.Time{}

lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		switch {
		// An empty row is treated like end of input, too.
		case err == io.EOF || len(row) == 0:
			feedback.Info("Read %d entries in CSV file", line-1)
			if ignored > 0 {
				feedback.Info("%d entries ignored", ignored)
			}
			if ignored == line-1 {
				return nil, UnchangedError("No entries imported")
			}
			break lines
		case err != nil:
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		}

		gids := row[fkGaugeIDIdx]
		gid, err := models.IsrsFromString(gids)
		if err != nil {
			return nil, fmt.Errorf("invalid ISRS code line %d: %v", line, err)
		}

		// Only accept lines for gauges known to the database.
		if exists, found := checkedGauges[*gid]; found {
			if !exists {
				// Just ignore the line since we have already warned
				ignored++
				continue lines
			}
		} else { // not found in gauge cache
			if err := gaugeCheckStmt.QueryRowContext(
				ctx,
				gid.CountryCode,
				gid.LoCode,
				gid.FairwaySection,
				gid.Orc,
				gid.Hectometre,
			).Scan(&exists); err != nil {
				return nil, err
			}
			checkedGauges[*gid] = exists
			if !exists {
				warn("Ignoring data for unknown gauge %s", gid.String())
				ignored++
				continue lines
			}
		}

		md, err := guessDate(row[measureDateIdx])
		if err != nil {
			return nil, fmt.Errorf("invalid 'measure_date' line %d: %v", line, err)
		}
		// Track the covered time range per gauge so only the
		// potentially affected old measurements are loaded later.
		if v := mdMinMax[*gid]; v != nil {
			if md.Before(v[0]) {
				v[0] = md
			}
			if md.After(v[1]) {
				v[1] = md
			}
		} else {
			mdMinMax[*gid] = &[2]time.Time{md, md}
		}

		// Fields not present in the CSV are synthesized.
		newSender := agm.Originator
		newCountryCode := gid.CountryCode
		newLanguageCode := common.CCtoLang[gid.CountryCode]
		newDateIssue := time.Now()
		newReferenceCode := "ZPG"

		// NOTE(review): bitSize 32 reduces the precision although the
		// value is kept as float64 — presumably intentional; confirm.
		value, err := strconv.ParseFloat(row[valueIdx], 32)
		if err != nil {
			return nil, fmt.Errorf("invalid 'value' line %d: %v", line, err)
		}
		newValue := value

		newDateInfo := newDateIssue

		newSourceOrganization := newSender

		agmLines = append(agmLines, newAGMLine(
			*gid,
			newCountryCode,
			newSender,
			newLanguageCode,
			newDateIssue,
			newReferenceCode,
			md,
			newValue,
			newDateInfo,
			newSourceOrganization,
		))
	}

	// Load the already accepted measurements overlapping the
	// imported time range, keyed by gauge and measure date
	// (Unix seconds).
	oldGMLines := map[models.Isrs]map[int64]*agmLine{}
	for gid, minMax := range mdMinMax {
		oldGMLines[gid], err = getOldGMLines(
			ctx, selectStmt, gid, minMax[0], minMax[1])
		if err != nil {
			return nil, err
		}
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	txInsertStmt := tx.StmtContext(ctx, insertStmt)
	txTrackStmt := tx.StmtContext(ctx, trackStmt)

agmLines:
	for _, line := range agmLines {

		var ase *agmSummaryEntry

		if old := oldGMLines[line.Location]; old != nil {
			ut := line.MeasureDate.Unix()
			if o, ok := old[ut]; ok {
				if !o.hasDiff(line) { // identical
					// don't delete
					delete(old, ut)
					continue agmLines
				}
				// Changed: keep old and new version for the summary.
				ase = &agmSummaryEntry{
					FKGaugeID:   line.Location,
					MeasureDate: line.MeasureDate,
					Versions:    []*agmLine{o, line},
				}
			}
		}
		if ase == nil {
			// Brand new measurement.
			ase = &agmSummaryEntry{
				FKGaugeID:   line.Location,
				MeasureDate: line.MeasureDate,
				Versions:    []*agmLine{line},
			}
		}

		var newID int64

		// Insert into the staging area (staging_done = false).
		if err := txInsertStmt.QueryRowContext(
			ctx,
			line.Location.CountryCode,
			line.Location.LoCode,
			line.Location.FairwaySection,
			line.Location.Orc,
			line.Location.Hectometre,
			line.MeasureDate.Time,
			line.CountryCode,
			line.Sender,
			line.LanguageCode,
			line.DateIssue.Time,
			line.ReferenceCode,
			line.WaterLevel,
			line.DateInfo.Time,
			line.SourceOrganization,
		).Scan(&newID); err != nil {
			return nil, err
		}

		// Track the insert so StageDone can accept or delete it later.
		if _, err := txTrackStmt.ExecContext(
			ctx, importID, "waterway.gauge_measurements",
			newID,
			false,
		); err != nil {
			return nil, err
		}

		entries = append(entries, ase)
	}

	var removed int

	// Issue deletes
	// Everything left in oldGMLines was not matched by a CSV line
	// and is therefore tracked for deletion.
	for _, old := range oldGMLines {
		removed += len(old)
		for _, line := range old {
			if _, err := txTrackStmt.ExecContext(
				ctx, importID, "waterway.gauge_measurements",
				line.id,
				true,
			); err != nil {
				return nil, err
			}
			entries = append(entries, &agmSummaryEntry{
				FKGaugeID:   line.Location,
				MeasureDate: line.MeasureDate,
				Versions:    []*agmLine{line, nil},
			})
		}
	}

	// NOTE(review): at this point entries already contains the
	// deletion summaries appended above, so this count includes
	// deletions as well — possibly misleading; confirm intent.
	feedback.Info("Measurements to update/insert: %d", len(entries))
	feedback.Info("Measurements to delete: %d", removed)

	if len(entries) == 0 && removed == 0 {
		return nil, UnchangedError("No changes from AGM import")
	}

	if err = tx.Commit(); err != nil {
		return nil, fmt.Errorf("commit failed: %v", err)
	}

	// Sort here to mix the deletes right beside the matching inserts/updates.
	// This also makes the output deterministic.
	sort.Slice(entries, func(i, j int) bool {
		return entries[i].FKGaugeID.Less(&entries[j].FKGaugeID)
	})

	feedback.Info("Imported %d entries with changes", len(entries))
	feedback.Info("Importing approved gauge measurements took %s",
		time.Since(start))

	return entries, nil
}

// getOldGMLines loads the already accepted measurements of the
// given gauge whose measure date lies in [from, to] and returns
// them keyed by measure date in Unix seconds.
func getOldGMLines(
	ctx context.Context,
	stmt *sql.Stmt,
	location models.Isrs,
	from time.Time,
	to time.Time,
) (map[int64]*agmLine, error) {

	rows, err := stmt.QueryContext(
		ctx,
		location.CountryCode,
		location.LoCode,
		location.FairwaySection,
		location.Orc,
		location.Hectometre,
		from,
		to,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	result := map[int64]*agmLine{}

	for rows.Next() {
		var (
			id                 int64
			countryCode        string
			sender             string
			languageCode       string
			dateIssue          time.Time
			referenceCode      string
			measureDate        time.Time
			waterLevel         float64
			dateInfo           time.Time
			sourceOrganization string
		)
		if err := rows.Scan(
			&id,
			&countryCode,
			&sender,
			&languageCode,
			&dateIssue,
			&referenceCode,
			&measureDate,
			&waterLevel,
			&dateInfo,
			&sourceOrganization,
		); err != nil {
			return nil, err
		}
		entry := newAGMLine(
			location,
			countryCode,
			sender,
			languageCode,
			dateIssue,
			referenceCode,
			measureDate,
			waterLevel,
			dateInfo,
			sourceOrganization,
		)
		entry.id = id
		result[measureDate.Unix()] = entry
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}

// newAGMLine builds an agmLine from its raw components, wrapping
// the time values in timetz so they serialize with an explicit
// numeric zone offset. The database id is left at its zero value.
func newAGMLine(
	location models.Isrs,
	countryCode string,
	sender string,
	languageCode string,
	dateIssue time.Time,
	referenceCode string,
	measureDate time.Time,
	waterLevel float64,
	dateInfo time.Time,
	sourceOrganization string,
) *agmLine {
	line := new(agmLine)
	line.Location = location
	line.CountryCode = countryCode
	line.Sender = sender
	line.LanguageCode = languageCode
	line.DateIssue = timetz{dateIssue}
	line.ReferenceCode = referenceCode
	line.MeasureDate = timetz{measureDate}
	line.WaterLevel = waterLevel
	line.DateInfo = timetz{dateInfo}
	line.SourceOrganization = sourceOrganization
	return line
}