view pkg/imports/bn.go @ 5670:b75d0b303328

Various fixes and improvements of gauges import: - Allow update of erased data (and thereby set erased to false) - Fix source_organization to work with ERDMS2 - Give ISRS of new and updated gauges in summary - Fixed reference of null pointers if revlevels are missing - Fixed reference of null pointer on update errors - Added ISRS to reference_code warning
author Sascha Wilde <wilde@sha-bang.de>
date Fri, 08 Dec 2023 17:29:56 +0100
parents f2204f91d286
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha Wilde <sascha.wilde@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"regexp"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/soap/ifbn"
)

// Bottleneck is an import job to import
// bottlenecks from an IFBN SOAP service.
type Bottleneck struct {
	// URL is the URL of the IFBN SOAP service bottlenecks are fetched from.
	URL string `json:"url"`
	// Tolerance is the tolerance used to snap the waterway axis when
	// deriving a bottleneck's area from its ISRS stretch.
	Tolerance float64 `json:"tolerance"`
	// Insecure is passed straight to the IFBN SOAP client. As the name
	// says, setting it to true presumably DISABLES validation of the
	// HTTPS server certificate; false keeps validation enabled.
	// NOTE(review): confirm the polarity against the ifbn package.
	Insecure bool `json:"insecure"`
}

// BNJobKind is the identifier under which bottleneck imports are
// registered in, and dispatched from, the import queue.
const BNJobKind = JobKind("bn")

const (
	// insertBottleneckSQL inserts a new bottleneck and returns its id.
	// Placeholders: $1 bottleneck_id, $2 validity (tstzrange),
	// $3 gauge location (ISRS text), $4 objnam, $5 nobjnm,
	// $6/$7 the two ISRS bounds of the stretch, $8 rb, $9 lb,
	// $10 responsible_country, $11 revisiting_time, $12 limiting,
	// $13 date_info, $14 source_organization, $15 axis snapping tolerance.
	// The bounds are ordered via the <~ / >~ operators before building the
	// isrsrange, presumably so the order of $6/$7 does not matter.
	// The area is computed by ISRSrange_area from the stretch's axis
	// (ISRSrange_axis with tolerance $15) and all collected waterway areas.
	insertBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
INSERT INTO waterway.bottlenecks (
  bottleneck_id,
  validity,
  gauge_location,
  objnam,
  nobjnm,
  stretch,
  area,
  rb,
  lb,
  responsible_country,
  revisiting_time,
  limiting,
  date_info,
  source_organization
) VALUES (
  $1,
  $2::tstzrange,
  isrs_fromText($3),
  $4,
  $5,
  (SELECT r FROM r),
  ISRSrange_area(
    ISRSrange_axis((SELECT r FROM r),
                   $15),
    (SELECT ST_Collect(CAST(area AS geometry))
        FROM waterway.waterway_area)),
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14
)
RETURNING id
`

	// findExactMatchBottleneckSQL returns the id of an already staged
	// (staging_done) bottleneck matching the given data.
	// Placeholders: $1 bottleneck_id, $2 validity, $3 gauge location,
	// $4/$5 ISRS bounds, $6 responsible_country, $7 limiting,
	// $8 date_info, $9 source_organization.
	//
	// We only check values that are NOT NULL. Correctly comparing values
	// that might be NULL is complicated, because they must be compared
	// with `IS NULL' rather than `='. Checking more than just
	// (bottleneck_id, validity, date_info) is already a luxury.
	findExactMatchBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($4)), (isrs_fromText($5))),
r AS (SELECT isrsrange(
      (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
      (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
SELECT id FROM waterway.bottlenecks
WHERE (
  bottleneck_id,
  validity,
  gauge_location,
  stretch,
  responsible_country,
  limiting,
  date_info,
  source_organization,
  staging_done
) = ( SELECT
  $1,
  $2::tstzrange,
  isrs_fromText($3),
  (SELECT r FROM r),
  $6,
  $7,
  $8::timestamptz,
  $9,
  true
)
`

	// findIntersectingBottleneckSQL returns the ids of staged bottlenecks
	// with the same bottleneck_id ($1) whose validity overlaps $2.
	findIntersectingBottleneckSQL = `
SELECT id FROM waterway.bottlenecks
WHERE (bottleneck_id, staging_done) = ($1, true)
  AND $2::tstzrange && validity
`

	// insertBottleneckMaterialSQL links riverbed material $2 to the
	// bottleneck with id $1; an already existing link is ignored.
	insertBottleneckMaterialSQL = `
INSERT INTO waterway.bottlenecks_riverbed_materials (
   bottleneck_id,
   riverbed
) VALUES ($1, $2)
ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
`

	// bnStageDoneDeleteSQL deletes the bottlenecks that import $1 has
	// tracked for deletion (superseded data).
	bnStageDoneDeleteSQL = `
DELETE FROM waterway.bottlenecks WHERE id IN (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1
        AND relation = 'waterway.bottlenecks'::regclass
	AND deletion
)`

	// bnStageDoneSQL marks the bottlenecks that import $1 tracked as new
	// (not for deletion) as staging_done.
	bnStageDoneSQL = `
UPDATE waterway.bottlenecks SET staging_done = true
WHERE id IN (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1
        AND relation = 'waterway.bottlenecks'::regclass
	AND NOT deletion
)`
)

// Description summarizes this import job by the URL of the SOAP
// service it fetches from. The string slice argument is not used.
func (bn *Bottleneck) Description(_ []string) (string, error) {
	desc := bn.URL
	return desc, nil
}

// bnJobCreator registers and creates bottleneck import jobs.
type bnJobCreator struct{}

func init() { RegisterJobCreator(BNJobKind, bnJobCreator{}) }

// Description returns the display name of this job kind.
func (bnJobCreator) Description() string {
	return "bottlenecks"
}

// AutoAccept reports that bottleneck imports are never accepted
// automatically.
func (bnJobCreator) AutoAccept() bool {
	return false
}

// Create returns a new, unconfigured bottleneck import job.
func (bnJobCreator) Create() Job {
	return &Bottleneck{}
}

// Depends returns two table lists. The first holds the tables this job
// writes to. The second holds tables it presumably only reads
// (waterway_area, for example, is read when computing the area).
func (bnJobCreator) Depends() [2][]string {
	written := []string{"bottlenecks", "bottlenecks_riverbed_materials"}
	referenced := []string{"gauges", "distance_marks_virtual", "waterway_axis", "waterway_area"}
	return [2][]string{written, referenced}
}

// StageDone moves the bottlenecks of import id out of the staging area.
// It first deletes the rows that the import tracked as superseded, then
// marks the newly imported rows as staging_done. The first failing
// statement aborts with its error.
func (bnJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	_ Feedback,
) error {
	for _, stmt := range []string{bnStageDoneDeleteSQL, bnStageDoneSQL} {
		if _, err := tx.ExecContext(ctx, stmt, id); err != nil {
			return err
		}
	}
	return nil
}

// CleanUp does nothing for a bottleneck import: there is nothing left
// over to release, so it always succeeds.
func (*Bottleneck) CleanUp() error {
	return nil
}

// rblbRe matches two characters, an underscore, and two more characters
// anywhere in the input; the two pairs are captured.
var rblbRe = regexp.MustCompile(`(..)_(..)`)

// splitRBLB splits a combined "RB_LB" value into its right-bank and
// left-bank parts. It uses the first match of rblbRe; when nothing
// matches, both results are nil.
func splitRBLB(s string) (rb, lb *string) {
	groups := rblbRe.FindStringSubmatch(s)
	if groups == nil {
		return nil, nil
	}
	rb, lb = &groups[1], &groups[2]
	return rb, lb
}

// Do runs the bottleneck import. It fetches all bottlenecks from the
// configured IFBN SOAP service and hands them to storeBottlenecks,
// together with the configured snapping tolerance.
func (bn *Bottleneck) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// fetch requests bottlenecks with the zero time as period start,
	// presumably to obtain the complete data set.
	fetch := func() ([]*ifbn.BottleNeckType, error) {
		svc := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil)

		result, err := svc.Export_bn_by_isrs(&ifbn.Export_bn_by_isrs{
			Period: &ifbn.RequestedPeriod{Date_start: &time.Time{}},
		})
		switch {
		case err != nil:
			return nil, err
		case result.Export_bn_by_isrsResult == nil:
			return nil, errors.New("the requested service returned no bottlenecks")
		}
		return result.Export_bn_by_isrsResult.BottleNeckType, nil
	}

	return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance)
}

// bnStmts bundles the prepared statements used while storing
// bottlenecks, so they can be prepared and released together.
type bnStmts struct {
	insert           *sql.Stmt
	findExactMatch   *sql.Stmt
	findIntersecting *sql.Stmt
	insertMaterial   *sql.Stmt
	track            *sql.Stmt
}

// close releases every statement that is set and resets its field to
// nil. This makes close safe to call on a partially prepared set and
// safe to call more than once. Close errors are deliberately ignored
// (best-effort cleanup).
func (bs *bnStmts) close() {
	release := func(stmt **sql.Stmt) {
		if *stmt == nil {
			return
		}
		(*stmt).Close()
		*stmt = nil
	}
	release(&bs.insert)
	release(&bs.findExactMatch)
	release(&bs.findIntersecting)
	release(&bs.insertMaterial)
	release(&bs.track)
}

// prepare prepares all bottleneck statements on conn, in a fixed order.
// It stops at the first failure and returns that error. Statements
// prepared before the failure stay in bs, so the caller is expected to
// call close.
func (bs *bnStmts) prepare(ctx context.Context, conn *sql.Conn) error {
	type pending struct {
		query  string
		target **sql.Stmt
	}
	todo := [...]pending{
		{query: insertBottleneckSQL, target: &bs.insert},
		{query: findExactMatchBottleneckSQL, target: &bs.findExactMatch},
		{query: findIntersectingBottleneckSQL, target: &bs.findIntersecting},
		{query: insertBottleneckMaterialSQL, target: &bs.insertMaterial},
		{query: trackImportDeletionSQL, target: &bs.track},
	}
	for i := range todo {
		var err error
		*todo[i].target, err = conn.PrepareContext(ctx, todo[i].query)
		if err != nil {
			return err
		}
	}
	return nil
}

// storeBottlenecks fetches bottlenecks via fetch and stores each of them
// with storeBottleneck inside a single transaction on conn.
//
// It returns an UnchangedError when no bottleneck was inserted.
// Otherwise it returns a summary listing the bottleneck IDs that were
// stored. Any error from fetching, preparing, storing or committing
// aborts the import.
func storeBottlenecks(
	ctx context.Context,
	fetch func() ([]*ifbn.BottleNeckType, error),
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
	tolerance float64,
) (interface{}, error) {
	begin := time.Now()

	bottlenecks, err := fetch()
	if err != nil {
		return nil, err
	}
	feedback.Info("Found %d bottlenecks for import", len(bottlenecks))

	var stmts bnStmts
	// Deferred before preparing so partially prepared statements are
	// released as well.
	defer stmts.close()
	if err := stmts.prepare(ctx, conn); err != nil {
		return nil, err
	}

	var (
		newIDs  []string
		seenOld = map[int64]bool{}
	)

	feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// After a successful Commit this Rollback has no effect.
	defer tx.Rollback()

	savepoint := Savepoint(ctx, tx, "insert_bottlenck")

	for i := range bottlenecks {
		err := storeBottleneck(
			ctx, tx,
			importID, feedback, bottlenecks[i],
			&newIDs, seenOld, tolerance,
			&stmts,
			savepoint,
		)
		if err != nil {
			return nil, err
		}
	}

	if err := tx.Commit(); err != nil {
		return nil, err
	}

	if len(newIDs) == 0 {
		return nil, UnchangedError("No new bottlenecks inserted")
	}

	feedback.Info("Storing %d bottlenecks took %s", len(newIDs), time.Since(begin))
	feedback.Info("Import of bottlenecks was successful")

	return &struct {
		Bottlenecks []string `json:"bottlenecks"`
	}{Bottlenecks: newIDs}, nil
}

// storeBottleneck stores a single bottleneck bn from the IFBN service
// inside tx.
//
// If an identical, already staged bottleneck exists, it only reports
// "unchanged". Otherwise it takes these steps:
//   - It inserts the bottleneck and its riverbed materials within a
//     savepoint.
//   - It tracks for deletion the staged bottlenecks with the same
//     bottleneck_id and intersecting validity. seenOldBnIDs ensures
//     each one is tracked only once across calls.
//   - It tracks the new row for this import.
//   - It appends the bottleneck ID to nids.
//
// Data problems of this single bottleneck (missing start date, a
// failing insert) are reported via feedback, and the bottleneck is
// skipped by returning nil. Unparsable validity dates and all other
// database errors are returned and abort the import.
func storeBottleneck(
	ctx context.Context,
	tx *sql.Tx,
	importID int64,
	feedback Feedback,
	bn *ifbn.BottleNeckType,
	nids *[]string,
	seenOldBnIDs map[int64]bool,
	tolerance float64,
	bs *bnStmts,
	savepoint func(func() error) error,
) error {
	feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)

	var (
		tfrom, tto pgtype.Timestamptz
		uBound     pgtype.BoundType
	)

	if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil {
		// This is a workaround for the fact that most BN data does not
		// contain the optional validity information.
		//
		// The current solution makes every change in the data an
		// update, which might not be the best solution. Alternative
		// approaches could include using the Date Info of the data as
		// the start date, and maybe even inspecting the details of
		// changed data for hints whether an update or historisation
		// with a new version is advisable.
		//
		// Nevertheless, the current solution "just works" for the
		// time being and reflects the upstream systems...  --  sw
		feedback.Info("No validity information, assuming infinite validity.")
		tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))
		uBound = pgtype.Unbounded
	} else {
		const (
			fromKey = "Valid_from_date"
			toKey   = "Valid_to_date"
		)
		fromTo := make(map[string]time.Time)
		for _, kv := range bn.AdditionalData.KeyValuePair {
			if kv.Key == fromKey || kv.Key == toKey {
				t, err := time.Parse(time.RFC3339, kv.Value)
				if err != nil {
					// A malformed date aborts the whole import.
					return err
				}
				fromTo[kv.Key] = t
			}
		}

		from, ok := fromTo[fromKey]
		if !ok {
			feedback.Error("Missing start date")
			return nil
		}
		tfrom.Set(from)

		if to, ok := fromTo[toKey]; ok {
			tto.Set(to)
			uBound = pgtype.Exclusive
			feedback.Info("Valid from %s to %s",
				from.Format(common.TimeFormat),
				to.Format(common.TimeFormat))
		} else {
			uBound = pgtype.Unbounded
			feedback.Info("Valid from %s",
				from.Format(common.TimeFormat))
		}
	}

	// Lower bound inclusive; the upper bound is exclusive when a
	// Valid_to_date is given and unbounded otherwise (tto is then unused).
	validity := pgtype.Tstzrange{
		Lower:     tfrom,
		Upper:     tto,
		LowerType: pgtype.Inclusive,
		UpperType: uBound,
		Status:    pgtype.Present,
	}

	rb, lb := splitRBLB(bn.Rb_lb)

	// The revisiting time is a number of months. Surrounding whitespace
	// is tolerated. An unparsable value only yields a warning and is
	// stored as NULL.
	var revisitingTime *int
	if bn.Revisiting_time != nil {
		if rt := strings.TrimSpace(*bn.Revisiting_time); rt != "" {
			if i, err := strconv.Atoi(rt); err != nil {
				feedback.Warn(
					"Cannot convert given revisiting time '%s' to number of months",
					*bn.Revisiting_time)
			} else {
				revisitingTime = &i
			}
		}
	}

	var limiting, country string

	if bn.Limiting_factor != nil {
		limiting = string(*bn.Limiting_factor)
	}

	if bn.Responsible_country != nil {
		country = string(*bn.Responsible_country)
	}

	var materials []string
	if bn.Riverbed != nil {
		for _, material := range bn.Riverbed.Material {
			if material != nil {
				materials = append(materials, string(*material))
			}
		}
	}

	// Check if a bottleneck identical to the one we would insert already
	// exists:
	var old int64
	err := tx.StmtContext(ctx, bs.findExactMatch).QueryRowContext(
		ctx,
		bn.Bottleneck_id,
		&validity,
		bn.Fk_g_fid,
		bn.From_ISRS, bn.To_ISRS,
		country,
		limiting,
		bn.Date_Info,
		bn.Source,
	).Scan(&old)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// We don't have a matching old one.
	case err != nil:
		return err
	default:
		// We could check whether the materials match too. Per
		// specification, however, the Date_Info would have to change on
		// that kind of change anyway, so we already check more than
		// required.
		feedback.Info("unchanged")
		return nil
	}

	// Additional information, only of interest when we change something,
	// so it can be used for debugging if something goes wrong...
	feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS)
	feedback.Info("Reference gauge: %s", bn.Fk_g_fid)

	// A new bottleneck whose validity intersects that of existing ones
	// with the same bottleneck_id is an update. The old data is marked
	// for deletion.
	rows, err := tx.StmtContext(ctx, bs.findIntersecting).QueryContext(
		ctx, bn.Bottleneck_id, &validity,
	)
	if err != nil {
		return err
	}
	defer rows.Close()

	// Collect old intersecting bottleneck data for deletion. Don't worry
	// about materials, they will be deleted via cascading.
	var oldBnIDs []int64
	for rows.Next() {
		var oldID int64
		if err := rows.Scan(&oldID); err != nil {
			return err
		}
		oldBnIDs = append(oldBnIDs, oldID)
	}
	if err := rows.Err(); err != nil {
		return err
	}

	switch {
	case len(oldBnIDs) == 1:
		feedback.Info("Bottleneck '%s' "+
			"with intersecting validity already exists: "+
			"UPDATING", bn.Bottleneck_id)
	case len(oldBnIDs) > 1:
		// This case is unexpected and should only happen when historic
		// data in the bottleneck service was changed subsequently...
		// We handle it gracefully anyway, but warn.
		feedback.Warn("More than one Bottleneck '%s' "+
			"with intersecting validity already exists: "+
			"REPLACING all of them!", bn.Bottleneck_id)
	}
	// The tracking information for deleting superseded bottlenecks is
	// written only AFTER the new bottleneck was created successfully.
	// That way we don't change the database if an error arises while
	// inserting the new data.

	var bnID int64
	// Add new BN data:
	if err := savepoint(func() error {
		if err := tx.StmtContext(ctx, bs.insert).QueryRowContext(
			ctx,
			bn.Bottleneck_id,
			&validity,
			bn.Fk_g_fid,
			bn.OBJNAM,
			bn.NOBJNM,
			bn.From_ISRS, bn.To_ISRS,
			rb,
			lb,
			country,
			revisitingTime,
			limiting,
			bn.Date_Info,
			bn.Source,
			tolerance,
		).Scan(&bnID); err != nil {
			return err
		}

		// Add new materials
		for _, material := range materials {
			if _, err := tx.StmtContext(ctx, bs.insertMaterial).ExecContext(
				ctx,
				bnID,
				material,
			); err != nil {
				return err
			}
		}
		return nil
	}); err != nil {
		// Pass the message as an argument, never as the format string:
		// database error texts may contain '%'.
		feedback.Error("%s", pgxutils.ReadableError{Err: err}.Error())
		return nil
	}

	// Now that adding BNs to staging was successful, write import tracking
	// information to database:
	for _, oldID := range oldBnIDs {
		// Two new bottlenecks may intersect with the same old one, so
		// duplicates in oldBnIDs have to be handled.
		if !seenOldBnIDs[oldID] {
			if _, err := tx.StmtContext(ctx, bs.track).ExecContext(
				ctx, importID, "waterway.bottlenecks", oldID, true,
			); err != nil {
				return err
			}
			seenOldBnIDs[oldID] = true
		}
	}

	if _, err := tx.StmtContext(ctx, bs.track).ExecContext(
		ctx, importID, "waterway.bottlenecks", bnID, false,
	); err != nil {
		return err
	}

	*nids = append(*nids, bn.Bottleneck_id)
	return nil
}