view pkg/imports/sr.go @ 2130:f3aabc05f9b2

Fix constraints on waterway profiles staging_done in the UNIQUE constraint had no effect, because the exclusion constraint prevented two rows with equal location and validity anyhow. Adding staging_done to the exclusion constraint makes the UNIQUE constraint checking only a corner case of what the exclusion constraint checks. Thus, remove the UNIQUE constraint. Casting staging_done to int is needed because there is no appropriate operator class for booleans. Casting to smallint or even bit would have been better (i.e. should result in smaller index size), but that would have required creating such a CAST, in addition.
author Tom Gottfried <tom@intevation.de>
date Wed, 06 Feb 2019 15:42:32 +0100
parents 59055c8301df
children b868cb653c4d
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom@intevation.de>

package imports

import (
	"archive/zip"
	"bufio"
	"context"
	"crypto/sha1"
	"database/sql"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	shp "github.com/jonas-p/go-shp"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/octree"
)

// SoundingResult is a Job to import sounding reults
// from a ZIP file into the database.
type SoundingResult struct {
	// Dir is the folder in the file system where the
	// 'sr.zip' is expect to be.
	Dir string `json:"dir"`

	// Override data
	// Date if given overrides the date value from the meta.json.
	Date *models.Date `json:"date,omitempty"`
	// Date if given overrides the name of the bottleneck from the meta.json.
	Bottleneck *string `json:"bottleneck,omitempty"`
	// EPSG if given overrides the EPSG code from the meta.json.
	// Defaults to WGS84.
	EPSG *uint `json:"epsg,omitempty"`
	// DepthReference if given overides the DepthReference value
	// from the meta.json.
	DepthReference *string `json:"depth-reference,omitempty"`
}

const (
	contourStepWidth = 0.1
	contourTolerance = 0.1
)

// SRJobKind is the unique name of a SoundingResult import job.
const SRJobKind JobKind = "sr"

type srJobCreator struct{}

func init() {
	RegisterJobCreator(SRJobKind, srJobCreator{})
}

func (srJobCreator) Description() string { return "sounding results" }

func (srJobCreator) AutoAccept() bool { return false }

func (srJobCreator) Create(_ JobKind, data string) (Job, error) {
	sr := new(SoundingResult)
	if err := common.FromJSONString(data, sr); err != nil {
		return nil, err
	}
	return sr, nil
}

func (srJobCreator) Depends() []string {
	return []string{
		"sounding_results",
		"sounding_results_contour_lines",
		"bottlenecks",
	}
}

func (srJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	_, err := tx.ExecContext(ctx, srStageDoneSQL, id)
	return err
}

const (
	srStageDoneSQL = `
UPDATE waterway.sounding_results SET staging_done = true
WHERE id = (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.sounding_results'::regclass)`

	insertPointsSQL = `
INSERT INTO waterway.sounding_results (
  bottleneck_id,
  date_info,
  depth_reference,
  point_cloud,
  area
) VALUES (
  (SELECT id from waterway.bottlenecks where objnam = $1),
  $2::date,
  $3,
  ST_Transform(ST_GeomFromWKB($4, $6::integer), 4326)::geography,
  (SELECT CASE WHEN $5::bytea IS NULL THEN
    ST_Transform(
      ST_ConcaveHull(
          ST_Force2D(ST_GeomFromWKB($4, $6::integer)), 0.7), 4326)::geography
  ELSE
    ST_Transform(ST_GeomFromWKB($5, $6::integer), 4326)::geography
  END)
)
RETURNING
  id,
  ST_X(ST_Centroid(point_cloud::geometry)),
  ST_Y(ST_Centroid(point_cloud::geometry)),
  CASE WHEN ST_Y(ST_Centroid(point_cloud::geometry)) > 0 THEN
    32600
  ELSE
    32700
  END + floor((ST_X(ST_Centroid(point_cloud::geometry))+180)/6)::int + 1`

	insertOctreeSQL = `
UPDATE waterway.sounding_results SET
  octree_checksum = $2, octree_index = $3
WHERE id = $1`

	insertContourSQL = `
INSERT INTO waterway.sounding_results_contour_lines (
  sounding_result_id,
  height,
  lines
)
SELECT
  $1,
  $2,
  ST_Transform(
    ST_Multi(
      ST_CollectionExtract(
        ST_SimplifyPreserveTopology(
          ST_Multi(ST_Collectionextract(
            ST_MakeValid(ST_GeomFromWKB($4, $3::integer)), 2)),
          $5
        ),
        2
      )
    ),
    4326
  )
FROM waterway.sounding_results sr
WHERE id = $1
`
)

// Do executes the actual sounding result import.
func (sr *SoundingResult) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	z, err := zip.OpenReader(filepath.Join(sr.Dir, "sr.zip"))
	if err != nil {
		return nil, err
	}
	defer z.Close()

	feedback.Info("Looking for 'meta.json'")
	mf := common.FindInZIP(z, "meta.json")
	if mf == nil && !sr.completeOverride() {
		return nil, errors.New("Cannot find 'meta.json'")
	}

	m, err := sr.loadMeta(mf)
	if err != nil {
		return nil, err
	}

	if err := m.Validate(ctx, conn); err != nil {
		return nil, common.ToError(err)
	}

	var xyzf *zip.File
	for _, ext := range []string{".xyz", ".txt"} {
		feedback.Info("Looking for '*%s'", ext)
		if xyzf = common.FindInZIP(z, ext); xyzf != nil {
			break
		}
	}
	if xyzf == nil {
		return nil, errors.New("Cannot find any *.xyz or *.txt file")
	}

	xyz, err := loadXYZ(xyzf, feedback)
	if err != nil {
		return nil, err
	}

	if len(xyz) == 0 {
		return nil, errors.New("XYZ does not contain any vertices")
	}

	// Is there a boundary shapefile in the ZIP archive?
	polygon, err := loadBoundary(z)
	if err != nil {
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	var (
		id       int64
		epsg     uint32
		lat, lon float64
	)
	start := time.Now()

	err = tx.QueryRow(insertPointsSQL,
		m.Bottleneck,
		m.Date.Time,
		m.DepthReference,
		xyz.AsWKB(),
		polygon.asWKB(),
		m.EPSG,
	).Scan(&id, &lat, &lon, &epsg)
	xyz, polygon = nil, nil // not need from now on.
	feedback.Info("storing points took %s", time.Since(start))
	if err != nil {
		return nil, err
	}

	feedback.Info("Best suited UTM EPSG: %d", epsg)

	feedback.Info("Triangulate...")
	start = time.Now()
	tin, err := octree.GenerateTinByID(ctx, conn, id, epsg)
	feedback.Info("triangulation took %s", time.Since(start))
	if err != nil {
		return nil, err
	}

	if tin == nil {
		return nil, errors.New("cannot load TIN from database")
	}

	feedback.Info("Building octree...")
	builder := octree.NewBuilder(tin)
	start = time.Now()
	builder.Build()
	octreeIndex, err := builder.Bytes()
	tin = nil // not needed from now on
	feedback.Info("building octree took %s", time.Since(start))
	if err != nil {
		return nil, err
	}

	feedback.Info("Store octree...")
	start = time.Now()
	h := sha1.New()
	h.Write(octreeIndex)
	checksum := hex.EncodeToString(h.Sum(nil))
	_, err = tx.Exec(insertOctreeSQL, id, checksum, octreeIndex)
	feedback.Info("storing octree index took %s", time.Since(start))
	if err != nil {
		return nil, err
	}

	tree := builder.Tree()
	builder = nil // not needed from now on

	feedback.Info("Generate contour lines...")
	start = time.Now()
	err = generateContours(tree, tx, id)
	feedback.Info("generating and storing contour lines took %s", time.Since(start))
	if err != nil {
		return nil, err
	}

	// Store for potential later removal.
	if err = track(ctx, tx, importID, "waterway.sounding_results", id); err != nil {
		return nil, err
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing sounding result was successful.")
	}

	summary := struct {
		Bottleneck string      `json:"bottleneck"`
		Date       models.Date `json:"date"`
		Lat        float64     `json:"lat"`
		Lon        float64     `json:"lon"`
	}{
		Bottleneck: m.Bottleneck,
		Date:       m.Date,
		Lat:        lat,
		Lon:        lon,
	}
	return &summary, err
}

// CleanUp removes the folder containing the ZIP file with the
// the sounding result import.
func (sr *SoundingResult) CleanUp() error {
	return os.RemoveAll(sr.Dir)
}

func (sr *SoundingResult) completeOverride() bool {
	// sr.EPSG == nil -> WGS84
	return sr.Bottleneck != nil && sr.Date != nil && sr.DepthReference != nil
}

func (sr *SoundingResult) loadMeta(f *zip.File) (*models.SoundingResultMeta, error) {
	if f == nil {
		var epsg uint
		if sr.EPSG != nil {
			epsg = *sr.EPSG
		} else {
			epsg = models.WGS84
		}
		return &models.SoundingResultMeta{
			Date:           *sr.Date,
			Bottleneck:     *sr.Bottleneck,
			EPSG:           epsg,
			DepthReference: *sr.DepthReference,
		}, nil
	}
	r, err := f.Open()
	if err != nil {
		return nil, err
	}
	defer r.Close()
	var m models.SoundingResultMeta
	if err := m.Decode(r); err != nil {
		return nil, err
	}

	// Apply overrides
	if sr.Date != nil {
		m.Date = *sr.Date
	}
	if sr.Bottleneck != nil {
		m.Bottleneck = *sr.Bottleneck
	}
	if sr.EPSG != nil {
		m.EPSG = *sr.EPSG
	}
	if sr.DepthReference != nil {
		m.DepthReference = *sr.DepthReference
	}

	return &m, nil
}

func loadXYZReader(r io.Reader, feedback Feedback) (octree.MultiPointZ, error) {
	mpz := make(octree.MultiPointZ, 0, 250000)
	s := bufio.NewScanner(r)

	for line := 1; s.Scan(); line++ {
		text := s.Text()
		var p octree.Vertex
		// fmt.Sscanf(text, "%f,%f,%f") is 4 times slower.
		idx := strings.IndexByte(text, ',')
		if idx == -1 {
			feedback.Warn("format error in line %d", line)
			continue
		}
		var err error
		if p.X, err = strconv.ParseFloat(text[:idx], 64); err != nil {
			feedback.Warn("format error in line %d: %v", line, err)
			continue
		}
		text = text[idx+1:]
		if idx = strings.IndexByte(text, ','); idx == -1 {
			feedback.Warn("format error in line %d", line)
			continue
		}
		if p.Y, err = strconv.ParseFloat(text[:idx], 64); err != nil {
			feedback.Warn("format error in line %d: %v", line, err)
			continue
		}
		text = text[idx+1:]
		if p.Z, err = strconv.ParseFloat(text, 64); err != nil {
			feedback.Warn("format error in line %d: %v", line, err)
			continue
		}
		mpz = append(mpz, p)
	}

	if err := s.Err(); err != nil {
		return nil, err
	}

	return mpz, nil
}

func loadXYZ(f *zip.File, feedback Feedback) (octree.MultiPointZ, error) {
	r, err := f.Open()
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return loadXYZReader(r, feedback)
}

func loadBoundary(z *zip.ReadCloser) (polygonSlice, error) {
	shpF := common.FindInZIP(z, ".shp")
	if shpF == nil {
		return nil, nil
	}
	prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name))
	dbfF := common.FindInZIP(z, prefix+".dbf")
	if dbfF == nil {
		return nil, fmt.Errorf("No DBF file found for %s", shpF.Name)
	}

	shpR, err := shpF.Open()
	if err != nil {
		return nil, err
	}

	dbfR, err := dbfF.Open()
	if err != nil {
		shpR.Close()
		return nil, err
	}
	sr := shp.SequentialReaderFromExt(shpR, dbfR)
	defer sr.Close()

	if !sr.Next() {
		return nil, sr.Err()
	}

	_, s := sr.Shape()
	if s == nil {
		return nil, sr.Err()
	}

	return shapeToPolygon(s)
}

func generateContours(tree *octree.Tree, tx *sql.Tx, id int64) error {
	stmt, err := tx.Prepare(insertContourSQL)
	if err != nil {
		return err
	}
	defer stmt.Close()

	// Adjust contour lines heights to multiples of contourStepWidth
	heights := make([]float64, 0)
	h := contourStepWidth * math.Ceil(tree.Min.Z/contourStepWidth)
	for ; h <= tree.Max.Z; h += contourStepWidth {
		heights = append(heights, h)
	}

	octree.DoContours(tree, heights, func(res *octree.ContourResult) {
		if err == nil {
			_, err = stmt.Exec(
				id, res.Height, tree.EPSG,
				res.Lines.AsWKB2D(),
				contourTolerance)
		}
	})

	return err
}