view pkg/imports/agm.go @ 5670:b75d0b303328

Various fixes and improvements of gauges import: - Allow update of erased data (and thereby set erased to false) - Fix source_organization to work with ERDMS2 - Give ISRS of new and updated gauges in summary - Fixed reference of null pointers if revlevels are missing - Fixed reference of null pointer on update errors - Added ISRS to reference_code warning
author Sascha Wilde <wilde@sha-bang.de>
date Fri, 08 Dec 2023 17:29:56 +0100
parents 56c589f7435d
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"sort"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
)

// ApprovedGaugeMeasurements is a Job to import
// approved gauge measurements from a CSV file
// into the database.
//
// Dir is the directory holding the "agm.csv" file read by Do; the whole
// directory is removed by CleanUp. Originator is stored as both the sender
// and the source organization of every imported measurement.
type ApprovedGaugeMeasurements struct {
	Dir        string `json:"dir"`
	Originator string `json:"originator"`
}

// AGMJobKind is the unique name of an approved gauge measurements import job.
// It is the kind under which the job creator of this file is registered.
const AGMJobKind JobKind = "agm"

// agmJobCreator is the stateless job creator for approved gauge
// measurements imports. It is registered for AGMJobKind.
type agmJobCreator struct{}

// init registers the approved gauge measurements job creator
// under AGMJobKind when the package is loaded.
func init() {
	RegisterJobCreator(AGMJobKind, agmJobCreator{})
}

// AutoAccept always reports false for approved gauge measurements imports.
// NOTE(review): presumably this means the staged data needs an explicit
// review before StageDone is applied - confirm against the job framework.
func (agmJobCreator) AutoAccept() bool { return false }

// Description returns a short human readable name of this kind of import.
func (agmJobCreator) Description() string {
	return "approved gauge measurements"
}

// Create returns a new, zero-valued ApprovedGaugeMeasurements job.
func (agmJobCreator) Create() Job { return new(ApprovedGaugeMeasurements) }

// Depends returns the two lists of relation names this import depends on:
// "gauge_measurements" in the first list and "gauges" in the second.
// NOTE(review): presumably the first list names relations the import
// modifies and the second relations it only reads - confirm against the
// consumer of Depends.
func (agmJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauge_measurements"}
	deps[1] = []string{"gauges"}
	return deps
}

const (
	// agmStageDoneDeleteSQL deletes all gauge measurements which were
	// tracked for the import $1 with the deletion flag set.
	agmStageDoneDeleteSQL = `
DELETE FROM waterway.gauge_measurements WHERE id IN (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.gauge_measurements'::regclass AND
    deletion
)`

	// agmStageDoneSQL marks all gauge measurements which were tracked for
	// the import $1 without the deletion flag as staging_done.
	agmStageDoneSQL = `
UPDATE waterway.gauge_measurements SET staging_done = true
WHERE id IN (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.gauge_measurements'::regclass AND
  NOT deletion
)`
)

// StageDone replaces gauge measurements with those in the staging area.
//
// Inside the given transaction it first deletes the measurements tracked
// for deletion by the import id and then marks the measurements tracked as
// new as staging_done. The first failing statement aborts the sequence and
// its error is returned. The Feedback argument is not used.
func (agmJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	_ Feedback,
) error {
	if _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id); err != nil {
		return err
	}
	_, err := tx.ExecContext(ctx, agmStageDoneSQL, id)
	return err
}

// CleanUp removes the folder containing the CSV file with
// the approved gauge measurements. The error of the removal,
// if any, is returned.
func (agm *ApprovedGaugeMeasurements) CleanUp() error {
	return os.RemoveAll(agm.Dir)
}

// guessDate parses the measure dates found in the CSV file. Two layouts
// are configured: "DD.MM.YYYY hh:mm" and an ISO 8601 style timestamp with
// a numeric UTC offset.
// NOTE(review): presumably common.TimeParser tries the layouts in the
// given order - confirm in package common.
var guessDate = common.TimeParser([]string{
	"02.01.2006 15:04",
	"2006-01-02T15:04:05-07:00",
}).Parse

// timetz is a time.Time which is serialized to JSON with an explicit
// numeric UTC offset (e.g. "2006-01-02T15:04:05-07:00").
type timetz struct{ time.Time }

// MarshalJSON encodes the time as a JSON string in the layout
// "2006-01-02T15:04:05-07:00".
//
// The method has a value receiver on purpose: with a pointer receiver it
// is not part of the method set of timetz values, so encoding/json would
// not call it for non-addressable values and the intended format would be
// lost. A value receiver serves values and pointers alike.
func (ttz timetz) MarshalJSON() ([]byte, error) {
	return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00"))
}

// agmLine is a single gauge measurement, either read from the CSV file
// or loaded from the database.
//
// The unexported id is the database id of a stored measurement; it is only
// set for lines loaded from the database and is not part of the JSON
// representation. All other fields are serialized with the JSON names
// given in their tags.
type agmLine struct {
	id                 int64
	Location           models.Isrs `json:"fk-gauge-id"`
	CountryCode        string      `json:"country-code"`
	Sender             string      `json:"sender"`
	LanguageCode       string      `json:"language-code"`
	DateIssue          timetz      `json:"date-issue"`
	ReferenceCode      string      `json:"reference-code"`
	MeasureDate        timetz      `json:"measure-date"`
	WaterLevel         float64     `json:"water-level"`
	DateInfo           timetz      `json:"date-info"`
	SourceOrganization string      `json:"source-organization"`
}

// hasDiff reports whether a and b differ in country code, sender,
// language code, reference code, source organization or water level.
// Water levels count as different when they are more than eps apart.
// Location, the dates and the database id are not compared.
func (a *agmLine) hasDiff(b *agmLine) bool {
	const eps = 0.00001

	sameText := a.CountryCode == b.CountryCode &&
		a.Sender == b.Sender &&
		a.LanguageCode == b.LanguageCode &&
		a.ReferenceCode == b.ReferenceCode &&
		a.SourceOrganization == b.SourceOrganization
	if !sameText {
		return true
	}

	return math.Abs(a.WaterLevel-b.WaterLevel) > eps
}

// agmSummaryEntry is one entry of the import summary returned by Do.
//
// Versions describes the change for the gauge FKGaugeID at MeasureDate:
// a single element for a newly inserted measurement, the old and the new
// measurement for an update, and the old measurement followed by nil for
// a deletion.
type agmSummaryEntry struct {
	FKGaugeID   models.Isrs `json:"fk-gauge-id"`
	MeasureDate timetz      `json:"measure-date"`
	Versions    []*agmLine  `json:"versions"`
}

const (
	// agmSelectSQL loads the already accepted (staging_done) measurements
	// of the gauge at location $1-$5 with a measure_date between $6 and $7
	// (both inclusive).
	agmSelectSQL = `
SELECT
  id,
  country_code,
  sender,
  language_code,
  date_issue,
  reference_code,
  measure_date,
  water_level,
  date_info,
  source_organization
FROM waterway.gauge_measurements
WHERE
  location
    = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
  AND measure_date BETWEEN $6 AND $7
  AND staging_done
`

	// agmInsertSQL inserts a new measurement for the gauge at location
	// $1-$5 with staging_done set to false and returns the id of the
	// new row.
	agmInsertSQL = `
INSERT INTO waterway.gauge_measurements (
  location,
  measure_date,
  country_code,
  sender,
  language_code,
  date_issue,
  reference_code,
  water_level,
  date_info,
  source_organization,
  staging_done
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  false
)
RETURNING id`

	// agmGaugeCheckSQL reports whether a gauge with the location $1-$5
	// exists in waterway.gauges.
	agmGaugeCheckSQL = `
SELECT EXISTS(
  SELECT 1 FROM waterway.gauges
  WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int))
`
)

// parseAGMHeaders locates the columns needed by the import in the header
// row of the CSV file.
//
// Header names are matched after trimming surrounding white space,
// lower-casing and replacing spaces by underscores. A UTF-8 byte order
// mark in front of the first header is ignored. Columns other than
// "fk_gauge_id", "measure_date" and "value" are skipped.
//
// The three index pointers must point to -1 on entry; on success each of
// them holds the position of its column in headers. An error is returned
// if one of the columns occurs more than once or if any of them is missing.
func parseAGMHeaders(headers []string, fkGaugeIDIdx, measureDateIdx, valueIdx *int) error {

	headerFields := []struct {
		idx  *int
		name string
	}{
		{fkGaugeIDIdx, "fk_gauge_id"},
		{measureDateIdx, "measure_date"},
		{valueIdx, "value"}, // "water_level",
	}

	for i, f := range headers {
		if i == 0 {
			// Files exported by spreadsheet tools often start with a
			// BOM, which would otherwise hide the first column.
			f = strings.TrimPrefix(f, "\ufeff")
		}
		h := strings.ReplaceAll(
			strings.ToLower(strings.TrimSpace(f)), " ", "_")

		for j := range headerFields {
			if headerFields[j].name != h {
				continue
			}
			if *headerFields[j].idx != -1 {
				return fmt.Errorf(
					"there is more than one column named '%s'", h)
			}
			*headerFields[j].idx = i
			break
		}
	}

	var missing []string
	for i := range headerFields {
		if *headerFields[i].idx == -1 {
			missing = append(missing, headerFields[i].name)
		}
	}
	if len(missing) > 0 {
		return fmt.Errorf("missing columns: %s", strings.Join(missing, ", "))
	}

	return nil
}

// Do executes the actual approved gauge measurements import.
//
// It reads the semicolon separated file "agm.csv" from agm.Dir. The header
// row must contain the columns "fk_gauge_id", "measure_date" and "value".
// Rows of gauges which are not found in the database are ignored with a
// warning. For every remaining gauge the already accepted measurements in
// the time range covered by the file are loaded and compared with the
// rows of the file:
//
//   - rows identical to a stored measurement are skipped,
//   - all other rows are inserted as not yet accepted (staging_done = false)
//     and tracked for the import,
//   - stored measurements in that range which were not matched by an
//     identical row are tracked for deletion.
//
// Inserting and tracking happen in one transaction. The changes only
// become effective when StageDone is applied for the import.
//
// On success the summary of the changes is returned as a slice of
// *agmSummaryEntry, ordered by gauge and measure date. An UnchangedError
// is returned if no row of the file was usable or if the file does not
// change anything. Any parse or database error aborts the import.
func (agm *ApprovedGaugeMeasurements) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	var (
		fkGaugeIDIdx   = -1
		measureDateIdx = -1
		valueIdx       = -1
	)

	if err := parseAGMHeaders(
		headers,
		&fkGaugeIDIdx, &measureDateIdx, &valueIdx,
	); err != nil {
		return nil, err
	}

	gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL)
	if err != nil {
		return nil, err
	}
	defer gaugeCheckStmt.Close()

	selectStmt, err := conn.PrepareContext(ctx, agmSelectSQL)
	if err != nil {
		return nil, err
	}
	defer selectStmt.Close()

	insertStmt, err := conn.PrepareContext(ctx, agmInsertSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	trackStmt, err := conn.PrepareContext(ctx, trackImportDeletionSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	entries := []*agmSummaryEntry{}

	// Caches per gauge whether it exists in the database.
	checkedGauges := map[models.Isrs]bool{}

	warnLimiter := common.WarningLimiter{Log: feedback.Warn, MaxWarnings: 100}
	warn := warnLimiter.Warn
	defer warnLimiter.Close()

	agmLines := []*agmLine{}
	ignored := 0
	// Earliest and latest measure date found in the file per gauge.
	mdMinMax := map[models.Isrs]*[2]time.Time{}

lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		// Real parse errors have to be handled before the end-of-input
		// check: for most of them the reader returns no record, which
		// must not be mistaken for the end of the file.
		if err != nil && err != io.EOF {
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		}
		if err == io.EOF || len(row) == 0 {
			feedback.Info("Read %d entries in CSV file", line-1)
			if ignored > 0 {
				feedback.Info("%d entries ignored", ignored)
			}
			if ignored == line-1 {
				return nil, UnchangedError("No entries imported")
			}
			break lines
		}

		gids := row[fkGaugeIDIdx]
		gid, err := models.IsrsFromString(gids)
		if err != nil {
			return nil, fmt.Errorf("invalid ISRS code line %d: %v", line, err)
		}

		if exists, found := checkedGauges[*gid]; found {
			if !exists {
				// Just ignore the line since we have already warned
				ignored++
				continue lines
			}
		} else { // not found in gauge cache
			if err := gaugeCheckStmt.QueryRowContext(
				ctx,
				gid.CountryCode,
				gid.LoCode,
				gid.FairwaySection,
				gid.Orc,
				gid.Hectometre,
			).Scan(&exists); err != nil {
				return nil, err
			}
			checkedGauges[*gid] = exists
			if !exists {
				warn("Ignoring data for unknown gauge %s", gid.String())
				ignored++
				continue lines
			}
		}

		md, err := guessDate(row[measureDateIdx])
		if err != nil {
			return nil, fmt.Errorf("invalid 'measure_date' line %d: %v", line, err)
		}
		if v := mdMinMax[*gid]; v != nil {
			if md.Before(v[0]) {
				v[0] = md
			}
			if md.After(v[1]) {
				v[1] = md
			}
		} else {
			mdMinMax[*gid] = &[2]time.Time{md, md}
		}

		newSender := agm.Originator
		newCountryCode := gid.CountryCode
		newLanguageCode := common.CCtoLang[gid.CountryCode]
		newDateIssue := time.Now()
		newReferenceCode := "ZPG"

		value, err := strconv.ParseFloat(row[valueIdx], 32)
		if err != nil {
			return nil, fmt.Errorf("invalid 'value' line %d: %v", line, err)
		}
		newValue := value

		newDateInfo := newDateIssue

		newSourceOrganization := newSender

		agmLines = append(agmLines, newAGMLine(
			*gid,
			newCountryCode,
			newSender,
			newLanguageCode,
			newDateIssue,
			newReferenceCode,
			md,
			newValue,
			newDateInfo,
			newSourceOrganization,
		))
	}

	// Load the stored measurements of every gauge for the time range
	// covered by the file, keyed by the Unix time of the measurement.
	oldGMLines := map[models.Isrs]map[int64]*agmLine{}
	for gid, minMax := range mdMinMax {
		oldGMLines[gid], err = getOldGMLines(
			ctx, selectStmt, gid, minMax[0], minMax[1])
		if err != nil {
			return nil, err
		}
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// A rollback after a successful commit is a no-op.
	defer tx.Rollback()

	txInsertStmt := tx.StmtContext(ctx, insertStmt)
	txTrackStmt := tx.StmtContext(ctx, trackStmt)

agmLines:
	for _, line := range agmLines {

		var ase *agmSummaryEntry

		if old := oldGMLines[line.Location]; old != nil {
			ut := line.MeasureDate.Unix()
			if o, ok := old[ut]; ok {
				if !o.hasDiff(line) { // identical
					// don't delete
					delete(old, ut)
					continue agmLines
				}
				// Changed measurement: the old version stays in the
				// map and is therefore tracked for deletion below.
				ase = &agmSummaryEntry{
					FKGaugeID:   line.Location,
					MeasureDate: line.MeasureDate,
					Versions:    []*agmLine{o, line},
				}
			}
		}
		if ase == nil {
			ase = &agmSummaryEntry{
				FKGaugeID:   line.Location,
				MeasureDate: line.MeasureDate,
				Versions:    []*agmLine{line},
			}
		}

		var newID int64

		if err := txInsertStmt.QueryRowContext(
			ctx,
			line.Location.CountryCode,
			line.Location.LoCode,
			line.Location.FairwaySection,
			line.Location.Orc,
			line.Location.Hectometre,
			line.MeasureDate.Time,
			line.CountryCode,
			line.Sender,
			line.LanguageCode,
			line.DateIssue.Time,
			line.ReferenceCode,
			line.WaterLevel,
			line.DateInfo.Time,
			line.SourceOrganization,
		).Scan(&newID); err != nil {
			return nil, err
		}

		if _, err := txTrackStmt.ExecContext(
			ctx, importID, "waterway.gauge_measurements",
			newID,
			false,
		); err != nil {
			return nil, err
		}

		entries = append(entries, ase)
	}

	// Remember the number of inserts/updates before the deletions are
	// appended to the summary, so both are reported separately.
	upserts := len(entries)

	var removed int

	// Issue deletes
	for _, old := range oldGMLines {
		removed += len(old)
		for _, line := range old {
			if _, err := txTrackStmt.ExecContext(
				ctx, importID, "waterway.gauge_measurements",
				line.id,
				true,
			); err != nil {
				return nil, err
			}
			entries = append(entries, &agmSummaryEntry{
				FKGaugeID:   line.Location,
				MeasureDate: line.MeasureDate,
				Versions:    []*agmLine{line, nil},
			})
		}
	}

	feedback.Info("Measurements to update/insert: %d", upserts)
	feedback.Info("Measurements to delete: %d", removed)

	if len(entries) == 0 && removed == 0 {
		return nil, UnchangedError("No changes from AGM import")
	}

	if err = tx.Commit(); err != nil {
		return nil, fmt.Errorf("commit failed: %v", err)
	}

	// Sort here to mix the deletes right beside the matching inserts/updates.
	// Ordering by gauge and then by measure date with a stable sort also
	// makes the output deterministic although the deletes stem from map
	// iterations: inserts/updates keep their file order and precede the
	// delete of the same gauge and time.
	sort.SliceStable(entries, func(i, j int) bool {
		a, b := entries[i], entries[j]
		if a.FKGaugeID.Less(&b.FKGaugeID) {
			return true
		}
		if b.FKGaugeID.Less(&a.FKGaugeID) {
			return false
		}
		return a.MeasureDate.Before(b.MeasureDate.Time)
	})

	feedback.Info("Imported %d entries with changes", len(entries))
	feedback.Info("Importing approved gauge measurements took %s",
		time.Since(start))

	return entries, nil
}

// getOldGMLines loads the measurements selected by stmt for the gauge at
// location with a measure date between from and to.
//
// The result maps the Unix time (in seconds) of the measure date to the
// stored measurement, which carries its database id. The first query, scan
// or iteration error is returned without a result.
func getOldGMLines(
	ctx context.Context,
	stmt *sql.Stmt,
	location models.Isrs,
	from time.Time,
	to time.Time,
) (map[int64]*agmLine, error) {
	rows, err := stmt.QueryContext(
		ctx,
		location.CountryCode,
		location.LoCode,
		location.FairwaySection,
		location.Orc,
		location.Hectometre,
		from,
		to,
	)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	stored := make(map[int64]*agmLine)

	for rows.Next() {
		// Scan straight into a fresh line; the column order has to
		// match the select statement.
		entry := &agmLine{Location: location}
		if err := rows.Scan(
			&entry.id,
			&entry.CountryCode,
			&entry.Sender,
			&entry.LanguageCode,
			&entry.DateIssue.Time,
			&entry.ReferenceCode,
			&entry.MeasureDate.Time,
			&entry.WaterLevel,
			&entry.DateInfo.Time,
			&entry.SourceOrganization,
		); err != nil {
			return nil, err
		}
		stored[entry.MeasureDate.Unix()] = entry
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}
	return stored, nil
}

// newAGMLine builds a measurement line from the given values. The three
// times are wrapped as timetz; the database id is left at zero.
func newAGMLine(
	location models.Isrs,
	countryCode string,
	sender string,
	languageCode string,
	dateIssue time.Time,
	referenceCode string,
	measureDate time.Time,
	waterLevel float64,
	dateInfo time.Time,
	sourceOrganization string,
) *agmLine {
	line := new(agmLine)

	line.Location = location
	line.MeasureDate = timetz{measureDate}
	line.WaterLevel = waterLevel

	line.CountryCode = countryCode
	line.LanguageCode = languageCode
	line.ReferenceCode = referenceCode

	line.Sender = sender
	line.SourceOrganization = sourceOrganization

	line.DateIssue = timetz{dateIssue}
	line.DateInfo = timetz{dateInfo}

	return line
}