view pkg/imports/agm.go @ 3220:56b297592c0a

Handle failing INSERTs gracefully during approved gauge measurements import
author Tom Gottfried <tom@intevation.de>
date Thu, 09 May 2019 14:21:50 +0200
parents 4acbee65275d
children 232fc90e6ee2
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/models"
)

// ApprovedGaugeMeasurements is the job that imports approved gauge
// measurements from a CSV file.
//
// Dir is the folder the file "agm.csv" is read from; the whole folder is
// removed by CleanUp. Originator is stored as sender and as source
// organization of every imported measurement.
type ApprovedGaugeMeasurements struct {
	Dir        string `json:"dir"`
	Originator string `json:"originator"`
}

// AGMJobKind is the unique name of an approved gauge measurements import job.
const AGMJobKind JobKind = "agm"

// agmJobCreator is the stateless creator that is registered for AGMJobKind.
// Its methods describe approved gauge measurements imports and create the
// corresponding jobs.
type agmJobCreator struct{}

func init() {
	RegisterJobCreator(AGMJobKind, agmJobCreator{})
}

// AutoAccept always reports false for approved gauge measurements imports.
// NOTE(review): presumably this means the staged data needs an explicit
// review before it is accepted -- confirm against the job queue.
func (agmJobCreator) AutoAccept() bool {
	const autoAccept = false
	return autoAccept
}

// Description returns the human readable name of this kind of import.
func (agmJobCreator) Description() string {
	const description = "approved gauge measurements"
	return description
}

// Create returns a fresh, zero-valued approved gauge measurements job.
func (agmJobCreator) Create() Job {
	return &ApprovedGaugeMeasurements{}
}

// Depends returns the two lists of relation names this import depends on:
// "gauge_measurements" in the first list and "gauges" in the second.
// NOTE(review): presumably the first list names relations that are modified
// and the second relations that are only read -- confirm against the
// scheduler.
func (agmJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauge_measurements"}
	deps[1] = []string{"gauges"}
	return deps
}

const (
	// agmStageDoneDeleteSQL deletes the old and keeps the new measurements:
	// it removes every non-predicted row of waterway.gauge_measurements
	// that is not tracked for the import $1 but shares gauge and measure
	// date with a row that is tracked for this import.
	agmStageDoneDeleteSQL = `
WITH staged AS (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.gauge_measurements'::regclass
),
to_delete AS (
  SELECT o.id AS id
  FROM waterway.gauge_measurements o
  JOIN waterway.gauge_measurements n
    ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date
    WHERE n.id IN (SELECT key FROM staged)
	  AND o.id NOT IN (SELECT key FROM staged)
          AND NOT o.predicted
)
DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`

	// agmStageDoneSQL sets staging_done on all measurements that are
	// tracked for the import $1.
	agmStageDoneSQL = `
UPDATE waterway.gauge_measurements SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.gauge_measurements'::regclass)`
)

// StageDone finishes the staging of the import with the given id inside
// the transaction tx: first the measurements superseded by the import are
// deleted, then the imported measurements are marked as staged. The second
// statement is only executed if the first one succeeded.
func (agmJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	if _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id); err != nil {
		return err
	}
	_, err := tx.ExecContext(ctx, agmStageDoneSQL, id)
	return err
}

// CleanUp deletes the folder agm.Dir, which holds the CSV file with the
// approved gauge measurements, together with everything inside it.
func (agm *ApprovedGaugeMeasurements) CleanUp() error {
	if err := os.RemoveAll(agm.Dir); err != nil {
		return err
	}
	return nil
}

// guessDate parses the measure dates of the CSV file. It is the Guess
// method of a misc.TimeGuesser built from the two layouts below: a
// day.month.year date with hours and minutes, and an ISO style timestamp
// with a numeric time zone offset.
// NOTE(review): presumably the layouts are tried in the given order --
// confirm against misc.TimeGuesser.
var guessDate = misc.TimeGuesser([]string{
	"02.01.2006 15:04",
	"2006-01-02T15:04:05-07:00",
}).Guess

// timetz wraps time.Time to serialize it to JSON with an explicit numeric
// time zone offset.
type timetz struct{ time.Time }

// MarshalJSON encodes the time as a JSON string using the layout
// "2006-01-02T15:04:05-07:00".
//
// The receiver is a value on purpose: with a pointer receiver a
// non-addressable timetz (for example one passed by value to json.Marshal)
// would silently fall back to the MarshalJSON method promoted from the
// embedded time.Time and produce a different format. A value receiver is in
// the method set of both timetz and *timetz, so existing callers are not
// affected.
func (ttz timetz) MarshalJSON() ([]byte, error) {
	return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00"))
}

// agmLine holds the attributes of one version of a gauge measurement as
// they are serialized into the import summary. ValueMin and ValueMax are
// pointers so that a missing value can be represented as nil. DateIssue and
// DateInfo are carried along but are not taken into account by hasDiff.
type agmLine struct {
	CountryCode        string   `json:"country-code"`
	Sender             string   `json:"sender"`
	LanguageCode       string   `json:"language-code"`
	DateIssue          timetz   `json:"date-issue"`
	ReferenceCode      string   `json:"reference-code"`
	WaterLevel         float64  `json:"water-level"`
	Predicted          bool     `json:"predicted"`
	ValueMin           *float64 `json:"value-min"`
	ValueMax           *float64 `json:"value-max"`
	DateInfo           timetz   `json:"date-info"`
	SourceOrganization string   `json:"source-organization"`
}

// hasDiff reports whether a and b differ in any attribute other than the
// two dates (DateIssue and DateInfo are not compared). Floating point
// values count as equal if they are no more than 0.00001 apart; an optional
// value differs from another one if exactly one of them is missing.
func (a *agmLine) hasDiff(b *agmLine) bool {
	const eps = 0.00001

	// Plain attributes first: any mismatch settles the question.
	if a.CountryCode != b.CountryCode ||
		a.Sender != b.Sender ||
		a.LanguageCode != b.LanguageCode ||
		a.ReferenceCode != b.ReferenceCode ||
		a.SourceOrganization != b.SourceOrganization ||
		a.Predicted != b.Predicted {
		return true
	}

	if math.Abs(a.WaterLevel-b.WaterLevel) > eps {
		return true
	}

	// optionalDiffers compares two values that may be missing (nil).
	optionalDiffers := func(p, q *float64) bool {
		switch {
		case p == nil && q == nil:
			return false
		case p == nil || q == nil:
			return true
		default:
			return math.Abs(*p-*q) > eps
		}
	}

	return optionalDiffers(a.ValueMin, b.ValueMin) ||
		optionalDiffers(a.ValueMax, b.ValueMax)
}

// agmSummaryEntry is one entry of the import summary. It identifies a
// measurement by gauge and measure date and lists its versions: only the
// imported one for a new measurement, or the previously stored one followed
// by the imported one for a changed measurement.
type agmSummaryEntry struct {
	FKGaugeID   models.Isrs `json:"fk-gauge-id"`
	MeasureDate timetz      `json:"measure-date"`
	Versions    []*agmLine  `json:"versions"`
}

const (
	// agmSelectSQL loads the attributes of the already staged
	// (staging_done) measurement of the gauge given by the ISRS components
	// $1 to $5 at the measure date $6.
	agmSelectSQL = `
SELECT
  id,
  country_code,
  sender,
  language_code,
  date_issue,
  reference_code,
  water_level,
  predicted,
  value_min,
  value_max,
  date_info,
  source_organization
FROM waterway.gauge_measurements
WHERE
  fk_gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int) AND
  measure_date = $6 AND staging_done`

	// agmInsertSQL inserts a new measurement for the gauge given by the
	// ISRS components $1 to $5 and returns its id. The row is always stored
	// with is_waterlevel = true and staging_done = false.
	agmInsertSQL = `
INSERT INTO waterway.gauge_measurements (
  fk_gauge_id,
  measure_date,
  country_code,
  sender,
  language_code,
  date_issue,
  reference_code,
  water_level,
  predicted,
  value_min,
  value_max,
  date_info,
  source_organization,
  is_waterlevel,
  staging_done
) VALUES(
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  $16,
  $17,
  true,
  false
)
RETURNING id`

	// agmGaugeCheckSQL reports whether waterway.gauges contains a gauge at
	// the location given by the ISRS components $1 to $5.
	agmGaugeCheckSQL = `
SELECT EXISTS(
  SELECT 1 FROM waterway.gauges
  WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int))
`
)

// Do executes the actual approved gauge measurements import.
//
// It reads "agm.csv" from agm.Dir as a semicolon separated CSV file whose
// header row must contain the columns "fk_gauge_id", "measure_date" and
// "value". Header names are trimmed, lower-cased and have their spaces
// replaced by underscores before they are matched; other columns are
// skipped. Every data row is inserted as a not yet staged, non-predicted
// measurement in a transaction of its own and recorded for importID with
// trackImportSQL.
//
// Rows for gauges that are not found by agmGaugeCheckSQL and rows whose
// INSERT fails are reported as warnings and counted as ignored. Invalid
// ISRS codes, dates or values, CSV read or parse errors and all other
// database errors abort the import. If every row was ignored an
// UnchangedError is returned.
//
// On success the result is a []*agmSummaryEntry with one entry per imported
// row that is either new or differs from the already staged, non-predicted
// measurement of the same gauge and measure date.
func (agm *ApprovedGaugeMeasurements) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	var (
		fkGaugeIDIdx   = -1
		measureDateIdx = -1
		valueIdx       = -1
	)

	headerFields := []struct {
		idx  *int
		name string
	}{
		{&fkGaugeIDIdx, "fk_gauge_id"},
		{&measureDateIdx, "measure_date"},
		{&valueIdx, "value"}, // "water_level",
	}

	// Map the required columns to their position in the file.
nextHeader:
	for i, header := range headers {
		h := strings.Replace(strings.ToLower(
			strings.TrimSpace(header)), " ", "_", -1)

		for j := range headerFields {
			if headerFields[j].name == h {
				if *headerFields[j].idx != -1 {
					return nil, fmt.Errorf(
						"There is more than one column namend '%s'", h)
				}
				*headerFields[j].idx = i
				continue nextHeader
			}
		}
	}

	var missing []string
	for i := range headerFields {
		if *headerFields[i].idx == -1 {
			missing = append(missing, headerFields[i].name)
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
	}

	gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL)
	if err != nil {
		return nil, err
	}
	defer gaugeCheckStmt.Close()

	selectStmt, err := conn.PrepareContext(ctx, agmSelectSQL)
	if err != nil {
		return nil, err
	}
	defer selectStmt.Close()

	insertStmt, err := conn.PrepareContext(ctx, agmInsertSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	trackStmt, err := conn.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	// Not nil on purpose: an empty summary is handed back as an empty
	// slice rather than a nil one.
	entries := []*agmSummaryEntry{}

	// Caches the result of the existence check per gauge so that each
	// gauge is looked up and warned about at most once.
	checkedGauges := map[models.Isrs]bool{}

	ignored := 0

lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		if err == io.EOF {
			feedback.Info("Read %d entries in CSV file", line-1)
			if ignored > 0 {
				feedback.Info("%d entries ignored", ignored)
			}
			if ignored == line-1 {
				return nil, UnchangedError("No entries imported")
			}
			feedback.Info("Imported %d entries with changes", len(entries))
			break lines
		}
		// Every other error has to abort the import. It must not be
		// mistaken for the end of the file just because the reader
		// handed back no or an empty record along with it.
		if err != nil {
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		}

		gid, err := models.IsrsFromString(row[fkGaugeIDIdx])
		if err != nil {
			return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err)
		}

		known, cached := checkedGauges[*gid]
		if !cached {
			if err := gaugeCheckStmt.QueryRowContext(
				ctx,
				gid.CountryCode,
				gid.LoCode,
				gid.FairwaySection,
				gid.Orc,
				gid.Hectometre,
			).Scan(&known); err != nil {
				return nil, err
			}
			checkedGauges[*gid] = known
			if !known {
				feedback.Warn("Ignoring data for unknown gauge %s", gid.String())
			}
		}
		if !known {
			// Warned about when the gauge was checked the first time.
			ignored++
			continue lines
		}

		md, err := guessDate(row[measureDateIdx])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
		}

		var (
			oldID                 int64
			oldCountryCode        string
			oldSender             string
			oldLanguageCode       string
			oldDateIssue          time.Time
			oldReferenceCode      string
			oldValue              float64
			oldPredicted          bool
			oldValueMin           sql.NullFloat64
			oldValueMax           sql.NullFloat64
			oldDateInfo           time.Time
			oldSourceOrganization string
		)

		// Load the already staged measurement of this gauge and date,
		// if there is one, to be able to report what has changed.
		err = selectStmt.QueryRowContext(
			ctx,
			gid.CountryCode,
			gid.LoCode,
			gid.FairwaySection,
			gid.Orc,
			gid.Hectometre,
			md,
		).Scan(
			&oldID,
			&oldCountryCode,
			&oldSender,
			&oldLanguageCode,
			&oldDateIssue,
			&oldReferenceCode,
			&oldValue,
			&oldPredicted,
			&oldValueMin,
			&oldValueMax,
			&oldDateInfo,
			&oldSourceOrganization,
		)

		var newEntry bool
		switch {
		case err == sql.ErrNoRows:
			// Complete new one
			newEntry = true
		case err != nil:
			return nil, err
		}

		newSender := agm.Originator
		newCountryCode := gid.CountryCode
		newLanguageCode := misc.CCtoLang[gid.CountryCode]
		newDateIssue := time.Now()
		newReferenceCode := "ZPG"

		// With bitSize 32 the parsed value is rounded to float32
		// precision before it is stored.
		value, err := strconv.ParseFloat(row[valueIdx], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
		}
		newValue := value

		newPredicted := false

		newValueMin := sql.NullFloat64{
			Float64: 0,
			Valid:   true,
		}
		newValueMax := sql.NullFloat64{
			Float64: 0,
			Valid:   true,
		}

		newDateInfo := newDateIssue

		newSourceOrganization := newSender

		// Each row gets a transaction of its own so that a rejected
		// INSERT only loses this row. The transaction is finished
		// explicitly on every path; a deferred rollback would pile up
		// one pending call per CSV row until Do returns.
		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return nil, err
		}

		var newID int64

		if err := tx.StmtContext(ctx, insertStmt).QueryRowContext(
			ctx,
			gid.CountryCode,
			gid.LoCode,
			gid.FairwaySection,
			gid.Orc,
			gid.Hectometre,
			md,
			newCountryCode,
			newSender,
			newLanguageCode,
			newDateIssue,
			newReferenceCode,
			newValue,
			newPredicted,
			newValueMin,
			newValueMax,
			newDateInfo,
			newSourceOrganization,
		).Scan(&newID); err != nil {
			// The error text is passed as an argument because it must
			// not be interpreted as a format string.
			feedback.Warn("%s", handleError(err).Error())
			if err := tx.Rollback(); err != nil {
				return nil, err
			}
			ignored++
			continue
		}
		if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
			ctx, importID, "waterway.gauge_measurements", newID,
		); err != nil {
			// The import is aborted with the tracking error; a failing
			// rollback could not add anything useful to it.
			_ = tx.Rollback()
			return nil, err
		}
		if err := tx.Commit(); err != nil {
			// Reports sql.ErrTxDone if Commit already finished the
			// transaction; the commit error is the one to report.
			_ = tx.Rollback()
			return nil, fmt.Errorf("Commit failed: %v", err)
		}

		n := newAGMLine(
			newCountryCode,
			newSender,
			newLanguageCode,
			newDateIssue,
			newReferenceCode,
			newValue,
			newPredicted,
			newValueMin,
			newValueMax,
			newDateInfo,
			newSourceOrganization,
		)

		ase := &agmSummaryEntry{
			FKGaugeID:   *gid,
			MeasureDate: timetz{md},
		}

		if newEntry {
			ase.Versions = []*agmLine{n}
		} else {
			o := newAGMLine(
				oldCountryCode,
				oldSender,
				oldLanguageCode,
				oldDateIssue,
				oldReferenceCode,
				oldValue,
				oldPredicted,
				oldValueMin,
				oldValueMax,
				oldDateInfo,
				oldSourceOrganization,
			)
			// Ignore if there is no diff.
			if o.Predicted || !n.hasDiff(o) {
				continue
			}
			ase.Versions = []*agmLine{o, n}
		}
		entries = append(entries, ase)
	}

	feedback.Info("Importing approved gauge measurements took %s",
		time.Since(start))

	return entries, nil
}

// newAGMLine builds the summary representation of one measurement version
// from its attributes. A valueMin or valueMax that is not valid (SQL NULL)
// results in a nil pointer in the returned line.
func newAGMLine(
	countryCode string,
	sender string,
	languageCode string,
	dateIssue time.Time,
	referenceCode string,
	waterLevel float64,
	predicted bool,
	valueMin sql.NullFloat64,
	valueMax sql.NullFloat64,
	dateInfo time.Time,
	sourceOrganization string,
) *agmLine {
	line := &agmLine{
		CountryCode:        countryCode,
		Sender:             sender,
		LanguageCode:       languageCode,
		DateIssue:          timetz{dateIssue},
		ReferenceCode:      referenceCode,
		WaterLevel:         waterLevel,
		Predicted:          predicted,
		DateInfo:           timetz{dateInfo},
		SourceOrganization: sourceOrganization,
	}
	// The optional bounds stay nil unless the database value is present.
	if valueMin.Valid {
		lower := valueMin.Float64
		line.ValueMin = &lower
	}
	if valueMax.Valid {
		upper := valueMax.Float64
		line.ValueMax = &upper
	}
	return line
}