view pkg/imports/stsh.go @ 4324:36d384326407

BottleneckDialogue: Prototype for delete of surveys implemented
author Thomas Junk <thomas.junk@intevation.de>
date Wed, 04 Sep 2019 14:39:51 +0200
parents 3d6a2c6b436c
children 8080007d3c06
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"archive/zip"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"log"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	shp "github.com/jonas-p/go-shp"

	"gemma.intevation.de/gemma/pkg/common"
)

// StretchShape is the import job that reads stretches from a zipped
// ESRI shape file.
type StretchShape struct {
	// Dir is the job's working directory. Do expects the archive
	// "stretch.zip" inside it; CleanUp removes the whole directory.
	Dir string `json:"dir"`
}

// STSHJobKind is the job kind under which stretch-from-shape imports
// are registered.
const STSHJobKind JobKind = "stsh"

// stshJobCreator is the factory registered for STSHJobKind.
type stshJobCreator struct{}

func init() {
	RegisterJobCreator(STSHJobKind, stshJobCreator{})
}

// Description returns the human readable label of this job kind.
func (stshJobCreator) Description() string {
	return "stretch from shape"
}

// AutoAccept always returns false for this job kind.
// NOTE(review): presumably this means such imports are not accepted
// automatically and need a manual review — confirm against the job
// framework.
func (stshJobCreator) AutoAccept() bool {
	return false
}

// Create returns a fresh, empty StretchShape job.
func (stshJobCreator) Create() Job {
	return &StretchShape{}
}

// Depends names the database tables this job kind is tied to. Slot 0
// holds "stretches" and "stretch_countries" (the tables Do inserts
// into); slot 1 is empty.
// NOTE(review): the exact meaning of the two slots is defined by the
// job framework — confirm there.
func (stshJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"stretches", "stretch_countries"}
	deps[1] = []string{}
	return deps
}

const (
	stshInsertSQL = `
INSERT INTO waterway.stretches (
  name,
  stretch,
  area,
  objnam,
  nobjnam,
  date_info,
  source_organization
) VALUES (
  $1,
  isrsrange(isrs_fromText($2), isrs_fromText($3)),
  ST_GeomFromWKB($4, 4326),
  $5,
  $6,
  $7,
  $8)
RETURNING id`
)

// StageDone is the post-acceptance hook of this job kind. It is still
// a stub: it ignores its arguments and always reports success.
// NOTE(review): presumably called by the job framework once a staged
// import has been accepted — confirm.
func (stshJobCreator) StageDone(ctx context.Context, tx *sql.Tx, id int64) error {
	// TODO: Implement me!
	return nil
}

// CleanUp deletes the job's working directory stsh.Dir and everything
// below it, returning any error from os.RemoveAll.
func (stsh *StretchShape) CleanUp() error {
	dir := stsh.Dir
	return os.RemoveAll(dir)
}

// fixAttribute strips every trailing NUL byte from s. Do applies it
// to the attribute values read from the DBF file, which may be padded
// with NULs.
func fixAttribute(s string) string {
	for strings.HasSuffix(s, "\x00") {
		s = s[:len(s)-1]
	}
	return s
}

// Do imports the stretches stored in the shape file archive
// "stretch.zip" inside stsh.Dir.
//
// It locates the .shp file and its companion .dbf file in the ZIP,
// verifies that all required attribute columns exist and then, for
// every record, inserts a row into waterway.stretches plus one row per
// listed country. Records with a NULL or invalid geometry or an
// unparsable date are skipped with a warning. The elapsed time is
// reported through feedback when Do returns.
//
// NOTE(review): this is still a prototype. The transaction is never
// committed (the deferred Rollback discards all inserts) and Do always
// ends with a "not implemented" error once all records are processed.
func (stsh *StretchShape) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()
	defer func() {
		feedback.Info("Storing stretches from shape file took %v.",
			time.Since(start))
	}()

	zpath := filepath.Join(stsh.Dir, "stretch.zip")

	z, err := zip.OpenReader(zpath)
	if err != nil {
		return nil, err
	}
	defer z.Close()

	shpF := common.FindInZIP(z, ".shp")
	if shpF == nil {
		return nil, errors.New("no SHP file found in ZIP")
	}
	// The DBF file holding the attributes is looked up under the name
	// of the SHP file with its extension replaced by ".dbf".
	prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name))
	dbfF := common.FindInZIP(z, prefix+".dbf")
	if dbfF == nil {
		return nil, fmt.Errorf("no DBF file found for %s", shpF.Name)
	}

	shpR, err := shpF.Open()
	if err != nil {
		return nil, err
	}

	dbfR, err := dbfF.Open()
	if err != nil {
		shpR.Close()
		return nil, err
	}
	// From here on the code relies on sr.Close to release both readers.
	sr := shp.SequentialReaderFromExt(shpR, dbfR)
	defer sr.Close()

	fields := sr.Fields()

	// Map the attribute column indices. Column names are compared in
	// lower case; -1 marks a column that was not found.

	var (
		nameIdx      = -1
		objnamIdx    = -1
		nobjnamIdx   = -1
		lowerIdx     = -1
		upperIdx     = -1
		sourceIdx    = -1
		dateInfoIdx  = -1
		countriesIdx = -1
	)

	type index struct {
		name string
		idx  *int
	}

	// Every required column is listed exactly once, so a missing column
	// is reported only once in the error below.
	indices := []index{
		{"name", &nameIdx},
		{"objnam", &objnamIdx},
		{"nobjnam", &nobjnamIdx},
		{"lower", &lowerIdx},
		{"upper", &upperIdx},
		{"source", &sourceIdx},
		{"dateinfo", &dateInfoIdx},
		{"countries", &countriesIdx},
	}

nextField:
	for i := range fields {
		name := strings.ToLower(fields[i].String())
		for j := range indices {
			if name == indices[j].name {
				*indices[j].idx = i
				continue nextField
			}
		}
	}

	var missingFields []string

	for i := range indices {
		if *indices[i].idx == -1 {
			missingFields = append(missingFields, indices[i].name)
		}
	}

	if len(missingFields) > 0 {
		return nil, fmt.Errorf("missing fields in attributes: %s",
			strings.Join(missingFields, ", "))
	}

	// Now we have ensured that all columns are in place
	// so start extracting data from the shape file.

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insStmt, err := tx.PrepareContext(ctx, stshInsertSQL)
	if err != nil {
		return nil, err
	}
	defer insStmt.Close()

	insCountryStmt, err := tx.PrepareContext(ctx, stInsertCountrySQL)
	if err != nil {
		return nil, err
	}
	defer insCountryStmt.Close()

	for sr.Next() {

		_, p := sr.Shape()
		if p == nil {
			feedback.Warn("Invalid NULL geometry found.")
			continue
		}
		poly, err := shapeToPolygon(p)
		if err != nil {
			feedback.Warn("Invalid geometry found: %v.", err)
			continue
		}

		var (
			name      = fixAttribute(sr.Attribute(nameIdx))
			objnam    = fixAttribute(sr.Attribute(objnamIdx))
			nobjnam   = fixAttribute(sr.Attribute(nobjnamIdx))
			lower     = fixAttribute(sr.Attribute(lowerIdx))
			upper     = fixAttribute(sr.Attribute(upperIdx))
			dateInfo  = fixAttribute(sr.Attribute(dateInfoIdx))
			source    = fixAttribute(sr.Attribute(sourceIdx))
			countries = fixAttribute(sr.Attribute(countriesIdx))
		)

		// Debug output of the prototype; drop it (and the "log" import,
		// if it becomes unused) once the import is finished.
		log.Printf("name     : %+q\n", name)
		log.Printf("objnam   : %+q\n", objnam)
		log.Printf("nobjnam  : %+q\n", nobjnam)
		log.Printf("lower    : %+q\n", lower)
		log.Printf("upper    : %+q\n", upper)
		log.Printf("dateinfo : %+q\n", dateInfo)
		log.Printf("source   : %+q\n", source)
		log.Printf("countries: %+q\n", countries)

		date, err := common.ParseTime(dateInfo)
		if err != nil {
			feedback.Warn("Invalid time value: %v.", err)
			continue
		}

		// An empty nobjnam is stored as SQL NULL.
		var nobjnamNull sql.NullString
		if nobjnam != "" {
			nobjnamNull = sql.NullString{
				String: nobjnam,
				Valid:  true,
			}
		}

		// Convert to a multi polygon.
		area := poly.MultiPolygonGeom().AsWKB()

		log.Printf("len geom: %d\n", len(area))

		var id int64

		if err := insStmt.QueryRowContext(
			ctx,
			name,
			lower, upper,
			area,
			objnam,
			nobjnamNull,
			date,
			source,
		).Scan(&id); err != nil {
			return nil, err
		}

		log.Println("after insert")

		// Link the new stretch to its countries. The attribute holds a
		// comma separated list; blank entries are ignored.
		// NOTE(review): assumes stInsertCountrySQL (defined elsewhere in
		// this package) takes the stretch id as $1 and the country code
		// as $2 — confirm against its definition.
		for _, country := range strings.Split(countries, ",") {
			if country = strings.TrimSpace(country); country == "" {
				continue
			}
			if _, err := insCountryStmt.ExecContext(ctx, id, country); err != nil {
				return nil, err
			}
		}

		// TODO: Implement me!
	}

	if err := sr.Err(); err != nil {
		return nil, err
	}

	// TODO: Commit and report the result. Until then the deferred
	// tx.Rollback discards everything inserted above.
	return nil, errors.New("Not implemented, yet!")
}