view pkg/imports/stsh.go @ 5670:b75d0b303328

Various fixes and improvements to the gauges import: - Allow update of erased data (and thereby set erased to false) - Fix source_organization to work with ERDMS2 - Give ISRS of new and updated gauges in summary - Fixed dereference of nil pointers if revlevels are missing - Fixed dereference of nil pointer on update errors - Added ISRS to reference_code warning
author Sascha Wilde <wilde@sha-bang.de>
date Fri, 08 Dec 2023 17:29:56 +0100
parents 59a99655f34d
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"archive/zip"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	shp "github.com/jonas-p/go-shp"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
)

// StretchShape imports stretches from an uploaded shape file.
//
// The job is serialized as JSON; its only state is the upload directory.
type StretchShape struct {
	// Dir is the directory that contains the uploaded "stretch.zip".
	// Do reads the ZIP from here and CleanUp removes the whole directory.
	Dir string `json:"dir"`
}

// STSHJobKind is the import queue type identifier of stretch imports
// from uploaded shape files.
const STSHJobKind JobKind = "stsh"

// stshJobCreator creates and describes StretchShape import jobs.
type stshJobCreator struct{}

// init registers the stretch-from-shape creator for STSHJobKind.
func init() {
	RegisterJobCreator(STSHJobKind, stshJobCreator{})
}

// Description returns the human readable name of this import kind.
func (stshJobCreator) Description() string {
	return "stretch from shape"
}

// AutoAccept always returns false for this import kind
// (presumably: such imports have to be reviewed manually — confirm
// against the import queue).
func (stshJobCreator) AutoAccept() bool {
	return false
}

// Create returns a new, empty StretchShape job.
func (stshJobCreator) Create() Job {
	return &StretchShape{}
}

// Depends names "stretches" and "stretch_countries" in the first slot
// and leaves the second slot empty.
// NOTE(review): the meaning of the two slots is defined by the job queue
// elsewhere — confirm there.
func (stshJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"stretches", "stretch_countries"}
	return deps
}

const (
	// stshInsertSQL inserts a single stretch and returns its new id.
	//
	// Parameters, in the order Do passes them:
	//   $1 name
	//   $2 lower ISRS code as text  } combined into the isrsrange
	//   $3 upper ISRS code as text  } stored in "stretch"
	//   $4 area as WKB, interpreted with SRID 4326
	//   $5 objnam
	//   $6 nobjnam (NULL when the attribute is empty)
	//   $7 date_info
	//   $8 source_organization
	stshInsertSQL = `
INSERT INTO users.stretches (
  name,
  stretch,
  area,
  objnam,
  nobjnam,
  date_info,
  source_organization
) VALUES (
  $1,
  isrsrange(isrs_fromText($2), isrs_fromText($3)),
  ST_GeomFromWKB($4, 4326),
  $5,
  $6,
  $7,
  $8)
RETURNING id`
)

// StageDone handles a staged stretch-from-shape import exactly like a
// plain stretch import: it delegates unchanged to stJobCreator.StageDone.
func (stshJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	feedback Feedback,
) error {
	var plain stJobCreator
	return plain.StageDone(ctx, tx, id, feedback)
}

// CleanUp deletes the upload directory stsh.Dir together with all of
// its contents, including the uploaded shape ZIP.
func (stsh *StretchShape) CleanUp() error {
	uploadDir := stsh.Dir
	return os.RemoveAll(uploadDir)
}

// fixAttribute removes all trailing NUL bytes from s. Do applies it to
// every attribute value read from the shape file's DBF.
func fixAttribute(s string) string {
	for strings.HasSuffix(s, "\x00") {
		s = s[:len(s)-1]
	}
	return s
}

// stretchSummary describes one stretch written by StretchShape.Do. The
// import returns a slice of these as its result.
//
// Fields:
//   - ID is the users.stretches id returned by the insert.
//   - From and To are the lower and upper ISRS codes as read from the
//     attributes.
//   - NObjNam is nil when the nobjnam attribute was empty.
//   - Date is the parsed dateinfo attribute.
//   - Countries are the codes parsed from the countries attribute.
type stretchSummary struct {
	ID        int64                  `json:"id"`
	Name      string                 `json:"name"`
	From      string                 `json:"from"`
	To        string                 `json:"to"`
	ObjNam    string                 `json:"objnam"`
	NObjNam   *string                `json:"nobjnam"`
	Date      models.Date            `json:"date-info"`
	Countries models.UniqueCountries `json:"countries"`
}

// parseUniqueCountries parses a comma-separated list of country codes.
//
// Entries are trimmed and upper-cased, and empty entries are skipped.
// Parsing fails on the first code that is not Valid() or that appears
// more than once. An input without any codes yields a nil slice.
func parseUniqueCountries(s string) (models.UniqueCountries, error) {
	var (
		result models.UniqueCountries
		seen   = make(map[models.Country]bool)
	)
	for _, part := range strings.Split(s, ",") {
		code := strings.ToUpper(strings.TrimSpace(part))
		if code == "" {
			continue
		}
		country := models.Country(code)
		switch {
		case !country.Valid():
			return nil, fmt.Errorf("'%s' is not a valid country code", code)
		case seen[country]:
			return nil, fmt.Errorf("'%s' is not a unique country code", code)
		}
		seen[country] = true
		result = append(result, country)
	}
	return result, nil
}

// Do executes the actual stretch import from the shape file.
//
// Reading the input:
//   - It opens "stretch.zip" in stsh.Dir.
//   - It takes the .shp file returned by common.FindInZIP and the .dbf
//     file with the same base name.
//   - It checks that all required attribute columns are present: name,
//     objnam, nobjnam, lower, upper, source, dateinfo and countries.
//     Column names are matched case-insensitively.
//
// For every record it inserts one row into users.stretches, stores the
// record's countries and tracks the new row for importID. Records are
// skipped with a warning when they have a NULL or invalid geometry, an
// unparsable dateinfo, or invalid/duplicate country codes.
//
// Everything is written in one transaction. It is committed only if at
// least one stretch was stored. Otherwise Do returns an UnchangedError
// and nothing is kept. On success the result is a []*stretchSummary.
func (stsh *StretchShape) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()
	defer func() {
		feedback.Info("Storing stretches from shape file took %v.",
			time.Since(start))
	}()

	zpath := filepath.Join(stsh.Dir, "stretch.zip")

	z, err := zip.OpenReader(zpath)
	if err != nil {
		return nil, err
	}
	defer z.Close()

	shpF := common.FindInZIP(z, ".shp")
	if shpF == nil {
		return nil, errors.New("no SHP file found in ZIP")
	}
	// The DBF carrying the attributes must share the SHP's base name.
	prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name))
	dbfF := common.FindInZIP(z, prefix+".dbf")
	if dbfF == nil {
		return nil, fmt.Errorf("no DBF file found for %s", shpF.Name)
	}

	shpR, err := shpF.Open()
	if err != nil {
		return nil, err
	}

	dbfR, err := dbfF.Open()
	if err != nil {
		// shpR has not been handed to the sequential reader yet,
		// so it has to be closed here.
		shpR.Close()
		return nil, err
	}
	sr := shp.SequentialReaderFromExt(shpR, dbfR)
	defer sr.Close()

	fields := sr.Fields()

	// Map the attribute column indices.

	var (
		nameIdx      = -1
		objnamIdx    = -1
		nobjnamIdx   = -1
		lowerIdx     = -1
		upperIdx     = -1
		sourceIdx    = -1
		dateInfoIdx  = -1
		countriesIdx = -1
	)

	type index struct {
		name string
		idx  *int
	}

	// Each required column is listed exactly once. A duplicate entry
	// would make a missing column show up twice in the error below.
	indices := []index{
		{"name", &nameIdx},
		{"objnam", &objnamIdx},
		{"nobjnam", &nobjnamIdx},
		{"lower", &lowerIdx},
		{"upper", &upperIdx},
		{"source", &sourceIdx},
		{"dateinfo", &dateInfoIdx},
		{"countries", &countriesIdx},
	}

nextField:
	for i := range fields {
		name := strings.ToLower(fields[i].String())
		for j := range indices {
			if name == indices[j].name {
				*indices[j].idx = i
				continue nextField
			}
		}
	}

	var missingFields []string

	for i := range indices {
		if *indices[i].idx == -1 {
			missingFields = append(missingFields, indices[i].name)
		}
	}

	if len(missingFields) > 0 {
		return nil, fmt.Errorf("missing fields in attributes: %s",
			strings.Join(missingFields, ", "))
	}

	// Now we have ensured that all columns are in place
	// so start extracting data from the shape file.

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Discards all inserts on every early return. After a successful
	// Commit the rollback has no effect.
	defer tx.Rollback()

	insStmt, err := tx.PrepareContext(ctx, stshInsertSQL)
	if err != nil {
		return nil, err
	}
	defer insStmt.Close()

	insCountryStmt, err := tx.PrepareContext(ctx, stInsertCountrySQL)
	if err != nil {
		return nil, err
	}
	defer insCountryStmt.Close()

	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	var stretches []*stretchSummary

	for sr.Next() {

		_, p := sr.Shape()
		if p == nil {
			feedback.Warn("Invalid NULL geometry found.")
			continue
		}
		poly, err := shapeToPolygon(p)
		if err != nil {
			feedback.Warn("Invalid geometry found: %v.", err)
			continue
		}

		var (
			name     = fixAttribute(sr.Attribute(nameIdx))
			objnam   = fixAttribute(sr.Attribute(objnamIdx))
			nobjnam  = fixAttribute(sr.Attribute(nobjnamIdx))
			lower    = fixAttribute(sr.Attribute(lowerIdx))
			upper    = fixAttribute(sr.Attribute(upperIdx))
			dateInfo = fixAttribute(sr.Attribute(dateInfoIdx))
			source   = fixAttribute(sr.Attribute(sourceIdx))
			cnts     = fixAttribute(sr.Attribute(countriesIdx))
		)

		feedback.Info("Importing stretch %s (%s - %s).",
			name, lower, upper)

		date, err := common.ParseTime(dateInfo)
		if err != nil {
			feedback.Warn("Invalid time value: %v.", err)
			continue
		}

		countries, err := parseUniqueCountries(cnts)
		if err != nil {
			feedback.Warn("Countries: %v.", err)
			continue
		}

		// An empty nobjnam attribute is stored as SQL NULL.
		var nobjnamNull sql.NullString
		if nobjnam != "" {
			nobjnamNull = sql.NullString{
				String: nobjnam,
				Valid:  true,
			}
		}

		// Convert to a multi polygon.
		area := poly.MultiPolygonGeom().AsWKB()

		var id int64

		if err := insStmt.QueryRowContext(
			ctx,
			name,
			lower, upper,
			area,
			objnam,
			nobjnamNull,
			date,
			source,
		).Scan(&id); err != nil {
			return nil, err
		}

		// Store the countries
		for _, country := range countries {
			if _, err := insCountryStmt.ExecContext(ctx, id, country); err != nil {
				return nil, err
			}
		}

		// Finally track the stretch

		if _, err := trackStmt.ExecContext(
			ctx,
			importID,
			"users.stretches",
			id,
		); err != nil {
			return nil, err
		}

		stretch := &stretchSummary{
			ID:        id,
			Name:      name,
			From:      lower,
			To:        upper,
			ObjNam:    objnam,
			Date:      models.Date{Time: date},
			Countries: countries,
		}

		// nobjnamNull is declared per iteration, so taking the address
		// of its String field is safe.
		if nobjnamNull.Valid {
			stretch.NObjNam = &nobjnamNull.String
		}

		stretches = append(stretches, stretch)
	}

	if err := sr.Err(); err != nil {
		return nil, err
	}

	if len(stretches) == 0 {
		return nil, UnchangedError("No stretches written.")
	}

	if err := tx.Commit(); err != nil {
		return nil, err
	}

	return stretches, nil
}