view pkg/imports/stsh.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in the SR export to extract information that was formerly in the meta JSON but is now only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100
parents 59a99655f34d
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"archive/zip"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	shp "github.com/jonas-p/go-shp"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
)

// StretchShape is the job payload for importing stretches from an
// uploaded shape file. Dir is the directory the upload was stored in.
type StretchShape struct {
	Dir string `json:"dir"`
}

// STSHJobKind is the import queue type identifier.
const STSHJobKind JobKind = "stsh"

// stshJobCreator is the factory registered for STSHJobKind.
type stshJobCreator struct{}

func init() {
	RegisterJobCreator(STSHJobKind, stshJobCreator{})
}

// Description returns the human readable name of this import kind.
func (stshJobCreator) Description() string {
	return "stretch from shape"
}

// AutoAccept returns false: imports of this kind are not auto-accepted.
func (stshJobCreator) AutoAccept() bool {
	return false
}

// Create returns a fresh, empty StretchShape job.
func (stshJobCreator) Create() Job {
	return &StretchShape{}
}

// Depends names the tables this job kind depends on. Only the first of
// the two lists is populated; the second one stays nil.
// NOTE(review): the meaning of the two lists is defined by the consumer
// of RegisterJobCreator — confirm there.
func (stshJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"stretches", "stretch_countries"}
	return deps
}

// stshInsertSQL inserts a single stretch into users.stretches and
// returns the id of the new row. Positional parameters:
//
//	$1      name
//	$2, $3  lower and upper ISRS codes, combined into an isrsrange
//	$4      area as WKB, interpreted with SRID 4326
//	$5      objnam
//	$6      nobjnam (may be NULL)
//	$7      date_info
//	$8      source_organization
const stshInsertSQL = `
INSERT INTO users.stretches (
  name,
  stretch,
  area,
  objnam,
  nobjnam,
  date_info,
  source_organization
) VALUES (
  $1,
  isrsrange(isrs_fromText($2), isrs_fromText($3)),
  ST_GeomFromWKB($4, 4326),
  $5,
  $6,
  $7,
  $8)
RETURNING id`

// StageDone finalizes a staged import by handing it over to the regular
// stretch import (stJobCreator). A stretch coming from a shape file
// needs no extra treatment at this stage.
func (stshJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	feedback Feedback,
) error {
	delegate := stJobCreator{}
	return delegate.StageDone(ctx, tx, id, feedback)
}

// CleanUp deletes the upload directory stsh.Dir together with everything
// in it, including the uploaded shape file archive. A directory that
// does not exist is not an error (os.RemoveAll returns nil then).
func (stsh *StretchShape) CleanUp() error {
	dir := stsh.Dir
	return os.RemoveAll(dir)
}

// fixAttribute strips all trailing NUL bytes from a DBF attribute value.
// Other characters, including NULs in the middle of the value and
// trailing spaces, are kept as they are.
// NOTE(review): presumably the NULs are field padding written by some
// shape file producers — confirm.
func fixAttribute(s string) string {
	for strings.HasSuffix(s, "\x00") {
		s = s[:len(s)-1]
	}
	return s
}

// stretchSummary is the per-stretch entry of the import summary returned
// by (*StretchShape).Do. Field order and JSON tags define its serialized
// form:
//
//   - ID is the id returned when inserting the row into users.stretches.
//   - Name, ObjNam and NObjNam come from the name, objnam and nobjnam
//     attributes; NObjNam is nil (JSON null) when the attribute is empty.
//   - From and To are the lower and upper ISRS codes of the stretch.
//   - Date is the parsed dateinfo attribute.
//   - Countries is the validated countries attribute (duplicates are
//     rejected by parseUniqueCountries).
type stretchSummary struct {
	ID        int64                  `json:"id"`
	Name      string                 `json:"name"`
	From      string                 `json:"from"`
	To        string                 `json:"to"`
	ObjNam    string                 `json:"objnam"`
	NObjNam   *string                `json:"nobjnam"`
	Date      models.Date            `json:"date-info"`
	Countries models.UniqueCountries `json:"countries"`
}

// parseUniqueCountries parses a comma separated list of country codes.
// Each entry is trimmed and upper-cased, and empty entries are ignored.
// It returns an error for the first entry that is not a valid country
// code or that occurs more than once. The result keeps the input order
// and is nil if the list contains no codes.
func parseUniqueCountries(s string) (models.UniqueCountries, error) {
	var (
		result models.UniqueCountries
		seen   = make(map[models.Country]bool)
	)
	for _, part := range strings.Split(s, ",") {
		code := strings.ToUpper(strings.TrimSpace(part))
		if code == "" {
			continue
		}
		country := models.Country(code)
		switch {
		case !country.Valid():
			return nil, fmt.Errorf("'%s' is not a valid country code", code)
		case seen[country]:
			return nil, fmt.Errorf("'%s' is not a unique country code", code)
		}
		seen[country] = true
		result = append(result, country)
	}
	return result, nil
}

// Do executes the actual stretch import from the shape file.
//
// It opens "stretch.zip" inside stsh.Dir and picks the .shp member
// returned by common.FindInZIP, plus the .dbf member with the same base
// name. It then checks that the attribute table provides all required
// columns: name, objnam, nobjnam, lower, upper, source, dateinfo and
// countries. Column names are matched case-insensitively.
//
// Each record is inserted into users.stretches together with its
// countries and is tracked under importID. A record is skipped with a
// warning if its geometry is NULL or not convertible, its dateinfo
// cannot be parsed, or its countries list is invalid. Database errors
// abort the whole import.
//
// All writes happen in one transaction on conn. The transaction is
// committed only if at least one stretch was written; otherwise an
// UnchangedError is returned. On success the result is the
// []*stretchSummary of the written stretches.
func (stsh *StretchShape) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()
	defer func() {
		feedback.Info("Storing stretches from shape file took %v.",
			time.Since(start))
	}()

	zpath := filepath.Join(stsh.Dir, "stretch.zip")

	z, err := zip.OpenReader(zpath)
	if err != nil {
		return nil, err
	}
	defer z.Close()

	shpF := common.FindInZIP(z, ".shp")
	if shpF == nil {
		return nil, errors.New("no SHP file found in ZIP")
	}
	// The attribute table must share the base name of the geometries,
	// e.g. "foo.shp" -> "foo.dbf".
	prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name))
	dbfF := common.FindInZIP(z, prefix+".dbf")
	if dbfF == nil {
		return nil, fmt.Errorf("no DBF file found for %s", shpF.Name)
	}

	shpR, err := shpF.Open()
	if err != nil {
		return nil, err
	}

	dbfR, err := dbfF.Open()
	if err != nil {
		// sr does not exist yet, so the SHP reader has to be closed here.
		shpR.Close()
		return nil, err
	}
	// NOTE(review): sr.Close is relied upon to also close shpR and dbfR.
	sr := shp.SequentialReaderFromExt(shpR, dbfR)
	defer sr.Close()

	fields := sr.Fields()

	// Map the attribute column indices. Each required column is listed
	// exactly once, so a missing column is also reported exactly once.

	var (
		nameIdx      = -1
		objnamIdx    = -1
		nobjnamIdx   = -1
		lowerIdx     = -1
		upperIdx     = -1
		sourceIdx    = -1
		dateInfoIdx  = -1
		countriesIdx = -1
	)

	type index struct {
		name string
		idx  *int
	}

	indices := []index{
		{"name", &nameIdx},
		{"objnam", &objnamIdx},
		{"nobjnam", &nobjnamIdx},
		{"lower", &lowerIdx},
		{"upper", &upperIdx},
		{"source", &sourceIdx},
		{"dateinfo", &dateInfoIdx},
		{"countries", &countriesIdx},
	}

nextField:
	for i := range fields {
		name := strings.ToLower(fields[i].String())
		for j := range indices {
			if name == indices[j].name {
				*indices[j].idx = i
				continue nextField
			}
		}
	}

	var missingFields []string

	for i := range indices {
		if *indices[i].idx == -1 {
			missingFields = append(missingFields, indices[i].name)
		}
	}

	if len(missingFields) > 0 {
		return nil, fmt.Errorf("missing fields in attributes: %s",
			strings.Join(missingFields, ", "))
	}

	// Now we have ensured that all columns are in place
	// so start extracting data from the shape file.

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// No-op once the transaction has been committed.
	defer tx.Rollback()

	insStmt, err := tx.PrepareContext(ctx, stshInsertSQL)
	if err != nil {
		return nil, err
	}
	defer insStmt.Close()

	insCountryStmt, err := tx.PrepareContext(ctx, stInsertCountrySQL)
	if err != nil {
		return nil, err
	}
	defer insCountryStmt.Close()

	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	var stretches []*stretchSummary

	for sr.Next() {

		// Records with unusable geometry are skipped, not fatal.
		_, p := sr.Shape()
		if p == nil {
			feedback.Warn("Invalid NULL geometry found.")
			continue
		}
		poly, err := shapeToPolygon(p)
		if err != nil {
			feedback.Warn("Invalid geometry found: %v.", err)
			continue
		}

		var (
			name     = fixAttribute(sr.Attribute(nameIdx))
			objnam   = fixAttribute(sr.Attribute(objnamIdx))
			nobjnam  = fixAttribute(sr.Attribute(nobjnamIdx))
			lower    = fixAttribute(sr.Attribute(lowerIdx))
			upper    = fixAttribute(sr.Attribute(upperIdx))
			dateInfo = fixAttribute(sr.Attribute(dateInfoIdx))
			source   = fixAttribute(sr.Attribute(sourceIdx))
			cnts     = fixAttribute(sr.Attribute(countriesIdx))
		)

		feedback.Info("Importing stretch %s (%s - %s).",
			name, lower, upper)

		date, err := common.ParseTime(dateInfo)
		if err != nil {
			feedback.Warn("Invalid time value: %v.", err)
			continue
		}

		countries, err := parseUniqueCountries(cnts)
		if err != nil {
			feedback.Warn("Countries: %v.", err)
			continue
		}

		// An empty nobjnam attribute is stored as SQL NULL.
		var nobjnamNull sql.NullString
		if nobjnam != "" {
			nobjnamNull = sql.NullString{
				String: nobjnam,
				Valid:  true,
			}
		}

		// Convert to a multi polygon.
		area := poly.MultiPolygonGeom().AsWKB()

		var id int64

		if err := insStmt.QueryRowContext(
			ctx,
			name,
			lower, upper,
			area,
			objnam,
			nobjnamNull,
			date,
			source,
		).Scan(&id); err != nil {
			return nil, err
		}

		// Store the countries
		for _, country := range countries {
			if _, err := insCountryStmt.ExecContext(ctx, id, country); err != nil {
				return nil, err
			}
		}

		// Finally track the stretch

		if _, err := trackStmt.ExecContext(
			ctx,
			importID,
			"users.stretches",
			id,
		); err != nil {
			return nil, err
		}

		stretch := &stretchSummary{
			ID:        id,
			Name:      name,
			From:      lower,
			To:        upper,
			ObjNam:    objnam,
			Date:      models.Date{Time: date},
			Countries: countries,
		}

		// nobjnamNull is declared per iteration, so taking the address
		// of its String field is safe.
		if nobjnamNull.Valid {
			stretch.NObjNam = &nobjnamNull.String
		}

		stretches = append(stretches, stretch)
	}

	if err := sr.Err(); err != nil {
		return nil, err
	}

	if len(stretches) == 0 {
		return nil, UnchangedError("No stretches written.")
	}

	if err := tx.Commit(); err != nil {
		return nil, err
	}

	return stretches, nil
}