view pkg/imports/agm.go @ 2104:c9af355d4a2c

staging: display stretch name
author Thomas Junk <thomas.junk@intevation.de>
date Mon, 04 Feb 2019 14:35:47 +0100
parents 8a986d80e1c6
children 58a28715e386
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/models"
)

// ApprovedGaugeMeasurements is the configuration of an approved gauge
// measurements import job. It is deserialized from the job's JSON data
// by agmJobCreator.Create.
type ApprovedGaugeMeasurements struct {
	// Dir is the directory containing the "agm.csv" file to import.
	Dir string `json:"dir"`
}

// AGMJobKind is the unique name of an approved gauge measurements import job.
const AGMJobKind JobKind = "agm"

// agmJobCreator implements the JobCreator interface for
// approved gauge measurements imports.
type agmJobCreator struct{}

// Register the creator under its job kind so the import
// machinery can instantiate AGM jobs.
func init() {
	RegisterJobCreator(AGMJobKind, agmJobCreator{})
}

// AutoAccept reports whether imports of this kind bypass manual review.
// AGM imports always require review.
func (agmJobCreator) AutoAccept() bool {
	return false
}

// Description returns a human readable name of this import kind.
func (agmJobCreator) Description() string { return "approved gauge measurements" }

// Create builds an approved gauge measurements job from its JSON
// serialization in data. The job kind argument is ignored as this
// creator only ever handles AGM jobs.
func (agmJobCreator) Create(_ JobKind, data string) (Job, error) {
	var job ApprovedGaugeMeasurements
	if err := common.FromJSONString(data, &job); err != nil {
		return nil, err
	}
	return &job, nil
}

// Depends returns the database relations this import touches;
// the import machinery serializes jobs with overlapping dependencies.
func (agmJobCreator) Depends() []string {
	deps := []string{
		"gauges",
		"gauge_measurements",
	}
	return deps
}

const (
	// agmStageDoneDeleteSQL removes superseded measurements:
	// any old measurement whose (gauge, measure_date) pair is also
	// present among the rows staged by this import is deleted,
	// keeping only the newly imported rows.
	agmStageDoneDeleteSQL = `
WITH staged AS (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.gauge_measurements'::regclass
),
to_delete AS (
  SELECT o.id AS id
  FROM waterway.gauge_measurements o
  JOIN waterway.gauge_measurements n
    ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date
    WHERE n.id     IN (SELECT key FROM staged)
	  AND o.id NOT IN (SELECT key FROM staged)
)
DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`

	// agmStageDoneSQL flips staging_done to true on all rows
	// tracked for this import, making them visible as accepted data.
	agmStageDoneSQL = `
UPDATE waterway.gauge_measurements SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.gauge_measurements'::regclass)`
)

// StageDone finalizes a reviewed AGM import inside the given transaction:
// it first deletes measurements superseded by the staged rows and then
// marks the staged rows as accepted.
func (agmJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	if _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id); err != nil {
		return err
	}
	_, err := tx.ExecContext(ctx, agmStageDoneSQL, id)
	return err
}

// CleanUp removes the folder containing the CSV file with the
// approved gauge measurements.
func (agm *ApprovedGaugeMeasurements) CleanUp() error {
	return os.RemoveAll(agm.Dir)
}

// guessDate parses a timestamp that may be in either of the two
// formats seen in AGM CSV files: German-style "dd.mm.yyyy hh:mm"
// or RFC 3339 with numeric time zone.
var guessDate = misc.TimeGuesser([]string{
	"02.01.2006 15:04",
	"2006-01-02T15:04:05-07:00",
}).Guess

// Do executes the actual approved gauge measurements import.
func (agm *ApprovedGaugeMeasurements) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	var (
		fkGaugeIDIdx     = -1
		measureDateIdx   = -1
		fromIdx          = -1
		languageCodeIdx  = -1
		countryCodeIdx   = -1
		dateIssueIdx     = -1
		referenceCodeIdx = -1
		valueIdx         = -1
		predictedIdx     = -1
		valueMinIdx      = -1
		valueMaxIdx      = -1
		dateInfoIdx      = -1
		originatorIdx    = -1
		unitIdx          = -1
	)

	headerFields := []struct {
		idx  *int
		name string
	}{
		{&fkGaugeIDIdx, "fk_gauge_id"},
		{&measureDateIdx, "measure_date"},
		{&fromIdx, "from"}, // "sender",
		{&languageCodeIdx, "language_code"},
		{&countryCodeIdx, "country_code"},
		{&dateIssueIdx, "date_issue"},
		{&referenceCodeIdx, "reference_code"},
		{&valueIdx, "value"}, // "water_level",
		{&predictedIdx, "predicted"},
		// "is_waterlevel",
		{&valueMinIdx, "value_min"},
		{&valueMaxIdx, "value_max"},
		{&dateInfoIdx, "date_info"},
		{&originatorIdx, "originator"}, // "source_organization",
		{&unitIdx, "unit"},
	}

nextHeader:
	for i, f := range headers {
		h := strings.Replace(strings.ToLower(
			strings.TrimSpace(f)), " ", "_", -1)

		for j := range headerFields {
			if headerFields[j].name == h {
				if *headerFields[j].idx != -1 {
					return nil, fmt.Errorf(
						"There is more than one column namend '%s'", h)
				}
				*headerFields[j].idx = i
				continue nextHeader
			}
		}
	}

	var missing []string

	for i := range headerFields {
		if headerFields[i].name != "unit" && *headerFields[i].idx == -1 {
			missing = append(missing, headerFields[i].name)
		}
	}

	if len(missing) > 0 {
		return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
	}

	inCm, _ := rescale("cm")
	scaler := func(row []string) (func(float32) float32, error) {
		if unitIdx == -1 {
			return inCm, nil
		}
		unit := row[unitIdx]
		if unit == "cm" {
			return inCm, nil
		}
		s, err := rescale(unit)
		return s, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()
	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	ids := []int64{}

	args := make([]interface{}, 19)

	args[18] = false // staging_done

lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		switch {
		case err == io.EOF || len(row) == 0:
			break lines
		case err != nil:
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		}
		convert, err := scaler(row)
		if err != nil {
			return nil, fmt.Errorf("line %d: %v", line, err)
		}

		gids := row[fkGaugeIDIdx]
		gid, err := models.IsrsFromString(gids)
		if err != nil {
			return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err)
		}

		args[0] = gid.CountryCode
		args[1] = gid.LoCode
		args[2] = gid.FairwaySection
		args[3] = gid.Orc
		args[4] = gid.Hectometre

		md, err := guessDate(row[measureDateIdx])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
		}
		args[5] = md

		args[6] = row[fromIdx]
		args[7] = row[languageCodeIdx]
		args[8] = row[countryCodeIdx]

		dis, err := guessDate(row[dateIssueIdx])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err)
		}
		args[9] = dis

		args[10] = row[referenceCodeIdx]

		value, err := strconv.ParseFloat(row[valueIdx], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
		}
		args[11] = convert(float32(value))

		predicted := strings.ToLower(row[predictedIdx]) == "true"
		args[12] = predicted

		args[13] = true // is_waterlevel

		valueMin, err := strconv.ParseFloat(row[valueMinIdx], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err)
		}
		args[14] = convert(float32(valueMin))

		valueMax, err := strconv.ParseFloat(row[valueMaxIdx], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err)
		}
		args[15] = convert(float32(valueMax))

		din, err := guessDate(row[dateInfoIdx])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err)
		}
		args[16] = din

		args[17] = row[originatorIdx]

		// args[18] (staging_done) is set to true outside the loop.

		var id int64
		if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil {
			return nil, fmt.Errorf("Failed to insert line %d: %v", line, err)
		}
		ids = append(ids, id)

		if _, err := trackStmt.ExecContext(
			ctx, importID, "waterway.gauge_measurements", id,
		); err != nil {
			return nil, err
		}
	}

	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf("Commit failed: %v", err)
	}

	feedback.Info("Importing approved gauge measurements took %s",
		time.Since(start))

	summary := struct {
		IDs []int64 `json:"ids"`
	}{
		IDs: ids,
	}
	return &summary, nil
}