view pkg/imports/bn.go @ 3703:b07511ff859e

Don't include calculated area in unchanged bottleneck detection. This makes BN imports considerably faster. The only downside is that, when the waterway area changes, there is no easy way to recalculate the areas of existing BN. But the semantics in that case are somewhat unclear anyway (think of historization for the old area), so this should be ok.
author Sascha Wilde <wilde@intevation.de>
date Wed, 19 Jun 2019 12:34:48 +0200
parents 063a1883b5cb
children 7006b92c0334
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"regexp"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/soap/ifbn"
	"github.com/jackc/pgx/pgtype"
)

// Bottleneck is an import job to import
// bottlenecks from an IFBN SOAP service.
// Bottleneck is an import job to import
// bottlenecks from an IFBN SOAP service.
type Bottleneck struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Tolerance used for axis snapping when calculating the
	// bottleneck area (passed to ISRSrange_axis in the insert
	// statement).
	Tolerance float64 `json:"tolerance"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// BNJobKind is the import queue type identifier.
const BNJobKind JobKind = "bn"

const (
	// insertBottleneckSQL inserts one bottleneck version per
	// intersection of the given validity ($2) with the validity
	// ranges of the referenced gauge ($3), or updates matching
	// existing (bottleneck_id, validity) rows.  The stretch is built
	// from the ISRS codes $6/$7 (ordered via the custom <~ / >~
	// operators) and the area is calculated by intersecting the
	// snapped axis (tolerance $15) with the collected waterway area.
	// Remaining parameters: $1 bottleneck_id, $4 objnam, $5 nobjnm,
	// $8 rb, $9 lb, $10 responsible country, $11 revisiting time,
	// $12 limiting factor, $13 date info, $14 source organization.
	insertBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
INSERT INTO waterway.bottlenecks (
  bottleneck_id,
  validity,
  gauge_location,
  gauge_validity,
  objnam,
  nobjnm,
  stretch,
  area,
  rb,
  lb,
  responsible_country,
  revisiting_time,
  limiting,
  date_info,
  source_organization
) SELECT
  $1,
  validity * $2, -- intersections with gauge validity ranges
  location,
  validity,
  $4,
  $5,
  (SELECT r FROM r),
  ISRSrange_area(
    ISRSrange_axis((SELECT r FROM r),
                   $15),
    (SELECT ST_Collect(CAST(area AS geometry))
        FROM waterway.waterway_area)),
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14
  FROM waterway.gauges
  WHERE location = isrs_fromText($3) AND validity && $2
ON CONFLICT (bottleneck_id, validity) DO UPDATE SET
    gauge_location = EXCLUDED.gauge_location,
    gauge_validity = EXCLUDED.gauge_validity,
    objnam = EXCLUDED.objnam,
    nobjnm = EXCLUDED.nobjnm,
    stretch = EXCLUDED.stretch,
    area = EXCLUDED.area,
    rb = EXCLUDED.rb,
    lb = EXCLUDED.lb,
    responsible_country = EXCLUDED.responsible_country,
    revisiting_time = EXCLUDED.revisiting_time,
    limiting = EXCLUDED.limiting,
    date_info = EXCLUDED.date_info,
    source_organization = EXCLUDED.source_organization
RETURNING id
`

	// findExactMatchBottleneckSQL returns the ids of already accepted
	// (staging_done = true) bottleneck versions that are identical to
	// the one that would be inserted.  The calculated area is
	// deliberately NOT compared, so unchanged detection does not have
	// to recompute it.  The explicit casts ($11::smallint,
	// $13::timestamptz) presumably align the parameter types with the
	// column types for the row-wise comparison — confirm against the
	// schema.
	findExactMatchBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
SELECT id FROM waterway.bottlenecks
WHERE (
  bottleneck_id,
  validity,
  gauge_location,
  gauge_validity,
  objnam,
  nobjnm,
  stretch,
  rb,
  lb,
  responsible_country,
  revisiting_time,
  limiting,
  date_info,
  source_organization,
  staging_done
) = ( SELECT
  $1,
  validity * $2, -- intersections with gauge validity ranges
  location,
  validity,
  $4,
  $5,
  (SELECT r FROM r),
  $8,
  $9,
  $10,
  $11::smallint,
  $12,
  $13::timestamptz,
  $14,
  true
  FROM waterway.gauges
  WHERE location = isrs_fromText($3) AND validity && $2
)
`

	// Alignment with gauge validity might have generated new entries
	// for the same time range. Thus, remove the old ones
	// ($3 is the list of ids just inserted, which must survive).
	deleteObsoleteBNSQL = `
DELETE FROM waterway.bottlenecks
WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
`

	// fixBNValiditySQL truncates the validity of older versions that
	// partially overlap the new validity range $2.
	fixBNValiditySQL = `
UPDATE waterway.bottlenecks SET
   -- Set enddate of old entry to new startdate in case of overlap:
  validity = validity - $2
WHERE bottleneck_id = $1
  AND validity && $2 AND NOT validity <@ $2
`

	// deleteBottleneckMaterialSQL deletes riverbed materials of the
	// given bottlenecks ($1) that are no longer reported ($2) and
	// returns the distinct removed material names for logging.
	deleteBottleneckMaterialSQL = `
WITH del AS (
  DELETE FROM waterway.bottlenecks_riverbed_materials
  WHERE bottleneck_id = ANY($1)
    AND riverbed <> ALL($2)
  RETURNING riverbed)
SELECT DISTINCT riverbed FROM del
`

	// insertBottleneckMaterialSQL stores the cross product of
	// bottleneck ids ($1) and riverbed materials ($2), ignoring
	// already existing combinations.
	insertBottleneckMaterialSQL = `
INSERT INTO waterway.bottlenecks_riverbed_materials (
   bottleneck_id,
   riverbed
) SELECT *
FROM unnest(CAST($1 AS int[])) AS bns,
  unnest(CAST($2 AS varchar[])) AS materials
ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
`
)

// bnJobCreator implements the job creator interface for
// bottleneck imports.
type bnJobCreator struct{}

func init() {
	// Register the bottleneck import under its queue type identifier.
	RegisterJobCreator(BNJobKind, bnJobCreator{})
}

// Description returns the human readable name of this import kind.
func (bnJobCreator) Description() string { return "bottlenecks" }

// AutoAccept is false: bottleneck imports stay in the staging area
// until reviewed.
func (bnJobCreator) AutoAccept() bool { return false }

// Create builds a new, empty Bottleneck import job.
func (bnJobCreator) Create() Job { return new(Bottleneck) }

// Depends returns the tables this import depends on — presumably the
// first slice is written to and the second only read from; confirm
// against the queue implementation.
func (bnJobCreator) Depends() [2][]string {
	return [2][]string{
		{"bottlenecks", "bottlenecks_riverbed_materials"},
		{"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"},
	}
}

const (
	// bnStageDoneSQL marks all bottlenecks tracked for the given
	// import ($1) as no longer being in the staging area.
	bnStageDoneSQL = `
UPDATE waterway.bottlenecks SET staging_done = true
WHERE id IN (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.bottlenecks'::regclass)`
)

// StageDone moves the imported bottleneck out of the staging area.
// It flips staging_done for every bottleneck tracked under the
// given import id within the supplied transaction.
func (bnJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	if _, err := tx.ExecContext(ctx, bnStageDoneSQL, id); err != nil {
		return err
	}
	return nil
}

// CleanUp of a bottleneck import is a NOP: the import fetches its
// data via SOAP and leaves no temporary resources behind.
func (*Bottleneck) CleanUp() error { return nil }

// rblbRe matches a two character code, an underscore and another two
// character code (e.g. "AT_SK"). The pattern is deliberately left
// unanchored.
var rblbRe = regexp.MustCompile(`(..)_(..)`)

// splitRBLB splits a combined right-bank/left-bank string into its
// two bank codes. Both results are nil if the input does not match.
func splitRBLB(s string) (*string, *string) {
	if m := rblbRe.FindStringSubmatch(s); m != nil {
		return &m[1], &m[2]
	}
	return nil, nil
}

// Do executes the actual bottleneck import. It fetches all
// bottlenecks from the configured IFBN SOAP service and hands them
// over to storeBottlenecks together with the configured axis
// snapping tolerance.
func (bn *Bottleneck) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// fetch lazily talks to the SOAP service when storeBottlenecks
	// asks for the data.
	fetch := func() ([]*ifbn.BottleNeckType, error) {
		client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil)

		resp, err := client.Export_bn_by_isrs(&ifbn.Export_bn_by_isrs{})
		if err != nil {
			return nil, err
		}

		result := resp.Export_bn_by_isrsResult
		if result == nil {
			return nil, errors.New(
				"The requested service returned no bottlenecks")
		}

		return result.BottleNeckType, nil
	}

	return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance)
}

// storeBottlenecks obtains all bottlenecks via fetch and stores them
// one by one. It returns a summary listing the imported bottleneck
// IDs, or an UnchangedError if nothing new had to be inserted.
func storeBottlenecks(
	ctx context.Context,
	fetch func() ([]*ifbn.BottleNeckType, error),
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
	tolerance float64,
) (interface{}, error) {
	begin := time.Now()

	bns, err := fetch()
	if err != nil {
		return nil, err
	}

	feedback.Info("Found %d bottlenecks for import", len(bns))

	// Prepare all statements up front; each is closed when this
	// function returns.
	var (
		insertStmt              *sql.Stmt
		findExactMatchingBNStmt *sql.Stmt
		deleteObsoleteBNStmt    *sql.Stmt
		fixValidityStmt         *sql.Stmt
		deleteMaterialStmt      *sql.Stmt
		insertMaterialStmt      *sql.Stmt
		trackStmt               *sql.Stmt
	)

	for _, p := range []struct {
		text string
		dst  **sql.Stmt
	}{
		{insertBottleneckSQL, &insertStmt},
		{findExactMatchBottleneckSQL, &findExactMatchingBNStmt},
		{deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
		{fixBNValiditySQL, &fixValidityStmt},
		{deleteBottleneckMaterialSQL, &deleteMaterialStmt},
		{insertBottleneckMaterialSQL, &insertMaterialStmt},
		{trackImportSQL, &trackStmt},
	} {
		stmt, err := conn.PrepareContext(ctx, p.text)
		if err != nil {
			return nil, err
		}
		defer stmt.Close()
		*p.dst = stmt
	}

	var nids []string

	feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)

	for _, b := range bns {
		if err := storeBottleneck(
			ctx, importID, conn, feedback, b, &nids, tolerance,
			insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt,
			fixValidityStmt, deleteMaterialStmt, insertMaterialStmt,
			trackStmt); err != nil {
			return nil, err
		}
	}

	if len(nids) == 0 {
		return nil, UnchangedError("No new bottlenecks inserted")
	}

	feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(begin))
	feedback.Info("Import of bottlenecks was successful")

	return &struct {
		Bottlenecks []string `json:"bottlenecks"`
	}{
		Bottlenecks: nids,
	}, nil
}

// storeBottleneck imports a single bottleneck inside its own
// transaction. Problems with the data itself are reported via
// feedback and cause a nil return so the remaining bottlenecks can
// still be imported; a non-nil error is only returned for technical
// failures. On success the bottleneck_id is appended to *nids.
func storeBottleneck(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
	bn *ifbn.BottleNeckType,
	nids *[]string,
	tolerance float64,
	insertStmt, findExactMatchingBNStmt, deleteObsoleteBNStmt,
	fixValidityStmt, deleteMaterialStmt, insertMaterialStmt,
	trackStmt *sql.Stmt,
) error {
	feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)

	// Determine the validity range of this bottleneck version.
	var tfrom, tto pgtype.Timestamptz
	var uBound pgtype.BoundType

	if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil {
		// This is a workaround for the fact that most BN data does not
		// contain the optional validity information.
		//
		// The current solution makes every change in the data an
		// update, which might be not the best solution.  Alternative
		// approaches could include using the Date Info of the data as
		// start date, and maybe even inspecting the details of changed
		// data for hints whether an update or historisation with a new
		// version is advisable.
		//
		// Nevertheless, the current solution "just works" for the
		// time being...  --  sw
		feedback.Warn("No validity information, assuming infinite validity.")
		tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))
		uBound = pgtype.Unbounded
	} else {
		const (
			fromKey = "Valid_from_date"
			toKey   = "Valid_to_date"
		)
		// Pick the validity bounds out of the key/value pairs.
		fromTo := make(map[string]time.Time)
		for _, kv := range bn.AdditionalData.KeyValuePair {
			k := string(kv.Key)
			if k == fromKey || k == toKey {
				if t, err := time.Parse(time.RFC3339, kv.Value); err != nil {
					return err
				} else {
					fromTo[k] = t
				}
			}
		}

		if t, ok := fromTo[fromKey]; ok {
			tfrom.Set(t)
		} else {
			// Without a start date the entry cannot be stored; skip it.
			feedback.Warn("Missing start date")
			return nil
		}

		if t, ok := fromTo[toKey]; ok {
			tto.Set(t)
			uBound = pgtype.Exclusive
		} else {
			// No end date: validity is open ended.
			uBound = pgtype.Unbounded
		}
	}

	validity := pgtype.Tstzrange{
		Lower:     tfrom,
		Upper:     tto,
		LowerType: pgtype.Inclusive,
		UpperType: uBound,
		Status:    pgtype.Present,
	}

	// Split the combined bank attribute into right/left bank codes.
	rb, lb := splitRBLB(bn.Rb_lb)

	// Revisiting time is optional; per the warning text below it is
	// expected to be a number of months.
	var revisitingTime *int
	if bn.Revisiting_time != nil &&
		len(strings.TrimSpace(*bn.Revisiting_time)) > 0 {
		i, err := strconv.Atoi(*bn.Revisiting_time)
		if err != nil {
			feedback.Warn(
				"Cannot convert given revisiting time '%s' to number of months",
				*bn.Revisiting_time)
		} else {
			revisitingTime = &i
		}
	}

	// Optional attributes default to the empty string.
	var limiting, country string

	if bn.Limiting_factor != nil {
		limiting = string(*bn.Limiting_factor)
	}

	if bn.Responsible_country != nil {
		country = string(*bn.Responsible_country)
	}

	// Collect the reported riverbed materials (nil entries skipped).
	var materials []string
	if bn.Riverbed != nil {
		for _, material := range bn.Riverbed.Material {
			if material != nil {
				materials = append(materials, string(*material))
			}
		}
	}

	// Check if a bottleneck identical to the one we would insert
	// already exists (the calculated area is deliberately not part of
	// the comparison):
	bns, err := findExactMatchingBNStmt.QueryContext(ctx,
		bn.Bottleneck_id,
		&validity,
		bn.Fk_g_fid,
		bn.OBJNAM,
		bn.NOBJNM,
		bn.From_ISRS, bn.To_ISRS,
		rb,
		lb,
		country,
		revisitingTime,
		limiting,
		bn.Date_Info,
		bn.Source,
	)

	if err != nil {
		return err
	}
	defer bns.Close()
	if bns.Next() {
		// Nothing changed; no need to write a new version.
		feedback.Info("unchanged")
		return nil
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	defer tx.Rollback()

	// Insert may yield one row per intersection with a gauge
	// validity range; collect all generated ids.
	var bnIds []int64
	bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
		bn.Bottleneck_id,
		&validity,
		bn.Fk_g_fid,
		bn.OBJNAM,
		bn.NOBJNM,
		bn.From_ISRS, bn.To_ISRS,
		rb,
		lb,
		country,
		revisitingTime,
		limiting,
		bn.Date_Info,
		bn.Source,
		tolerance,
	)
	if err != nil {
		// Data problem: warn and continue with the next bottleneck.
		feedback.Warn(handleError(err).Error())
		return nil
	}
	defer bns.Close()
	for bns.Next() {
		var nid int64
		if err := bns.Scan(&nid); err != nil {
			return err
		}
		bnIds = append(bnIds, nid)
	}
	if err := bns.Err(); err != nil {
		feedback.Warn(handleError(err).Error())
		return nil
	}
	if len(bnIds) == 0 {
		feedback.Warn(
			"No gauge matching '%s' or given time available", bn.Fk_g_fid)
		return nil
	}

	// Remove obsolete bottleneck version entries
	var pgBnIds pgtype.Int8Array
	pgBnIds.Set(bnIds)
	if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx,
		bn.Bottleneck_id,
		&validity,
		&pgBnIds,
	); err != nil {
		feedback.Warn(handleError(err).Error())
		// NOTE(review): tx.Rollback is also deferred above; the
		// explicit call here surfaces any rollback error.
		if err2 := tx.Rollback(); err2 != nil {
			return err2
		}
		return nil
	}

	// Set end of validity of old version to start of new version
	// in case of overlap
	if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx,
		bn.Bottleneck_id,
		validity,
	); err != nil {
		feedback.Warn(handleError(err).Error())
		if err2 := tx.Rollback(); err2 != nil {
			return err2
		}
		return nil
	}

	if materials != nil {
		// Remove obsolete riverbed materials
		var pgMaterials pgtype.VarcharArray
		pgMaterials.Set(materials)
		mtls, err := tx.StmtContext(ctx,
			deleteMaterialStmt).QueryContext(ctx,
			&pgBnIds,
			&pgMaterials,
		)
		if err != nil {
			return err
		}
		defer mtls.Close()
		for mtls.Next() {
			var delMat string
			if err := mtls.Scan(&delMat); err != nil {
				return err
			}
			feedback.Warn("Removed riverbed material %s", delMat)
		}
		if err := mtls.Err(); err != nil {
			return err
		}

		// Insert riverbed materials
		if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
			&pgBnIds,
			&pgMaterials,
		); err != nil {
			feedback.Warn("Failed to insert riverbed materials")
			feedback.Warn(handleError(err).Error())
			return nil
		}
	}

	// Track the inserted rows so StageDone/review can find them.
	for _, nid := range bnIds {
		if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
			ctx, importID, "waterway.bottlenecks", nid,
		); err != nil {
			return err
		}
	}
	if err = tx.Commit(); err != nil {
		return err
	}
	*nids = append(*nids, bn.Bottleneck_id)
	return nil
}