view pkg/imports/stsh.go @ 4611:b5aa1eb83bb0 geoserver_sql_views

Add possibility to configure SRS for GeoServer SQL view Automatic detection of spatial reference system for SQL views in GeoServer does not always find the correct SRS.
author Tom Gottfried <tom@intevation.de>
date Fri, 06 Sep 2019 11:58:03 +0200
parents 8080007d3c06
children eca3afe766d7 5e38667f740c
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"archive/zip"
	"context"
	"database/sql"
	"errors"
	"fmt"
	"os"
	"path"
	"path/filepath"
	"strings"
	"time"

	shp "github.com/jonas-p/go-shp"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
)

type StretchShape struct {
	Dir string `json:"dir"`
}

const STSHJobKind JobKind = "stsh"

type stshJobCreator struct{}

func init() { RegisterJobCreator(STSHJobKind, stshJobCreator{}) }

func (stshJobCreator) Description() string { return "stretch from shape" }

func (stshJobCreator) AutoAccept() bool { return false }

func (stshJobCreator) Create() Job { return new(StretchShape) }

func (stshJobCreator) Depends() [2][]string {
	return [2][]string{
		{"stretches", "stretch_countries"},
		{},
	}
}

const (
	stshInsertSQL = `
INSERT INTO waterway.stretches (
  name,
  stretch,
  area,
  objnam,
  nobjnam,
  date_info,
  source_organization
) VALUES (
  $1,
  isrsrange(isrs_fromText($2), isrs_fromText($3)),
  ST_GeomFromWKB($4, 4326),
  $5,
  $6,
  $7,
  $8)
RETURNING id`
)

// StageDone is merely the same as for the normal stretch import.
func (stshJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	return stJobCreator{}.StageDone(ctx, tx, id)
}

func (stsh *StretchShape) CleanUp() error {
	return os.RemoveAll(stsh.Dir)
}

func fixAttribute(s string) string {
	return strings.TrimRight(s, "\x00")
}

type stretchSummary struct {
	ID        int64                  `json:"id"`
	Name      string                 `json:"name"`
	From      string                 `json:"from"`
	To        string                 `json:"to"`
	ObjNam    string                 `json:"objnam"`
	NObjNam   *string                `json:"nobjnam"`
	Date      models.Date            `json:"date-info"`
	Countries models.UniqueCountries `json:"countries"`
}

func parseUniqueCountries(s string) (models.UniqueCountries, error) {
	unique := map[models.Country]struct{}{}
	var countries models.UniqueCountries
	for _, c := range strings.Split(s, ",") {
		if c = strings.ToUpper(strings.TrimSpace(c)); c == "" {
			continue
		}
		n := models.Country(c)
		if !n.Valid() {
			return nil, fmt.Errorf("'%s' is not a valid country code", c)
		}
		if _, found := unique[n]; found {
			return nil, fmt.Errorf("'%s' is not a unique country code", c)
		}
		countries = append(countries, n)
		unique[n] = struct{}{}
	}
	return countries, nil
}

func (stsh *StretchShape) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()
	defer func() {
		feedback.Info("Storing stretches from shape file took %v.",
			time.Since(start))
	}()

	zpath := filepath.Join(stsh.Dir, "stretch.zip")

	z, err := zip.OpenReader(zpath)
	if err != nil {
		return nil, err
	}
	defer z.Close()

	shpF := common.FindInZIP(z, ".shp")
	if shpF == nil {
		return nil, errors.New("no SHP file found in ZIP")
	}
	prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name))
	dbfF := common.FindInZIP(z, prefix+".dbf")
	if dbfF == nil {
		return nil, fmt.Errorf("no DBF file found for %s", shpF.Name)
	}

	shpR, err := shpF.Open()
	if err != nil {
		return nil, err
	}

	dbfR, err := dbfF.Open()
	if err != nil {
		shpR.Close()
		return nil, err
	}
	sr := shp.SequentialReaderFromExt(shpR, dbfR)
	defer sr.Close()

	fields := sr.Fields()

	// Map the attribute column indices.

	var (
		nameIdx      = -1
		objnamIdx    = -1
		nobjnamIdx   = -1
		lowerIdx     = -1
		upperIdx     = -1
		sourceIdx    = -1
		dateInfoIdx  = -1
		countriesIdx = -1
	)

	type index struct {
		name string
		idx  *int
	}

	indices := []index{
		{"name", &nameIdx},
		{"objnam", &objnamIdx},
		{"nobjnam", &nobjnamIdx},
		{"lower", &lowerIdx},
		{"upper", &upperIdx},
		{"source", &sourceIdx},
		{"source", &sourceIdx},
		{"dateinfo", &dateInfoIdx},
		{"countries", &countriesIdx},
	}

nextField:
	for i := range fields {
		name := strings.ToLower(fields[i].String())
		for j := range indices {
			if name == indices[j].name {
				*indices[j].idx = i
				continue nextField
			}
		}
	}

	var missingFields []string

	for i := range indices {
		if *indices[i].idx == -1 {
			missingFields = append(missingFields, indices[i].name)
		}
	}

	if len(missingFields) > 0 {
		return nil, fmt.Errorf("missing fields in attributes: %s",
			strings.Join(missingFields, ", "))
	}

	// Now we have ensured that all columns are in place
	// so start extracting data from the shape file.

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insStmt, err := tx.PrepareContext(ctx, stshInsertSQL)
	if err != nil {
		return nil, err
	}
	defer insStmt.Close()

	insCountryStmt, err := tx.PrepareContext(ctx, stInsertCountrySQL)
	if err != nil {
		return nil, err
	}
	defer insCountryStmt.Close()

	trackStmt, err := tx.PrepareContext(ctx, trackImportSQL)
	if err != nil {
		return nil, err
	}
	defer trackStmt.Close()

	var stretches []*stretchSummary

	for sr.Next() {

		_, p := sr.Shape()
		if p == nil {
			feedback.Warn("Invalid NULL geometry found.")
			continue
		}
		poly, err := shapeToPolygon(p)
		if err != nil {
			feedback.Warn("Invalid geometry found: %v.", err)
			continue
		}

		var (
			name     = fixAttribute(sr.Attribute(nameIdx))
			objnam   = fixAttribute(sr.Attribute(objnamIdx))
			nobjnam  = fixAttribute(sr.Attribute(nobjnamIdx))
			lower    = fixAttribute(sr.Attribute(lowerIdx))
			upper    = fixAttribute(sr.Attribute(upperIdx))
			dateInfo = fixAttribute(sr.Attribute(dateInfoIdx))
			source   = fixAttribute(sr.Attribute(sourceIdx))
			cnts     = fixAttribute(sr.Attribute(countriesIdx))
		)

		feedback.Info("Importing stretch %s (%s - %s).",
			name, lower, upper)

		date, err := common.ParseTime(dateInfo)
		if err != nil {
			feedback.Warn("Invalid time value: %v.", err)
			continue
		}

		countries, err := parseUniqueCountries(cnts)
		if err != nil {
			feedback.Warn("Countries: %v.", err)
			continue
		}

		var nobjnamNull sql.NullString
		if nobjnam != "" {
			nobjnamNull = sql.NullString{
				String: nobjnam,
				Valid:  true,
			}
		}

		// Convert to a multi polygon.
		area := poly.MultiPolygonGeom().AsWKB()

		var id int64

		if err := insStmt.QueryRowContext(
			ctx,
			name,
			lower, upper,
			area,
			objnam,
			nobjnamNull,
			date,
			source,
		).Scan(&id); err != nil {
			return nil, err
		}

		// Store the countries
		for _, country := range countries {
			if _, err := insCountryStmt.ExecContext(ctx, country); err != nil {
				return nil, err
			}
		}

		// Finally track the stretch

		if _, err := trackStmt.ExecContext(
			ctx,
			importID,
			"waterway.stretches",
			id,
		); err != nil {
			return nil, err
		}

		stretch := &stretchSummary{
			ID:        id,
			Name:      name,
			From:      lower,
			To:        upper,
			ObjNam:    objnam,
			Date:      models.Date{date},
			Countries: countries,
		}

		if nobjnamNull.Valid {
			stretch.NObjNam = &nobjnamNull.String
		}

		stretches = append(stretches, stretch)
	}

	if err := sr.Err(); err != nil {
		return nil, err
	}

	if len(stretches) == 0 {
		return nil, UnchangedError("No stretches written.")
	}

	if err := tx.Commit(); err != nil {
		return nil, err
	}

	return stretches, nil
}