view pkg/imports/agm.go @ 3533:8e083b271fca

Improve error messages if no matching gauge version found. Avoid hitting the NOT NULL constraint of the referencing validity column in order to hit the foreign key constraint instead and emit an appropriate error message in all such cases.
author Tom Gottfried <tom@intevation.de>
date Wed, 29 May 2019 18:14:20 +0200
parents 6e748f31777a
children baa51bb82364
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/models"
)

// ApprovedGaugeMeasurements is the import job for approved
// gauge measurements delivered as a CSV file.
type ApprovedGaugeMeasurements struct {
	// Dir is the folder containing the file "agm.csv" to import.
	// The whole folder is removed by CleanUp.
	Dir        string `json:"dir"`
	// Originator is stored as sender and source organization
	// of every imported measurement.
	Originator string `json:"originator"`
}

// AGMJobKind is the unique name of an approved gauge measurements import job.
const AGMJobKind JobKind = "agm"

// agmJobCreator is the stateless job creator registered
// for the AGMJobKind import jobs.
type agmJobCreator struct{}

// init registers the job creator for approved gauge measurements
// under AGMJobKind when the package is loaded.
func init() {
	RegisterJobCreator(AGMJobKind, agmJobCreator{})
}

// AutoAccept always reports false for approved gauge measurements
// imports.
// NOTE(review): presumably this means the import has to be reviewed
// before it is accepted -- confirm against the job creator interface.
func (agmJobCreator) AutoAccept() bool {
	return false
}

// Description returns a short human readable name
// of this kind of import.
func (agmJobCreator) Description() string {
	const description = "approved gauge measurements"
	return description
}

// Create returns a fresh, zero-valued approved gauge
// measurements import job.
func (agmJobCreator) Create() Job {
	return &ApprovedGaugeMeasurements{}
}

// Depends returns the two lists of relation names this import
// depends on: "gauge_measurements" in the first slot and "gauges"
// in the second.
// NOTE(review): presumably the first slot names the relations the
// import modifies and the second the ones it only reads -- confirm
// against the job creator interface.
func (agmJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauge_measurements"}
	deps[1] = []string{"gauges"}
	return deps
}

const (
	// agmStageDoneDeleteSQL deletes the old and keeps the new measurements:
	// It removes every gauge measurement that is not tracked by the
	// import with the given id ($1) but shares location and
	// measure date with a measurement that is tracked by this import.
	agmStageDoneDeleteSQL = `
WITH staged AS (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.gauge_measurements'::regclass
),
to_delete AS (
  SELECT o.id AS id
  FROM waterway.gauge_measurements o
  JOIN waterway.gauge_measurements n
    USING (location, measure_date)
    WHERE n.id IN (SELECT key FROM staged)
	  AND o.id NOT IN (SELECT key FROM staged)
)
DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`

	// agmStageDoneSQL marks all gauge measurements tracked by the
	// import with the given id ($1) as staging done.
	agmStageDoneSQL = `
UPDATE waterway.gauge_measurements SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.gauge_measurements'::regclass)`
)

// StageDone finalizes the import with the given id inside the
// transaction tx. It first deletes the measurements which are
// replaced by the ones of this import and then flags the imported
// measurements as staging done. The first failing statement
// aborts the sequence and its error is returned.
func (agmJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	if _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id); err != nil {
		return err
	}
	_, err := tx.ExecContext(ctx, agmStageDoneSQL, id)
	return err
}

// CleanUp removes the folder containing the CSV file with
// the approved gauge measurements, including everything in it.
func (agm *ApprovedGaugeMeasurements) CleanUp() error {
	if err := os.RemoveAll(agm.Dir); err != nil {
		return err
	}
	return nil
}

// guessDate converts the textual measure date of a CSV row into a
// time.Time. It is the Guess method of a misc.TimeGuesser set up
// with the two layouts "02.01.2006 15:04" and
// "2006-01-02T15:04:05-07:00".
// NOTE(review): presumably the layouts are tried in the given
// order -- confirm against misc.TimeGuesser.
var guessDate = misc.TimeGuesser([]string{
	"02.01.2006 15:04",
	"2006-01-02T15:04:05-07:00",
}).Guess

// timetz wraps a time.Time to give it a JSON representation
// with whole seconds and an explicit numeric time zone offset.
type timetz struct{ time.Time }

// MarshalJSON implements json.Marshaler. The time is encoded as
// a JSON string using the layout "2006-01-02T15:04:05-07:00".
//
// The method has a value receiver so that it belongs to the method
// sets of both timetz and *timetz. With a pointer receiver
// encoding/json only finds it for addressable values; everything
// else would be encoded by the marshaling methods promoted from the
// embedded time.Time, which use a different layout.
func (ttz timetz) MarshalJSON() ([]byte, error) {
	return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00"))
}

// agmLine holds the attributes of one version of a gauge
// measurement as it is reported in the summary of the import.
// The fields mirror the equally named columns read from and written
// to waterway.gauge_measurements.
type agmLine struct {
	CountryCode        string  `json:"country-code"`
	Sender             string  `json:"sender"`
	LanguageCode       string  `json:"language-code"`
	DateIssue          timetz  `json:"date-issue"`
	ReferenceCode      string  `json:"reference-code"`
	WaterLevel         float64 `json:"water-level"`
	DateInfo           timetz  `json:"date-info"`
	SourceOrganization string  `json:"source-organization"`
}

// hasDiff reports whether a and b differ in any of their textual
// attributes or in their water level. Water levels are regarded as
// equal if they are not more than a small epsilon apart. The dates
// DateIssue and DateInfo do not take part in the comparison.
func (a *agmLine) hasDiff(b *agmLine) bool {
	const eps = 0.00001

	switch {
	case a.CountryCode != b.CountryCode,
		a.Sender != b.Sender,
		a.LanguageCode != b.LanguageCode,
		a.ReferenceCode != b.ReferenceCode,
		a.SourceOrganization != b.SourceOrganization:
		return true
	}

	delta := math.Abs(a.WaterLevel - b.WaterLevel)
	return delta > eps
}

// agmSummaryEntry is one entry of the summary returned by the import.
// It identifies a measurement by gauge and measure date and lists
// its versions: only the new one if there was no measurement
// before, or the old one followed by the new one if the
// measurement changed.
type agmSummaryEntry struct {
	FKGaugeID   models.Isrs `json:"fk-gauge-id"`
	MeasureDate timetz      `json:"measure-date"`
	Versions    []*agmLine  `json:"versions"`
}

const (
	// agmSelectSQL loads the measurement with staging done for
	// the gauge location ($1-$5) and the measure date ($6).
	agmSelectSQL = `
SELECT
  id,
  country_code,
  sender,
  language_code,
  date_issue,
  reference_code,
  water_level,
  date_info,
  source_organization
FROM waterway.gauge_measurements
WHERE
  location
    = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
  AND measure_date = $6
  AND staging_done
`

	// agmInsertSQL inserts a new measurement, which is not staging
	// done yet, for the gauge location ($1-$5) and the measure
	// date ($6) and returns the id of the new row.
	// The validity is taken from the version of the gauge whose
	// validity contains the measure date. If there is no such version
	// an unbounded range is used instead of NULL.
	// NOTE(review): the unbounded range is presumably meant to trip
	// the foreign key to the gauges instead of the NOT NULL constraint
	// of the validity column -- confirm against the schema.
	agmInsertSQL = `
INSERT INTO waterway.gauge_measurements (
  location,
  validity,
  measure_date,
  country_code,
  sender,
  language_code,
  date_issue,
  reference_code,
  water_level,
  date_info,
  source_organization,
  staging_done
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  COALESCE(
    (SELECT validity FROM waterway.gauges
       WHERE location
            = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
         AND validity @> CAST($6 AS timestamp with time zone)),
    tstzrange(NULL, NULL)),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  false
)
RETURNING id`

	// agmGaugeCheckSQL checks if there is any version of a gauge
	// at the location ($1-$5).
	agmGaugeCheckSQL = `
SELECT EXISTS(
  SELECT 1 FROM waterway.gauges
  WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int))
`
)

// errContinue is an internal sentinel error. It is returned by the
// per row transaction of the import to signal that the current
// row was skipped and the import should go on with the next one.
var errContinue = errors.New("continue")

// Do executes the actual approved gauge measurements import.
//
// It reads the semicolon separated file "agm.csv" from agm.Dir. The
// file has to contain the columns "fk_gauge_id", "measure_date" and
// "value". Column names are matched in lower case with surrounding
// white space removed and inner spaces replaced by underscores.
//
// Every row is inserted as a measurement which is not staging done
// yet and is tracked under importID, each in its own transaction.
// Rows of gauges unknown to the database and rows the database
// refuses to insert are counted as ignored and reported as warnings.
// Invalid ISRS codes, dates or values and CSV parsing errors abort
// the import.
//
// On success a []*agmSummaryEntry is returned with one entry for
// each imported row which is new or differs from the measurement
// with staging done for the same gauge and measure date.
// An UnchangedError is returned if no row was imported at all.
func (agm *ApprovedGaugeMeasurements) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	// The slice returned by Read is only valid until the next
	// call of Read. Its fields are not kept beyond one iteration.
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	// Positions of the needed columns, -1 means not found.
	var (
		fkGaugeIDIdx   = -1
		measureDateIdx = -1
		valueIdx       = -1
	)

	headerFields := []struct {
		idx  *int
		name string
	}{
		{&fkGaugeIDIdx, "fk_gauge_id"},
		{&measureDateIdx, "measure_date"},
		{&valueIdx, "value"}, // "water_level",
	}

nextHeader:
	for i, header := range headers {
		h := strings.Replace(strings.ToLower(
			strings.TrimSpace(header)), " ", "_", -1)

		for j := range headerFields {
			if headerFields[j].name == h {
				if *headerFields[j].idx != -1 {
					return nil, fmt.Errorf(
						"There is more than one column namend '%s'", h)
				}
				*headerFields[j].idx = i
				continue nextHeader
			}
		}
	}

	var missing []string
	for i := range headerFields {
		// There is no field named "unit" in headerFields,
		// so excluding it has no effect at the moment.
		if headerFields[i].name != "unit" && *headerFields[i].idx == -1 {
			missing = append(missing, headerFields[i].name)
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
	}

	gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL)
	if err != nil {
		return nil, err
	}
	defer gaugeCheckStmt.Close()

	selectStmt, err := conn.PrepareContext(ctx, agmSelectSQL)
	if err != nil {
		return nil, err
	}
	defer selectStmt.Close()

	insertStmt, err := conn.PrepareContext(ctx, agmInsertSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	trackStmt, err := conn.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	entries := []*agmSummaryEntry{}

	// Caches the result of the gauge existence check per ISRS code
	// to ask the database only once for each gauge.
	checkedGauges := map[models.Isrs]bool{}

	warnLimiter := misc.WarningLimiter{Log: feedback.Warn, MaxWarnings: 100}
	warn := warnLimiter.Warn
	defer warnLimiter.Close()

lines:
	for line, ignored := 1, 0; ; line++ {

		row, err := r.Read()
		switch {
		case err != nil && err != io.EOF:
			// Real errors have to be checked first: encoding/csv may
			// deliver an empty record together with a parse error,
			// which must not be mistaken for the end of the file.
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		case err == io.EOF || len(row) == 0:
			feedback.Info("Read %d entries in CSV file", line-1)
			if ignored > 0 {
				feedback.Info("%d entries ignored", ignored)
			}
			if ignored == line-1 {
				return nil, UnchangedError("No entries imported")
			}
			feedback.Info("Imported %d entries with changes", len(entries))
			break lines
		}

		gids := row[fkGaugeIDIdx]
		gid, err := models.IsrsFromString(gids)
		if err != nil {
			return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err)
		}

		if exists, found := checkedGauges[*gid]; found {
			if !exists {
				// Just ignore the line since we have already warned
				ignored++
				continue lines
			}
		} else { // not found in gauge cache
			if err := gaugeCheckStmt.QueryRowContext(
				ctx,
				gid.CountryCode,
				gid.LoCode,
				gid.FairwaySection,
				gid.Orc,
				gid.Hectometre,
			).Scan(&exists); err != nil {
				return nil, err
			}
			checkedGauges[*gid] = exists
			if !exists {
				warn("Ignoring data for unknown gauge %s", gid.String())
				ignored++
				continue lines
			}
		}

		md, err := guessDate(row[measureDateIdx])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
		}

		// Load the measurement with staging done for this gauge
		// and date, if any, to compare it with the new one later.
		var (
			oldID                 int64
			oldCountryCode        string
			oldSender             string
			oldLanguageCode       string
			oldDateIssue          time.Time
			oldReferenceCode      string
			oldValue              float64
			oldDateInfo           time.Time
			oldSourceOrganization string
		)

		err = selectStmt.QueryRowContext(
			ctx,
			gid.CountryCode,
			gid.LoCode,
			gid.FairwaySection,
			gid.Orc,
			gid.Hectometre,
			md,
		).Scan(
			&oldID,
			&oldCountryCode,
			&oldSender,
			&oldLanguageCode,
			&oldDateIssue,
			&oldReferenceCode,
			&oldValue,
			&oldDateInfo,
			&oldSourceOrganization,
		)

		var newEntry bool
		switch {
		case err == sql.ErrNoRows:
			// Complete new one
			newEntry = true
		case err != nil:
			return nil, err
		}

		// The CSV only delivers gauge, date and value.
		// All other attributes are derived.
		newSender := agm.Originator
		newCountryCode := gid.CountryCode
		newLanguageCode := misc.CCtoLang[gid.CountryCode]
		newDateIssue := time.Now()
		newReferenceCode := "ZPG"

		value, err := strconv.ParseFloat(row[valueIdx], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
		}
		newValue := value

		newDateInfo := newDateIssue

		newSourceOrganization := newSender

		// Insert and track the row in a transaction of its own.
		// The closure lets the deferred rollback run at the end of
		// each iteration instead of at the end of the whole import.
		switch err := func() error {
			tx, err := conn.BeginTx(ctx, nil)
			if err != nil {
				return err
			}
			defer tx.Rollback()

			var newID int64

			if err := tx.StmtContext(ctx, insertStmt).QueryRowContext(
				ctx,
				gid.CountryCode,
				gid.LoCode,
				gid.FairwaySection,
				gid.Orc,
				gid.Hectometre,
				md,
				newCountryCode,
				newSender,
				newLanguageCode,
				newDateIssue,
				newReferenceCode,
				newValue,
				newDateInfo,
				newSourceOrganization,
			).Scan(&newID); err != nil {
				// A failed insert only skips this row.
				// NOTE(review): the message is handed over as the
				// format string, a '%' in it would be taken as a
				// verb -- check how WarningLimiter treats its first
				// argument before changing this.
				warn(handleError(err).Error())
				ignored++
				return errContinue
			}

			if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
				ctx, importID, "waterway.gauge_measurements", newID,
			); err != nil {
				return err
			}

			if err = tx.Commit(); err != nil {
				err = fmt.Errorf("Commit failed: %v", err)
			}
			return err
		}(); {
		case err == errContinue:
			continue lines
		case err != nil:
			return nil, err
		}

		n := newAGMLine(
			newCountryCode,
			newSender,
			newLanguageCode,
			newDateIssue,
			newReferenceCode,
			newValue,
			newDateInfo,
			newSourceOrganization,
		)

		ase := &agmSummaryEntry{
			FKGaugeID:   *gid,
			MeasureDate: timetz{md},
		}

		if newEntry {
			ase.Versions = []*agmLine{n}
		} else {
			o := newAGMLine(
				oldCountryCode,
				oldSender,
				oldLanguageCode,
				oldDateIssue,
				oldReferenceCode,
				oldValue,
				oldDateInfo,
				oldSourceOrganization,
			)
			// Ignore if there is no diff.
			// The row itself is already inserted at this point,
			// it is only left out of the summary.
			if !n.hasDiff(o) {
				continue
			}
			ase.Versions = []*agmLine{o, n}
		}
		entries = append(entries, ase)
	}

	feedback.Info("Importing approved gauge measurements took %s",
		time.Since(start))

	return entries, nil
}

// newAGMLine builds an agmLine from its individual attributes.
// The two dates are wrapped into timetz values.
func newAGMLine(
	countryCode string,
	sender string,
	languageCode string,
	dateIssue time.Time,
	referenceCode string,
	waterLevel float64,
	dateInfo time.Time,
	sourceOrganization string,
) *agmLine {
	l := new(agmLine)

	l.CountryCode = countryCode
	l.Sender = sender
	l.LanguageCode = languageCode
	l.ReferenceCode = referenceCode
	l.SourceOrganization = sourceOrganization

	l.WaterLevel = waterLevel

	l.DateIssue = timetz{dateIssue}
	l.DateInfo = timetz{dateInfo}

	return l
}