view pkg/imports/bn.go @ 3678:8f58851927c0

client: make the layer factory return a new layer config only once per map instead of on every invocation. The factory exists to support multiple maps with individual layers, but returning a fresh config on every call breaks code that relies on the layer's state. The factory now reuses the objects it created before, per map; a minimal sketch of the pattern follows below.
author Markus Kottlaender <markus@intevation.de>
date Mon, 17 Jun 2019 17:31:35 +0200
parents db87f34805fb
children 0227670dedd5
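
The change described in the commit message lives in the JavaScript client, not in this Go file. Purely as an illustration of the pattern it describes, here is a minimal sketch, in Go and with entirely hypothetical names, of a factory that builds a layer configuration once per map and hands back the same cached objects on later calls:

package main

import "fmt"

// LayerConfig stands in for the per-map layer configuration
// produced by the factory.
type LayerConfig struct {
	Layers []string
}

// layerFactory caches one LayerConfig per map ID so that every
// caller asking for the same map gets the same stateful objects.
type layerFactory struct {
	configs map[string]*LayerConfig
}

func newLayerFactory() *layerFactory {
	return &layerFactory{configs: make(map[string]*LayerConfig)}
}

// For creates the layer config for the given map on first use
// and reuses it on every subsequent call.
func (f *layerFactory) For(mapID string) *LayerConfig {
	if cfg, ok := f.configs[mapID]; ok {
		return cfg
	}
	cfg := &LayerConfig{Layers: []string{"bottlenecks", "gauges"}}
	f.configs[mapID] = cfg
	return cfg
}

func main() {
	f := newLayerFactory()
	a, b := f.For("main"), f.For("main")
	fmt.Println(a == b) // true: both calls share the same config
}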

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"regexp"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/soap/ifbn"
	"github.com/jackc/pgx/pgtype"
)

// Bottleneck is an import job to import
// bottlenecks from an IFBN SOAP service.
type Bottleneck struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Tolerance is the tolerance used for snapping
	// the waterway axis when computing the bottleneck area.
	Tolerance float64 `json:"tolerance"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// BNJobKind is the import queue type identifier.
const BNJobKind JobKind = "bn"

const (
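	// insertBottleneckSQL inserts or updates one version of a
	// bottleneck per matching gauge validity range and returns the
	// ids of the affected rows. Parameters: $1 bottleneck_id,
	// $2 validity range, $3 ISRS code of the reference gauge,
	// $4 objnam, $5 nobjnm, $6/$7 ISRS codes bounding the stretch,
	// $8 rb, $9 lb, $10 responsible country, $11 revisiting time,
	// $12 limiting factor, $13 date info, $14 source organization,
	// $15 tolerance for axis snapping.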
	insertBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
INSERT INTO waterway.bottlenecks (
  bottleneck_id,
  validity,
  gauge_location,
  gauge_validity,
  objnam,
  nobjnm,
  stretch,
  area,
  rb,
  lb,
  responsible_country,
  revisiting_time,
  limiting,
  date_info,
  source_organization
) SELECT
  $1,
  validity * $2, -- intersections with gauge validity ranges
  location,
  validity,
  $4,
  $5,
  (SELECT r FROM r),
  ISRSrange_area(
    ISRSrange_axis((SELECT r FROM r),
                   $15),
    (SELECT ST_Collect(CAST(area AS geometry))
        FROM waterway.waterway_area)),
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14
  FROM waterway.gauges
  WHERE location = isrs_fromText($3) AND validity && $2
ON CONFLICT (bottleneck_id, validity) DO UPDATE SET
    gauge_location = EXCLUDED.gauge_location,
    gauge_validity = EXCLUDED.gauge_validity,
    objnam = EXCLUDED.objnam,
    nobjnm = EXCLUDED.nobjnm,
    stretch = EXCLUDED.stretch,
    area = EXCLUDED.area,
    rb = EXCLUDED.rb,
    lb = EXCLUDED.lb,
    responsible_country = EXCLUDED.responsible_country,
    revisiting_time = EXCLUDED.revisiting_time,
    limiting = EXCLUDED.limiting,
    date_info = EXCLUDED.date_info,
    source_organization = EXCLUDED.source_organization
RETURNING id
`

	// Alignment with gauge validity might have generated new entries
	// for the same time range. Thus, remove the old ones
	deleteObsoleteBNSQL = `
DELETE FROM waterway.bottlenecks
WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
`

	fixBNValiditySQL = `
UPDATE waterway.bottlenecks SET
   -- Set enddate of old entry to new startdate in case of overlap:
  validity = validity - $2
WHERE bottleneck_id = $1
  AND validity && $2 AND NOT validity <@ $2
`

	// deleteBottleneckMaterialSQL removes riverbed materials that are
	// no longer reported for the given bottlenecks and returns the
	// distinct materials that were removed.
	deleteBottleneckMaterialSQL = `
WITH del AS (
  DELETE FROM waterway.bottlenecks_riverbed_materials
  WHERE bottleneck_id = ANY($1)
    AND riverbed <> ALL($2)
  RETURNING riverbed)
SELECT DISTINCT riverbed FROM del
`

	// insertBottleneckMaterialSQL attaches each given riverbed
	// material to each of the given bottlenecks, ignoring
	// combinations that already exist.
	insertBottleneckMaterialSQL = `
INSERT INTO waterway.bottlenecks_riverbed_materials (
   bottleneck_id,
   riverbed
) SELECT *
FROM unnest(CAST($1 AS int[])) AS bns,
  unnest(CAST($2 AS varchar[])) AS materials
ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
`
)

// bnJobCreator is the job creator registered for bottleneck imports.
type bnJobCreator struct{}

func init() {
	RegisterJobCreator(BNJobKind, bnJobCreator{})
}

func (bnJobCreator) Description() string { return "bottlenecks" }

func (bnJobCreator) AutoAccept() bool { return false }

func (bnJobCreator) Create() Job { return new(Bottleneck) }

// Depends returns the tables involved in a bottleneck import.
func (bnJobCreator) Depends() [2][]string {
	return [2][]string{
		{"bottlenecks", "bottlenecks_riverbed_materials"},
		{"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"},
	}
}

const (
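	// bnStageDoneSQL marks the bottlenecks tracked for a given import
	// as no longer being part of the staging area.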
	bnStageDoneSQL = `
UPDATE waterway.bottlenecks SET staging_done = true
WHERE id IN (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.bottlenecks'::regclass)`
)

// StageDone moves the imported bottleneck out of the staging area.
func (bnJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	_, err := tx.ExecContext(ctx, bnStageDoneSQL, id)
	return err
}

// CleanUp of a bottleneck import is a NOP.
func (*Bottleneck) CleanUp() error { return nil }

// rblbRe matches a pair of two-character codes separated by an
// underscore, as delivered in the Rb_lb field of the IFBN data.
var rblbRe = regexp.MustCompile(`(..)_(..)`)

// splitRBLB splits such a combined code into its right bank and
// left bank parts. Both results are nil if the input does not
// match the expected format.
func splitRBLB(s string) (*string, *string) {
	m := rblbRe.FindStringSubmatch(s)
	if len(m) == 0 {
		return nil, nil
	}
	return &m[1], &m[2]
}

// Do executes the actual bottleneck import.
func (bn *Bottleneck) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	fetch := func() ([]*ifbn.BottleNeckType, error) {
		client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil)

		req := &ifbn.Export_bn_by_isrs{}

		resp, err := client.Export_bn_by_isrs(req)
		if err != nil {
			return nil, err
		}

		if resp.Export_bn_by_isrsResult == nil {
			return nil, errors.New(
				"The requested service returned no bottlenecks")
		}

		return resp.Export_bn_by_isrsResult.BottleNeckType, nil
	}

	return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance)
}

// storeBottlenecks stores the fetched bottlenecks in the database
// and returns a summary listing the imported bottleneck ids.
func storeBottlenecks(
	ctx context.Context,
	fetch func() ([]*ifbn.BottleNeckType, error),
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
	tolerance float64,
) (interface{}, error) {
	start := time.Now()

	bns, err := fetch()
	if err != nil {
		return nil, err
	}

	feedback.Info("Found %d bottlenecks for import", len(bns))

	var insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
		deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt

	for _, x := range []struct {
		sql  string
		stmt **sql.Stmt
	}{
		{insertBottleneckSQL, &insertStmt},
		{deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
		{fixBNValiditySQL, &fixValidityStmt},
		{deleteBottleneckMaterialSQL, &deleteMaterialStmt},
		{insertBottleneckMaterialSQL, &insertMaterialStmt},
		{trackImportSQL, &trackStmt},
	} {
		var err error
		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
			return nil, err
		}
		defer (*x.stmt).Close()
	}

	var nids []string

	feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)

	for _, bn := range bns {
		if err := storeBottleneck(
			ctx, importID, conn, feedback, bn, &nids, tolerance,
			insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
			deleteMaterialStmt, insertMaterialStmt, trackStmt); err != nil {
			return nil, err
		}
	}
	if len(nids) == 0 {
		return nil, UnchangedError("No new bottlenecks inserted")
	}

	feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start))
	feedback.Info("Import of bottlenecks was successful")
	summary := struct {
		Bottlenecks []string `json:"bottlenecks"`
	}{
		Bottlenecks: nids,
	}
	return &summary, nil
}

// storeBottleneck stores a single bottleneck in its own transaction.
// Recoverable problems (missing data, no matching gauge) are reported
// via feedback and skipped by returning nil without touching nids.
func storeBottleneck(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
	bn *ifbn.BottleNeckType,
	nids *[]string,
	tolerance float64,
	insertStmt, deleteObsoleteBNStmt, fixValidityStmt,
	deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt,
) error {
	feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)

	if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil {
		feedback.Warn("Missing validity information")
		return nil
	}
	const (
		fromKey = "Valid_from_date"
		toKey   = "Valid_to_date"
	)
	fromTo := make(map[string]time.Time)
	for _, kv := range bn.AdditionalData.KeyValuePair {
		k := string(kv.Key)
		if k == fromKey || k == toKey {
			if t, err := time.Parse(time.RFC3339, kv.Value); err != nil {
				return err
			} else {
				fromTo[k] = t
			}
		}
	}

	var tfrom, tto pgtype.Timestamptz
	if t, ok := fromTo[fromKey]; ok {
		tfrom.Set(t)
	} else {
		feedback.Warn("Missing start date")
		return nil
	}
	var uBound pgtype.BoundType
	if t, ok := fromTo[toKey]; ok {
		tto.Set(t)
		uBound = pgtype.Exclusive
	} else {
		uBound = pgtype.Unbounded
	}
	validity := pgtype.Tstzrange{
		Lower:     tfrom,
		Upper:     tto,
		LowerType: pgtype.Inclusive,
		UpperType: uBound,
		Status:    pgtype.Present,
	}

	rb, lb := splitRBLB(bn.Rb_lb)

	var revisitingTime *int
	if bn.Revisiting_time != nil &&
		len(strings.TrimSpace(*bn.Revisiting_time)) > 0 {
		i, err := strconv.Atoi(*bn.Revisiting_time)
		if err != nil {
			feedback.Warn(
				"Cannot convert given revisiting time '%s' to number of months",
				*bn.Revisiting_time)
		} else {
			revisitingTime = &i
		}
	}

	var limiting, country string

	if bn.Limiting_factor != nil {
		limiting = string(*bn.Limiting_factor)
	}

	if bn.Responsible_country != nil {
		country = string(*bn.Responsible_country)
	}

	var materials []string
	if bn.Riverbed != nil {
		for _, material := range bn.Riverbed.Material {
			if material != nil {
				materials = append(materials, string(*material))
			}
		}
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	var bnIds []int64
	bns, err := tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
		bn.Bottleneck_id,
		&validity,
		bn.Fk_g_fid,
		bn.OBJNAM,
		bn.NOBJNM,
		bn.From_ISRS, bn.To_ISRS,
		rb,
		lb,
		country,
		revisitingTime,
		limiting,
		bn.Date_Info,
		bn.Source,
		tolerance,
	)
	if err != nil {
		feedback.Warn(handleError(err).Error())
		return nil
	}
	defer bns.Close()
	for bns.Next() {
		var nid int64
		if err := bns.Scan(&nid); err != nil {
			return err
		}
		bnIds = append(bnIds, nid)
	}
	if err := bns.Err(); err != nil {
		return err
	}
	if len(bnIds) == 0 {
		feedback.Warn(
			"No gauge matching '%s' and valid at the given time available",
			bn.Fk_g_fid)
		return nil
	}

	// Remove obsolete bottleneck version entries
	var pgBnIds pgtype.Int8Array
	pgBnIds.Set(bnIds)
	if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx,
		bn.Bottleneck_id,
		&validity,
		&pgBnIds,
	); err != nil {
		feedback.Warn(handleError(err).Error())
		if err2 := tx.Rollback(); err2 != nil {
			return err2
		}
		return nil
	}

	// Set end of validity of old version to start of new version
	// in case of overlap
	if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx,
		bn.Bottleneck_id,
		validity,
	); err != nil {
		feedback.Warn(handleError(err).Error())
		if err2 := tx.Rollback(); err2 != nil {
			return err2
		}
		return nil
	}

	if materials != nil {
		// Remove obsolete riverbed materials
		var pgMaterials pgtype.VarcharArray
		pgMaterials.Set(materials)
		mtls, err := tx.StmtContext(ctx,
			deleteMaterialStmt).QueryContext(ctx,
			&pgBnIds,
			&pgMaterials,
		)
		if err != nil {
			return err
		}
		defer mtls.Close()
		for mtls.Next() {
			var delMat string
			if err := mtls.Scan(&delMat); err != nil {
				return err
			}
			feedback.Warn("Removed riverbed material %s", delMat)
		}
		if err := mtls.Err(); err != nil {
			return err
		}

		// Insert riverbed materials
		if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
			&pgBnIds,
			&pgMaterials,
		); err != nil {
			feedback.Warn("Failed to insert riverbed materials")
			feedback.Warn(handleError(err).Error())
			return nil
		}
	}

	for _, nid := range bnIds {
		if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
			ctx, importID, "waterway.bottlenecks", nid,
		); err != nil {
			return err
		}
	}
	if err = tx.Commit(); err != nil {
		return err
	}
	*nids = append(*nids, bn.Bottleneck_id)
	return nil
}