view pkg/imports/wp.go @ 2084:ddbac0f22ffb

Waterway profiles import: Restructured the code a bit in preparation for downloading profile geometries from WFS first (still to be done).
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 31 Jan 2019 16:32:18 +0100
parents 9318973487a1
children 6096ec4951f8
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"bufio"
	"context"
	"database/sql"
	"encoding/csv"
	"fmt"
	"io"
	"os"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/models"
	"github.com/jackc/pgx/pgtype"
)

// WaterwayProfiles is the import job for waterway profiles.
// The profiles are read from the file wp.csv inside Dir.
type WaterwayProfiles struct {
	// Dir is the directory containing wp.csv. CleanUp removes it
	// together with all of its contents.
	Dir string `json:"dir"`
}

// WPJobKind is the job kind under which waterway profile imports
// are registered.
const WPJobKind = JobKind("wp")

// wpJobCreator is registered for WPJobKind and creates
// WaterwayProfiles jobs.
type wpJobCreator struct{}

func init() {
	// Make the "wp" job kind known at package initialization.
	RegisterJobCreator(WPJobKind, wpJobCreator{})
}

// Create builds a WaterwayProfiles job from its serialized form data,
// decoded with common.FromJSONString. The job kind argument is ignored.
// On a decoding error no job is returned.
func (wpJobCreator) Create(_ JobKind, data string) (Job, error) {
	var job WaterwayProfiles
	if err := common.FromJSONString(data, &job); err != nil {
		return nil, err
	}
	return &job, nil
}

// AutoAccept always returns false: waterway profile imports are not
// accepted automatically.
// NOTE(review): presumably they wait for a manual review — confirm.
func (wpJobCreator) AutoAccept() bool {
	return false
}

// Description returns the human readable name of this job kind.
func (wpJobCreator) Description() string { return "waterway profiles" }

// Depends names the relations this job kind depends on.
// NOTE(review): presumably used by the caller to serialize jobs that
// touch the same tables — confirm.
func (wpJobCreator) Depends() []string {
	deps := []string{"waterway_profiles"}
	return deps
}

const (
	// insertWaterwayProfileSQL inserts one waterway profile and returns
	// its generated id.
	// $1-$5 are the parts of the ISRS location code (country code,
	// LOCODE, fairway section, ORC, hectometre), combined into a row
	// value for the location column. $6 is the validity range. $7-$13
	// fill the remaining columns in the order listed.
	insertWaterwayProfileSQL = `
INSERT INTO waterway.waterway_profiles (
  location,
  validity,
  lnwl,
  mwl,
  hnwl,
  fe30,
  fe100,
  date_info,
  source_organization
) VALUES (
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13
) RETURNING id`

	// wpStageDoneSQL sets staging_done on every waterway profile that
	// import.track_imports records for the import with id $1.
	wpStageDoneSQL = `
UPDATE waterway.waterway_profiles SET staging_done = true
WHERE id IN (
  SELECT key FROM import.track_imports
  WHERE import_id = $1 AND
    relation = 'waterway.waterway_profiles'::regclass)`
)

// StageDone sets staging_done on all waterway profiles that were
// tracked for the import with the given id. It runs inside tx.
func (wpJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	if _, err := tx.ExecContext(ctx, wpStageDoneSQL, id); err != nil {
		return err
	}
	return nil
}

// CleanUp removes the job's directory wp.Dir and everything in it.
func (wp *WaterwayProfiles) CleanUp() error {
	dir := wp.Dir
	return os.RemoveAll(dir)
}

// parseFloat64 parses a numeric CSV cell.
//
// Surrounding white space is ignored. An empty or blank cell yields a
// zero sql.NullFloat64 with Valid == false, i.e. SQL NULL. A comma is
// accepted as decimal separator, so "1,5" parses as 1.5. If the cell
// is not a number, the strconv.ParseFloat error is returned.
func parseFloat64(s string) (sql.NullFloat64, error) {
	// Spreadsheet exports often pad cells; without trimming, " 1,5"
	// would be rejected and a blank cell would be an error, not NULL.
	s = strings.TrimSpace(s)
	if s == "" {
		return sql.NullFloat64{}, nil
	}
	// Accept comma decimal separators (e.g. German-style "1,5").
	v, err := strconv.ParseFloat(strings.Replace(s, ",", ".", -1), 64)
	if err != nil {
		return sql.NullFloat64{}, err
	}
	return sql.NullFloat64{Float64: v, Valid: true}, nil
}

// Do runs the waterway profiles import.
//
// All work happens inside one transaction on conn:
//   - the records of wp.csv in wp.Dir are inserted into
//     waterway.waterway_profiles;
//   - each insert is tracked for importID.
//
// The transaction is committed only if all of this succeeds. The
// returned summary lists the ids of the inserted profiles.
//
// If the CSV contains no data records, the UnchangedError produced by
// processCSV is returned unwrapped, so its type is preserved.
func (wp *WaterwayProfiles) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {
	start := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Undo everything on early return. After a successful Commit this
	// is a no-op whose error (sql.ErrTxDone) is deliberately ignored.
	defer tx.Rollback()

	//  TODO: Download profile geometries from WFS.

	summary, err := wp.processCSV(ctx, importID, tx, start, feedback)
	if err != nil {
		// Wrapping with fmt.Errorf would erase the UnchangedError type
		// and report "nothing new" as an ordinary failure.
		// NOTE(review): assumes the job runner checks this type — confirm.
		if _, unchanged := err.(UnchangedError); unchanged {
			return nil, err
		}
		return nil, fmt.Errorf("error processing CSV: %v", err)
	}

	if err := tx.Commit(); err != nil {
		return nil, fmt.Errorf(
			"Importing waterway profiles failed after %s: %v",
			time.Since(start), err)
	}

	feedback.Info("Importing waterway profiles took %s",
		time.Since(start))
	return summary, nil
}

// processCSV reads wp.csv from wp.Dir. It inserts one row into
// waterway.waterway_profiles per CSV record, using tx.
//
// File format:
//   - The file is ';'-separated and its first record is a header.
//   - Each header is matched, case-insensitively, against the names
//     location, valid_from, valid_to, lnwl, mwl, hnwl, fe30, fe100,
//     date_info and source, and assigned to the first name it contains.
//   - A name claimed by two headers is an error; a name claimed by no
//     header is reported as missing.
//   - Dates are parsed via misc.TimeGuesser with the layout
//     "02.01.2006" (DD.MM.YYYY).
//   - Numeric cells go through parseFloat64; empty ones become NULL.
//   - An empty date_info cell defaults to start.
//
// Every inserted id is recorded with trackImportSQL for importID. If
// no data records are present, an UnchangedError is returned. Error
// messages about a record give its line in the file (the header is
// line 1).
func (wp *WaterwayProfiles) processCSV(
	ctx context.Context,
	importID int64,
	tx *sql.Tx,
	start time.Time,
	feedback Feedback,
) (interface{}, error) {

	f, err := os.Open(filepath.Join(wp.Dir, "wp.csv"))
	if err != nil {
		return nil, err
	}
	defer f.Close()

	r := csv.NewReader(bufio.NewReader(f))
	r.Comma = ';'
	r.ReuseRecord = true

	// FieldsPerRecord is left at zero, so the reader fixes the field
	// count to that of the header. Every later record therefore has
	// len(headers) fields, and the column indices found below are
	// always in range.
	headers, err := r.Read()
	if err != nil {
		return nil, err
	}

	var (
		locationIdx  = -1
		validFromIdx = -1
		validToIdx   = -1
		lnwlIdx      = -1
		mwlIdx       = -1
		hnwlIdx      = -1
		fe30Idx      = -1
		fe100Idx     = -1
		dateInfoIdx  = -1
		sourceIdx    = -1
	)

	fields := []struct {
		idx    *int
		substr string
	}{
		{&locationIdx, "location"},
		{&validFromIdx, "valid_from"},
		{&validToIdx, "valid_to"},
		{&lnwlIdx, "lnwl"},
		{&mwlIdx, "mwl"},
		{&hnwlIdx, "hnwl"},
		{&fe30Idx, "fe30"},
		{&fe100Idx, "fe100"},
		{&dateInfoIdx, "date_info"},
		{&sourceIdx, "source"},
	}

nextHeader:
	for i, h := range headers {
		h = strings.ToLower(h)
		for j := range fields {
			if strings.Contains(h, fields[j].substr) {
				if *fields[j].idx != -1 {
					return nil, fmt.Errorf(
						"CSV has more than one column with name containing '%s'",
						fields[j].substr)
				}
				*fields[j].idx = i
				continue nextHeader
			}
		}
	}

	var missing []string
	for i := range fields {
		if *fields[i].idx == -1 {
			missing = append(missing, fields[i].substr)
		}
	}
	if len(missing) > 0 {
		return nil, fmt.Errorf(
			"CSV is missing columns: %s",
			strings.Join(missing, ", "))
	}

	parseDate := misc.TimeGuesser([]string{"02.01.2006"}).Guess

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayProfileSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	var ids []int64

	// line is the file line of the current record. The header took
	// line 1. Quoted fields spanning several lines would make this
	// count drift.
	for line := 2; ; line++ {

		row, err := r.Read()
		// Check the error before looking at row. A parse error can come
		// with an empty record, and it must be reported, not mistaken
		// for the end of the input.
		if err == io.EOF {
			break
		}
		if err != nil {
			return nil, fmt.Errorf("CSV parsing failed: %v", err)
		}

		location, err := models.IsrsFromString(row[locationIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid ISRS location code in line %d: %v",
				line, err)
		}

		validFromTime, err := parseDate(row[validFromIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'valid_from' value in line %d: %v",
				line, err)
		}
		validToTime, err := parseDate(row[validToIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'valid_to' value in line %d: %v",
				line, err)
		}

		validFrom := pgtype.Timestamptz{
			Time:   validFromTime,
			Status: pgtype.Present,
		}

		validTo := pgtype.Timestamptz{
			Time:   validToTime,
			Status: pgtype.Present,
		}

		// Validity is the half-open range [valid_from, valid_to).
		validity := pgtype.Tstzrange{
			Lower:     validFrom,
			Upper:     validTo,
			LowerType: pgtype.Inclusive,
			UpperType: pgtype.Exclusive,
			Status:    pgtype.Present,
		}

		lnwl, err := parseFloat64(row[lnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'lnwl' value in line %d: %v",
				line, err)
		}
		mwl, err := parseFloat64(row[mwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'mwl' value in line %d: %v",
				line, err)
		}
		hnwl, err := parseFloat64(row[hnwlIdx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'hnwl' value in line %d: %v",
				line, err)
		}
		fe30, err := parseFloat64(row[fe30Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'fe30' value in line %d: %v",
				line, err)
		}
		fe100, err := parseFloat64(row[fe100Idx])
		if err != nil {
			return nil, fmt.Errorf(
				"Invalid 'fe100' value in line %d: %v",
				line, err)
		}

		// An empty date_info cell falls back to the start of the import.
		var dateInfo time.Time

		if di := row[dateInfoIdx]; di == "" {
			dateInfo = start
		} else if dateInfo, err = parseDate(di); err != nil {
			return nil, fmt.Errorf(
				"Invalid 'date_info' value in line %d: %v",
				line, err)
		}

		source := row[sourceIdx]

		var id int64
		if err := insertStmt.QueryRowContext(
			ctx,
			location.CountryCode,
			location.LoCode,
			location.FairwaySection,
			location.Orc,
			location.Hectometre,
			&validity,
			lnwl,
			mwl,
			hnwl,
			fe30,
			fe100,
			dateInfo,
			source,
		).Scan(&id); err != nil {
			return nil, err
		}

		if _, err := trackStmt.ExecContext(
			ctx, importID, "waterway.waterway_profiles", id); err != nil {
			return nil, err
		}
		ids = append(ids, id)
	}
	if len(ids) == 0 {
		return nil, UnchangedError("No new entries in waterway profiles.")
	}

	feedback.Info("%d new entries in waterway profiles.", len(ids))

	summary := struct {
		IDs []int64 `json:"ids"`
	}{
		IDs: ids,
	}
	return &summary, nil
}