view pkg/imports/agm.go @ 4026:82037bbd2c7c

Do not require review if nothing changed
author Tom Gottfried <tom@intevation.de>
date Mon, 22 Jul 2019 18:35:14 +0200
parents baa51bb82364
children b17453420eff
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/models"
)

// ApprovedGaugeMeasurements is the import job for a CSV file of approved
// gauge measurements.
//
// Dir is the directory holding the file "agm.csv"; Do reads the file from
// there and CleanUp removes the whole directory. Originator is stored as
// sender and as source organization of every imported measurement.
type ApprovedGaugeMeasurements struct {
	Dir        string `json:"dir"`
	Originator string `json:"originator"`
}

// AGMJobKind is the unique name of an approved gauge measurements import job.
const AGMJobKind JobKind = "agm"

// agmJobCreator is the job creator for approved gauge measurement imports.
type agmJobCreator struct{}

// init registers agmJobCreator under AGMJobKind.
func init() {
	RegisterJobCreator(AGMJobKind, agmJobCreator{})
}

// AutoAccept reports false: imports of this kind are not accepted
// automatically.
func (agmJobCreator) AutoAccept() bool {
	return false
}

// Description returns a human readable name of this job kind.
func (agmJobCreator) Description() string {
	const desc = "approved gauge measurements"
	return desc
}

// Create returns a new, zero-valued ApprovedGaugeMeasurements job.
func (agmJobCreator) Create() Job {
	return &ApprovedGaugeMeasurements{}
}

// Depends lists the tables this job kind is concerned with.
// NOTE(review): presumably index 0 holds the tables written and index 1
// the tables only referenced — confirm against RegisterJobCreator users.
func (agmJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauge_measurements"}
	deps[1] = []string{"gauges"}
	return deps
}

const (
	// agmStageDoneDeleteSQL deletes every measurement that shares location
	// and measure_date with a measurement tracked by import $1 but is not
	// itself tracked by that import, so only the newly imported versions
	// remain.
	// NOTE(review): the old rows are not restricted to staging_done, so
	// rows staged by another, still pending import would be deleted as
	// well — confirm this is intended.
	agmStageDoneDeleteSQL = `
WITH staged AS (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.gauge_measurements'::regclass
),
to_delete AS (
  SELECT o.id AS id
  FROM waterway.gauge_measurements o
  JOIN waterway.gauge_measurements n
    USING (location, measure_date)
    WHERE n.id IN (SELECT key FROM staged)
	  AND o.id NOT IN (SELECT key FROM staged)
)
DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`

	// agmStageDoneSQL sets staging_done = true on all measurements tracked
	// by import $1.
	agmStageDoneSQL = `
UPDATE waterway.gauge_measurements SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.gauge_measurements'::regclass)`
)

// StageDone is run when import id is accepted. It first removes the
// measurements superseded by the import and then sets staging_done on the
// imported ones, stopping at the first failing statement.
func (agmJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	for _, stmt := range []string{agmStageDoneDeleteSQL, agmStageDoneSQL} {
		if _, err := tx.ExecContext(ctx, stmt, id); err != nil {
			return err
		}
	}
	return nil
}

// CleanUp removes the folder containing the CSV file with the approved
// gauge measurements, together with everything else inside it.
func (agm *ApprovedGaugeMeasurements) CleanUp() error {
	return os.RemoveAll(agm.Dir)
}

// guessDate parses the measure_date column of the CSV file. The accepted
// layouts are "DD.MM.YYYY hh:mm" and an ISO 8601 timestamp with a numeric
// UTC offset; how the layouts are tried is up to misc.TimeGuesser.
// NOTE(review): the first layout carries no zone — confirm which location
// misc.TimeGuesser applies to it (plain time.Parse would assume UTC).
var guessDate = misc.TimeGuesser([]string{
	"02.01.2006 15:04",
	"2006-01-02T15:04:05-07:00",
}).Guess

// timetz wraps time.Time so that it is serialized to JSON with second
// precision and a numeric UTC offset, e.g. "2019-07-22T18:35:14+02:00".
type timetz struct{ time.Time }

// MarshalJSON implements json.Marshaler.
//
// It uses a value receiver so the custom format also applies when a timetz
// is marshaled by value (e.g. as a field of a non-addressable struct).
// With a pointer receiver, encoding/json cannot use this method for such
// values. The method also shadows the embedded time.Time.MarshalJSON, so
// the struct would be encoded as "{}".
func (ttz timetz) MarshalJSON() ([]byte, error) {
	return json.Marshal(ttz.Time.Format("2006-01-02T15:04:05-07:00"))
}

// agmLine is a single gauge measurement. Do builds it both from a CSV row
// and from the accepted database row the CSV row is compared with. It is
// emitted as one version inside the import summary, hence the JSON tags.
type agmLine struct {
	Location           models.Isrs `json:"fk-gauge-id"`
	CountryCode        string      `json:"country-code"`
	Sender             string      `json:"sender"`
	LanguageCode       string      `json:"language-code"`
	DateIssue          timetz      `json:"date-issue"`
	ReferenceCode      string      `json:"reference-code"`
	MeasureDate        timetz      `json:"measure-date"`
	WaterLevel         float64     `json:"water-level"`
	DateInfo           timetz      `json:"date-info"`
	SourceOrganization string      `json:"source-organization"`
}

// hasDiff reports whether b differs from a in country code, sender,
// language code, reference code, source organization or water level.
// Water levels count as equal when they are closer than eps. Location
// and the three timestamps are not compared.
func (a *agmLine) hasDiff(b *agmLine) bool {
	const eps = 0.00001

	sameText := a.CountryCode == b.CountryCode &&
		a.Sender == b.Sender &&
		a.LanguageCode == b.LanguageCode &&
		a.ReferenceCode == b.ReferenceCode &&
		a.SourceOrganization == b.SourceOrganization

	// Kept as "> eps" so that a NaN difference does not count as a change.
	levelDiffers := math.Abs(a.WaterLevel-b.WaterLevel) > eps

	return !sameText || levelDiffers
}

// agmSummaryEntry is one element of the summary returned by Do for a
// gauge and measure date. Versions holds only the imported line if there
// was no accepted measurement before. Otherwise it holds the old and the
// new line, in that order.
type agmSummaryEntry struct {
	FKGaugeID   models.Isrs `json:"fk-gauge-id"`
	MeasureDate timetz      `json:"measure-date"`
	Versions    []*agmLine  `json:"versions"`
}

const (
	// agmSelectSQL fetches the accepted (staging_done) measurement of the
	// gauge at ISRS location ($1 country code, $2 LOCODE, $3 fairway
	// section, $4 ORC, $5 hectometre) for measure date $6.
	agmSelectSQL = `
SELECT
  id,
  country_code,
  sender,
  language_code,
  date_issue,
  reference_code,
  water_level,
  date_info,
  source_organization
FROM waterway.gauge_measurements
WHERE
  location
    = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
  AND measure_date = $6
  AND staging_done
`

	// agmInsertSQL inserts a new measurement with staging_done = false and
	// returns its id. The validity is copied from the gauge version at
	// location $1..$5 whose validity contains measure date $6. If no such
	// version exists, the validity is unbounded.
	agmInsertSQL = `
INSERT INTO waterway.gauge_measurements (
  location,
  validity,
  measure_date,
  country_code,
  sender,
  language_code,
  date_issue,
  reference_code,
  water_level,
  date_info,
  source_organization,
  staging_done
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  COALESCE(
    (SELECT validity FROM waterway.gauges
       WHERE location
            = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
         AND validity @> CAST($6 AS timestamp with time zone)),
    tstzrange(NULL, NULL)),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  false
)
RETURNING id`

	// agmGaugeCheckSQL reports whether any gauge (in any version) exists
	// at ISRS location $1..$5.
	agmGaugeCheckSQL = `
SELECT EXISTS(
  SELECT 1 FROM waterway.gauges
  WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int))
`
)

// errContinue is returned by the per-line transaction closure in Do when
// the insert of a line failed and was reported as a warning. It signals
// that processing goes on with the next line instead of aborting the
// import.
var errContinue = errors.New("continue")

// Do executes the actual approved gauge measurements import.
//
// It reads the semicolon separated file agm.csv from agm.Dir. The header
// row must name the columns fk_gauge_id, measure_date and value. Names are
// matched after trimming and lower-casing, with blanks treated as
// underscores. Rows referring to gauges not found in the database are
// skipped with a warning. Every other row is inserted as a new measurement
// with staging_done = false, in its own transaction, and tracked for
// importID. Rows whose insert fails are skipped with a warning.
//
// The result lists a summary entry for every inserted row that is either
// completely new or differs (see hasDiff) from the measurement currently
// accepted for the same gauge and date. An UnchangedError is returned if
// all rows were ignored or if none of them changed anything.
func (agm *ApprovedGaugeMeasurements) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	var (
		fkGaugeIDIdx   = -1
		measureDateIdx = -1
		valueIdx       = -1
	)

	headerFields := []struct {
		idx  *int
		name string
	}{
		{&fkGaugeIDIdx, "fk_gauge_id"},
		{&measureDateIdx, "measure_date"},
		{&valueIdx, "value"}, // "water_level",
	}

nextHeader:
	for i, header := range headers {
		// Normalize: trim, lower-case and replace blanks by underscores.
		h := strings.Replace(strings.ToLower(
			strings.TrimSpace(header)), " ", "_", -1)

		for j := range headerFields {
			if headerFields[j].name == h {
				if *headerFields[j].idx != -1 {
					return nil, fmt.Errorf(
						"There is more than one column named '%s'", h)
				}
				*headerFields[j].idx = i
				continue nextHeader
			}
		}
	}

	var missing []string
	for i := range headerFields {
		if *headerFields[i].idx == -1 {
			missing = append(missing, headerFields[i].name)
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
	}

	gaugeCheckStmt, err := conn.PrepareContext(ctx, agmGaugeCheckSQL)
	if err != nil {
		return nil, err
	}
	defer gaugeCheckStmt.Close()

	selectStmt, err := conn.PrepareContext(ctx, agmSelectSQL)
	if err != nil {
		return nil, err
	}
	defer selectStmt.Close()

	insertStmt, err := conn.PrepareContext(ctx, agmInsertSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	trackStmt, err := conn.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	entries := []*agmSummaryEntry{}

	// Caches the existence check per gauge so each is queried only once.
	checkedGauges := map[models.Isrs]bool{}

	warnLimiter := misc.WarningLimiter{Log: feedback.Warn, MaxWarnings: 100}
	warn := warnLimiter.Warn
	defer warnLimiter.Close()

	agmLines := []*agmLine{}
	ignored := 0

lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		switch {
		case err == io.EOF || (err == nil && len(row) == 0):
			// End of input. Other read errors may come with an empty
			// record and must not be mistaken for the end of the file.
			feedback.Info("Read %d entries in CSV file", line-1)
			if ignored > 0 {
				feedback.Info("%d entries ignored", ignored)
			}
			if ignored == line-1 {
				return nil, UnchangedError("No entries imported")
			}
			break lines
		case err != nil:
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		}

		gids := row[fkGaugeIDIdx]
		gid, err := models.IsrsFromString(gids)
		if err != nil {
			return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err)
		}

		if exists, found := checkedGauges[*gid]; found {
			if !exists {
				// Just ignore the line since we have already warned
				ignored++
				continue lines
			}
		} else { // not found in gauge cache
			if err := gaugeCheckStmt.QueryRowContext(
				ctx,
				gid.CountryCode,
				gid.LoCode,
				gid.FairwaySection,
				gid.Orc,
				gid.Hectometre,
			).Scan(&exists); err != nil {
				return nil, err
			}
			checkedGauges[*gid] = exists
			if !exists {
				warn("Ignoring data for unknown gauge %s", gid.String())
				ignored++
				continue lines
			}
		}

		md, err := guessDate(row[measureDateIdx])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
		}

		newSender := agm.Originator
		newCountryCode := gid.CountryCode
		newLanguageCode := misc.CCtoLang[gid.CountryCode]
		newDateIssue := time.Now()
		newReferenceCode := "ZPG"

		// Parse with full float64 precision: rounding to float32 alters
		// values of a few hundred and above by more than hasDiff's eps and
		// would report spurious changes.
		value, err := strconv.ParseFloat(row[valueIdx], 64)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
		}
		newValue := value

		newDateInfo := newDateIssue

		newSourceOrganization := newSender

		agmLines = append(agmLines, newAGMLine(
			*gid,
			newCountryCode,
			newSender,
			newLanguageCode,
			newDateIssue,
			newReferenceCode,
			md,
			newValue,
			newDateInfo,
			newSourceOrganization,
		))
	}

agmLines:
	for _, line := range agmLines {
		var (
			oldID                 int64
			oldCountryCode        string
			oldSender             string
			oldLanguageCode       string
			oldDateIssue          time.Time
			oldReferenceCode      string
			oldValue              float64
			oldDateInfo           time.Time
			oldSourceOrganization string
		)

		// Fetch the currently accepted measurement for comparison.
		err = selectStmt.QueryRowContext(
			ctx,
			line.Location.CountryCode,
			line.Location.LoCode,
			line.Location.FairwaySection,
			line.Location.Orc,
			line.Location.Hectometre,
			line.MeasureDate.Time,
		).Scan(
			&oldID,
			&oldCountryCode,
			&oldSender,
			&oldLanguageCode,
			&oldDateIssue,
			&oldReferenceCode,
			&oldValue,
			&oldDateInfo,
			&oldSourceOrganization,
		)

		var newEntry bool
		switch {
		case err == sql.ErrNoRows:
			// Complete new one
			newEntry = true
		case err != nil:
			return nil, err
		}

		switch err := func() error {
			tx, err := conn.BeginTx(ctx, nil)
			if err != nil {
				return err
			}
			defer tx.Rollback()

			var newID int64

			if err := tx.StmtContext(ctx, insertStmt).QueryRowContext(
				ctx,
				line.Location.CountryCode,
				line.Location.LoCode,
				line.Location.FairwaySection,
				line.Location.Orc,
				line.Location.Hectometre,
				line.MeasureDate.Time,
				line.CountryCode,
				line.Sender,
				line.LanguageCode,
				line.DateIssue.Time,
				line.ReferenceCode,
				line.WaterLevel,
				line.DateInfo.Time,
				line.SourceOrganization,
			).Scan(&newID); err != nil {
				// Constant format string: the message may contain '%'.
				warn("%s", handleError(err).Error())
				ignored++
				return errContinue
			}

			if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
				ctx, importID, "waterway.gauge_measurements", newID,
			); err != nil {
				return err
			}

			if err = tx.Commit(); err != nil {
				err = fmt.Errorf("Commit failed: %v", err)
			}
			return err
		}(); {
		case err == errContinue:
			continue agmLines
		case err != nil:
			return nil, err
		}

		ase := &agmSummaryEntry{
			FKGaugeID:   line.Location,
			MeasureDate: line.MeasureDate,
		}

		if newEntry {
			ase.Versions = []*agmLine{line}
		} else {
			o := newAGMLine(
				line.Location,
				oldCountryCode,
				oldSender,
				oldLanguageCode,
				oldDateIssue,
				oldReferenceCode,
				line.MeasureDate.Time,
				oldValue,
				oldDateInfo,
				oldSourceOrganization,
			)
			// Ignore if there is no diff.
			if !line.hasDiff(o) {
				continue
			}
			ase.Versions = []*agmLine{o, line}
		}
		entries = append(entries, ase)
	}

	if len(entries) == 0 {
		return nil, UnchangedError("No entries with changes")
	}
	feedback.Info("Imported %d entries with changes", len(entries))
	feedback.Info("Importing approved gauge measurements took %s",
		time.Since(start))

	return entries, nil
}

// newAGMLine assembles an agmLine from its individual attributes and
// wraps the three timestamps in timetz for JSON output.
func newAGMLine(
	location models.Isrs,
	countryCode string,
	sender string,
	languageCode string,
	dateIssue time.Time,
	referenceCode string,
	measureDate time.Time,
	waterLevel float64,
	dateInfo time.Time,
	sourceOrganization string,
) *agmLine {
	l := new(agmLine)
	l.Location = location
	l.CountryCode = countryCode
	l.Sender = sender
	l.LanguageCode = languageCode
	l.DateIssue = timetz{Time: dateIssue}
	l.ReferenceCode = referenceCode
	l.MeasureDate = timetz{Time: measureDate}
	l.WaterLevel = waterLevel
	l.DateInfo = timetz{Time: dateInfo}
	l.SourceOrganization = sourceOrganization
	return l
}