view pkg/imports/wx.go @ 1754:807569b08513

Import queue: Auto acceptance is now a property of the import kind itself and is not configurable any more.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 10 Jan 2019 16:19:26 +0100
parents fb05027d93b6
children bb4348ac52ab
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"bytes"
	"context"
	"database/sql"
	"encoding/binary"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"math"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/wfs"
)

// WaterwayAxis is an import job to import
// the waterway axes in form of line string geometries
// and attribute data from a WFS service.
//
// The fields form the configuration of the job and are
// mapped to JSON with the keys given in the struct tags.
type WaterwayAxis struct {
	// URL is the GetCapabilities URL of the WFS service.
	URL string `json:"url"`
	// FeatureType selects the feature type of the WFS service.
	FeatureType string `json:"feature-type"`
	// SortBy works around misconfigured services to
	// establish a sort order to get the features.
	// It is handed to wfs.GetFeaturesGET unchanged.
	SortBy string `json:"sort-by"`
}

// WXJobKind is the import queue type identifier
// under which waterway axis imports are registered.
const WXJobKind JobKind = "wx"

// wxJobCreator is the stateless job creator that is registered
// for WXJobKind and produces WaterwayAxis import jobs.
type wxJobCreator struct{}

// init registers a wxJobCreator for WXJobKind
// via RegisterJobCreator at package load time.
func init() {
	RegisterJobCreator(WXJobKind, wxJobCreator{})
}

// Description gives the human readable name of this import kind.
func (wxJobCreator) Description() string {
	return "waterway axis"
}

// AutoAccept always returns true for waterway axis imports.
func (wxJobCreator) AutoAccept() bool {
	return true
}

// Create decodes the JSON configuration in data into a fresh
// WaterwayAxis import job. The job kind argument is ignored.
// If decoding fails the error is returned and no job is created.
func (wxJobCreator) Create(_ JobKind, data string) (Job, error) {
	var job WaterwayAxis
	err := common.FromJSONString(data, &job)
	if err != nil {
		return nil, err
	}
	return &job, nil
}

// Depends lists the names this import kind depends on,
// which is "waterway_axis" only.
// NOTE(review): presumably these name the database tables the
// import touches -- confirm against the import queue.
func (wxJobCreator) Depends() []string {
	return []string{"waterway_axis"}
}

// StageDone has nothing to do for waterway axis imports.
// All arguments are ignored and nil is returned.
func (wxJobCreator) StageDone(_ context.Context, _ *sql.Tx, _ int64) error {
	return nil
}

// CleanUp is a NOP for waterway axis imports and never fails.
func (*WaterwayAxis) CleanUp() error {
	return nil
}

// waterwayAxisProperties are the attributes decoded from
// the JSON properties of a downloaded feature.
type waterwayAxisProperties struct {
	// ObjNam is read from the "hydro_objnam" property.
	ObjNam string `json:"hydro_objnam"`
	// NObjNnm is read from the "hydro_nobjnm" property.
	// It is a pointer so that a missing value stays nil
	// and can be stored as SQL NULL.
	NObjNnm *string `json:"hydro_nobjnm"`
}

// line is a sequence of coordinate tuples as decoded from the
// JSON coordinates of a LineString geometry. Only the first two
// values of each tuple are used when serializing to WKB.
type line [][]float64

// wkbLineString is the geometry type code written
// into the WKB header of a line string.
const wkbLineString uint32 = 2

// asWKB serializes the line as a little endian WKB line string:
// the byte order marker, the geometry type, the number of points
// and then two float64 values per point. Values missing in a
// coordinate tuple are written as zero, values beyond the
// second one are dropped.
func (l line) asWKB() []byte {

	const (
		headerLen = 1 + 4 + 4 // byte order + type + point count
		pointLen  = 2 * 8     // two float64 per point
	)

	out := bytes.NewBuffer(make([]byte, 0, headerLen+len(l)*pointLen))

	// Writing to a bytes.Buffer cannot fail, so the
	// result of binary.Write is not checked.
	put := func(v interface{}) {
		binary.Write(out, binary.LittleEndian, v)
	}

	put(wkbNDR)
	put(wkbLineString)
	put(uint32(len(l)))

	for _, pt := range l {
		var x, y float64
		switch {
		case len(pt) > 1:
			x, y = pt[0], pt[1]
		case len(pt) == 1:
			x = pt[0]
		}
		put(math.Float64bits(x))
		put(math.Float64bits(y))
	}

	return out.Bytes()
}

const (
	// deleteWaterwayAxisSQL removes all rows
	// from waterway.waterway_axis.
	deleteWaterwayAxisSQL = `DELETE FROM waterway.waterway_axis`
	// insertWaterwayAxisSQL stores a single waterway axis.
	// Parameters:
	//   $1 the geometry as WKB
	//   $2 the SRID the WKB is given in
	//   $3 objnam
	//   $4 nobjnam (may be NULL)
	// The geometry is transformed to SRID 4326 and
	// cast to geography before it is stored.
	insertWaterwayAxisSQL = `
INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam)
VALUES (
  ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326)::geography,
  $3,
  $4
)`
)

// Do executes the actual waterway axis import.
//
// It loads the capabilities of the WFS service at wx.URL, looks up
// wx.FeatureType, downloads its features as JSON and stores every
// LineString (and every member of a MultiLineString) as one row of
// waterway.waterway_axis. The old content of the table is deleted
// inside the same transaction. The transaction is only committed
// if at least one feature was stored; on every error path the
// deferred rollback discards the delete and all inserts.
//
// Features without properties or coordinates, features with
// undecodable properties and features with other geometry types
// are skipped and counted; the counts are reported as warnings
// via feedback. A geometry that cannot be decoded or a failing
// insert aborts the import.
//
// The first return value is always nil. importID is not used.
func (wx *WaterwayAxis) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	feedback.Info("Import waterway axis")

	feedback.Info("Loading capabilities from %s", wx.URL)
	caps, err := wfs.GetCapabilities(wx.URL)
	if err != nil {
		feedback.Error("Loading capabilities failed: %v", err)
		return nil, err
	}

	ft := caps.FindFeatureType(wx.FeatureType)
	if ft == nil {
		return nil, fmt.Errorf("Unknown feature type '%s'", wx.FeatureType)
	}

	feedback.Info("Found feature type '%s'", wx.FeatureType)

	// The default CRS of the feature type is used unless a
	// downloaded feature collection announces its own CRS.
	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
	if err != nil {
		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
		return nil, err
	}

	if wx.SortBy != "" {
		feedback.Info("Features will be sorted by '%s'", wx.SortBy)
	}

	urls, err := wfs.GetFeaturesGET(
		caps, wx.FeatureType, "application/json", wx.SortBy)
	if err != nil {
		feedback.Error("Cannot create GetFeature URLs. %v", err)
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Undoes the delete and all inserts on every early return.
	defer tx.Rollback()

	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAxisSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	// Delete the old features.
	if _, err := tx.ExecContext(ctx, deleteWaterwayAxisSQL); err != nil {
		return nil, err
	}

	var (
		unsupportedTypes  = map[string]int{}
		missingProperties int
		badProperties     int
		features          int
	)

	// store inserts a single line string together with its
	// attributes using the EPSG code valid at call time and
	// counts it as an imported feature.
	store := func(l line, objnam string, nobjnam sql.NullString) error {
		if _, err := insertStmt.ExecContext(
			ctx,
			l.asWKB(),
			epsg,
			objnam,
			nobjnam,
		); err != nil {
			return err
		}
		features++
		return nil
	}

	if err := wfs.DownloadURLs(urls, func(r io.Reader) error {
		rfc, err := wfs.ParseRawFeatureCollection(r)
		if err != nil {
			return err
		}
		// A CRS given by the collection overrides the current one.
		if rfc.CRS != nil {
			crsName := rfc.CRS.Properties.Name
			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
				feedback.Error("Unsupported CRS: '%s'", crsName)
				return err
			}
		}

		// No features -> ignore.
		if len(rfc.Features) == 0 {
			return nil
		}

		feedback.Info("Using EPSG: %d", epsg)

		for _, feature := range rfc.Features {
			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
				missingProperties++
				continue
			}

			var props waterwayAxisProperties

			if err := json.Unmarshal(*feature.Properties, &props); err != nil {
				badProperties++
				continue
			}

			// A missing nobjnam is stored as SQL NULL.
			var nobjnam sql.NullString
			if props.NObjNnm != nil {
				nobjnam = sql.NullString{String: *props.NObjNnm, Valid: true}
			}

			switch feature.Geometry.Type {
			case "LineString":
				var l line
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
					return err
				}
				if err := store(l, props.ObjNam, nobjnam); err != nil {
					return err
				}
			case "MultiLineString":
				// Every member becomes a row of its own
				// sharing the attributes of the feature.
				var ls []line
				if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil {
					return err
				}
				for _, l := range ls {
					if err := store(l, props.ObjNam, nobjnam); err != nil {
						return err
					}
				}
			default:
				unsupportedTypes[feature.Geometry.Type]++
			}
		}
		return nil
	}); err != nil {
		feedback.Error("Downloading features failed: %v", err)
		return nil, err
	}

	// Do not replace the existing axes with nothing.
	if features == 0 {
		err := errors.New("No features found")
		feedback.Error("%v", err)
		return nil, err
	}

	if badProperties > 0 {
		feedback.Warn("Bad properties: %d", badProperties)
	}

	if missingProperties > 0 {
		feedback.Warn("Missing properties: %d", missingProperties)
	}

	if len(unsupportedTypes) != 0 {
		var b strings.Builder
		for t, c := range unsupportedTypes {
			if b.Len() > 0 {
				b.WriteString(", ")
			}
			fmt.Fprintf(&b, "%s: %d", t, c)
		}
		feedback.Warn("Unsupported types found: %s", b.String())
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Storing %d features took %s",
			features, time.Since(start))
	}

	return nil, err
}