view pkg/imports/agm.go @ 2006:35acb7f9ae0c

Do anything else before expectedly failing role creation. Creating roles during database setup expectedly fails in case there already is another gemma database in the cluster. Doing it at the end of the transaction ensures it does not hide errors in other commands in the script. In passing, add the default admin via the designated view to ensure it will become a correctly set up application user.
author Tom Gottfried <tom@intevation.de>
date Thu, 24 Jan 2019 17:23:43 +0100
parents 59055c8301df
children 8a986d80e1c6
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/models"
)

// ApprovedGaugeMeasurements is the configuration of an approved
// gauge measurements import job. Dir is the directory containing
// the uploaded agm.csv file to be imported.
type ApprovedGaugeMeasurements struct {
	Dir string `json:"dir"`
}

// AGMJobKind is the unique name of an approved gauge measurements import job.
const AGMJobKind JobKind = "agm"

// agmJobCreator implements the JobCreator interface for
// approved gauge measurements imports.
type agmJobCreator struct{}

// Register the approved gauge measurements import at startup so the
// import machinery can create jobs of this kind.
func init() {
	RegisterJobCreator(AGMJobKind, agmJobCreator{})
}

// AutoAccept reports whether imports of this kind bypass manual review.
// Approved gauge measurements always require a review, so this is false.
func (agmJobCreator) AutoAccept() bool {
	return false
}

// Description returns a human readable name of this import kind.
func (agmJobCreator) Description() string { return "approved gauge measurements" }

// Create deserializes an approved gauge measurements job from its
// JSON configuration string.
func (agmJobCreator) Create(_ JobKind, data string) (Job, error) {
	var agm ApprovedGaugeMeasurements
	if err := common.FromJSONString(data, &agm); err != nil {
		return nil, err
	}
	return &agm, nil
}

// Depends names the database tables this import writes to, used for
// role based access control of the import.
func (agmJobCreator) Depends() []string {
	return []string{"gauges", "gauge_measurements"}
}

const (
	// agmStageDoneDeleteSQL deletes old measurements that have been
	// superseded by newly staged ones (same gauge and measure date),
	// keeping only the new measurements.
	agmStageDoneDeleteSQL = `
WITH staged AS (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.gauge_measurements'::regclass
),
to_delete AS (
  SELECT o.id AS id
  FROM waterway.gauge_measurements o
  JOIN waterway.gauge_measurements n
    ON n.fk_gauge_id = o.fk_gauge_id AND n.measure_date = o.measure_date
    WHERE n.id     IN (SELECT key FROM staged)
	  AND o.id NOT IN (SELECT key FROM staged)
)
DELETE FROM waterway.gauge_measurements WHERE id IN (SELECT id from to_delete)`

	// agmStageDoneSQL marks all measurements tracked for this import
	// as no longer being staged, i.e. accepted.
	agmStageDoneSQL = `
UPDATE waterway.gauge_measurements SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.gauge_measurements'::regclass)`
)

// StageDone finalizes the review of an approved gauge measurements
// import: superseded old measurements are removed first, then the
// newly imported ones are flagged as no longer staged.
func (agmJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	if _, err := tx.ExecContext(ctx, agmStageDoneDeleteSQL, id); err != nil {
		return err
	}
	_, err := tx.ExecContext(ctx, agmStageDoneSQL, id)
	return err
}

// CleanUp removes the folder containing the CSV file with the
// approved gauge measurements.
func (agm *ApprovedGaugeMeasurements) CleanUp() error {
	return os.RemoveAll(agm.Dir)
}

// guessDate parses a timestamp that may be given either in the
// German day-first notation or as RFC 3339 with numeric time zone.
var guessDate = misc.TimeGuesser([]string{
	"02.01.2006 15:04",
	"2006-01-02T15:04:05-07:00",
}).Guess

// Do executes the actual approved gauge measurements import.
// It reads agm.csv from the job's directory, checks that all
// mandatory columns are present, converts every row and inserts
// it into waterway.gauge_measurements inside a single transaction.
// Each inserted row is recorded in import.track_imports so the
// later staging review can accept or reject it.
// Returns a summary containing the IDs of the inserted rows.
func (agm *ApprovedGaugeMeasurements) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	f, err := os.Open(filepath.Join(agm.Dir, "agm.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	// Safe here: each record is fully consumed before the next Read.
	r.ReuseRecord = true

	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	// Map normalized column names (lower case, spaces replaced by
	// underscores) to their positions in the CSV records.
	headerIndices := map[string]int{}

	for i, f := range headers {
		headerIndices[strings.Replace(
			strings.ToLower(
				strings.TrimSpace(f)), " ", "_", -1)] = i
	}

	var missing []string

	// All of these columns are mandatory; collect every absent one
	// so the user gets the full list in a single error message.
	for _, m := range [...]string{
		"fk_gauge_id",
		"measure_date",
		"from", // "sender",
		"language_code",
		"country_code",
		"date_issue",
		"reference_code",
		"value", // "water_level",
		"predicted",
		// "is_waterlevel",
		"value_min",
		"value_max",
		"date_info",
		"originator", // "source_organization",
	} {
		if _, found := headerIndices[m]; !found {
			missing = append(missing, m)
		}
	}

	if len(missing) > 0 {
		return nil, fmt.Errorf("Missing columns: %s", strings.Join(missing, ", "))
	}

	// Values are stored in cm. If the CSV has a 'unit' column, build
	// a per-row conversion; otherwise assume values already are in cm.
	inCm, _ := rescale("cm")
	scaler := func(row []string) (func(float32) float32, error) {
		idx, found := headerIndices["unit"]
		if !found {
			return inCm, nil
		}
		unit := row[idx]
		if unit == "cm" {
			return inCm, nil
		}
		s, err := rescale(unit)
		return s, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// No-op after a successful Commit.
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()
	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	ids := []int64{}

	// Positional arguments for insertGMSQL; indices must match the
	// placeholder order of that statement.
	args := make([]interface{}, 19)

	args[18] = false // staging_done

	// NOTE(review): line counts data records only — the header was
	// already consumed above, so reported numbers are one less than
	// the physical line in the file.
lines:
	for line := 1; ; line++ {

		row, err := r.Read()
		switch {
		case err == io.EOF || len(row) == 0:
			break lines
		case err != nil:
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		}
		convert, err := scaler(row)
		if err != nil {
			return nil, fmt.Errorf("line %d: %v", line, err)
		}

		gids := row[headerIndices["fk_gauge_id"]]
		gid, err := models.IsrsFromString(gids)
		if err != nil {
			return nil, fmt.Errorf("Invalid ISRS code line %d: %v", line, err)
		}

		args[0] = gid.CountryCode
		args[1] = gid.LoCode
		args[2] = gid.FairwaySection
		args[3] = gid.Orc
		args[4] = gid.Hectometre

		md, err := guessDate(row[headerIndices["measure_date"]])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
		}
		args[5] = md

		args[6] = row[headerIndices["from"]]
		args[7] = row[headerIndices["language_code"]]
		args[8] = row[headerIndices["country_code"]]

		dis, err := guessDate(row[headerIndices["date_issue"]])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'date_issue' line %d: %v", line, err)
		}
		args[9] = dis

		args[10] = row[headerIndices["reference_code"]]

		value, err := strconv.ParseFloat(row[headerIndices["value"]], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value' line %d: %v", line, err)
		}
		args[11] = convert(float32(value))

		predicted := strings.ToLower(row[headerIndices["predicted"]]) == "true"
		args[12] = predicted

		args[13] = true // is_waterlevel

		valueMin, err := strconv.ParseFloat(row[headerIndices["value_min"]], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value_min' line %d: %v", line, err)
		}
		args[14] = convert(float32(valueMin))

		valueMax, err := strconv.ParseFloat(row[headerIndices["value_max"]], 32)
		if err != nil {
			return nil, fmt.Errorf("Invalid 'value_max' line %d: %v", line, err)
		}
		args[15] = convert(float32(valueMax))

		din, err := guessDate(row[headerIndices["date_info"]])
		if err != nil {
			return nil, fmt.Errorf("Invalid 'date_info' line %d: %v", line, err)
		}
		args[16] = din

		args[17] = row[headerIndices["originator"]]

		// args[18] (staging_done) stays false as set before the loop;
		// it is only flipped to true later in StageDone.

		var id int64
		if err := insertStmt.QueryRowContext(ctx, args...).Scan(&id); err != nil {
			return nil, fmt.Errorf("Failed to insert line %d: %v", line, err)
		}
		ids = append(ids, id)

		// Track the inserted row so StageDone/review can find it.
		if _, err := trackStmt.ExecContext(
			ctx, importID, "waterway.gauge_measurements", id,
		); err != nil {
			return nil, err
		}
	}

	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf("Commit failed: %v", err)
	}

	feedback.Info("Importing approved gauge measurements took %s",
		time.Since(start))

	summary := struct {
		IDs []int64 `json:"ids"`
	}{
		IDs: ids,
	}
	return &summary, nil
}