view pkg/imports/bn.go @ 4134:9864d682ab47

bn-import: Downgraded the message on missing validity from "Warning" to "Info". The validity is optional in the upstream data and currently only available in AT. As warnings these messages were cluttering the log, hence the downgrade to a regular "Info".
author Sascha Wilde <wilde@intevation.de>
date Thu, 01 Aug 2019 19:27:30 +0200
parents eb08fbe33074
children f464cbcdf2f2
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>
//  * Sascha Wilde <sascha.wilde@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"regexp"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/soap/ifbn"
	"github.com/jackc/pgx/pgtype"
)

// Bottleneck is the import job that fetches bottlenecks from an
// IFBN SOAP service and stores them in the database.
type Bottleneck struct {
	// URL is the address of the IFBN SOAP service.
	URL string `json:"url"`

	// Tolerance is the snapping tolerance used when the bottleneck
	// stretch is mapped onto the waterway axis.
	Tolerance float64 `json:"tolerance"`

	// Insecure is handed to the SOAP client and controls whether
	// certificates of HTTPS traffic are validated.
	// NOTE(review): presumably true disables validation — confirm in
	// ifbn.NewIBottleneckService.
	Insecure bool `json:"insecure"`
}

// BNJobKind is the identifier under which bottleneck imports are
// registered in the import queue.
const BNJobKind = JobKind("bn")

const (
	// insertBottleneckSQL inserts one bottleneck into waterway.bottlenecks
	// and returns the id of the new row.  The ISRS locations $6 and $7 are
	// sorted with the <~ / >~ operators before the stretch range is built
	// from them (presumably so argument order does not matter).  The area
	// is computed by ISRSrange_area from the axis section returned by
	// ISRSrange_axis (snapping tolerance $15) and the collected geometries
	// of waterway.waterway_area.
	insertBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
INSERT INTO waterway.bottlenecks (
  bottleneck_id,
  validity,
  gauge_location,
  objnam,
  nobjnm,
  stretch,
  area,
  rb,
  lb,
  responsible_country,
  revisiting_time,
  limiting,
  date_info,
  source_organization
) VALUES (
  $1,
  $2::tstzrange,
  isrs_fromText($3),
  $4,
  $5,
  (SELECT r FROM r),
  ISRSrange_area(
    ISRSrange_axis((SELECT r FROM r),
                   $15),
    (SELECT ST_Collect(CAST(area AS geometry))
        FROM waterway.waterway_area)),
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14
)
RETURNING id
`

	// findExactMatchBottleneckSQL returns the id of an already staged
	// (staging_done) bottleneck whose listed columns equal the given values.
	//
	// We only compare columns that are NOT NULL: a correct comparison with
	// values that might be NULL (those must not be compared with `=' but
	// with `IS NULL') is complicated, and checking more than just
	// (bottleneck_id, validity, date_info) is already a luxury.
	findExactMatchBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($4)), (isrs_fromText($5))),
r AS (SELECT isrsrange(
      (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
      (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
SELECT id FROM waterway.bottlenecks
WHERE (
  bottleneck_id,
  validity,
  gauge_location,
  stretch,
  responsible_country,
  limiting,
  date_info,
  source_organization,
  staging_done
) = ( SELECT
  $1,
  $2::tstzrange,
  isrs_fromText($3),
  (SELECT r FROM r),
  $6,
  $7,
  $8::timestamptz,
  $9,
  true
)
`

	// findIntersectingBottleneckSQL returns the ids of staged bottlenecks
	// with bottleneck_id $1 whose validity overlaps (&&) the range $2.
	findIntersectingBottleneckSQL = `
SELECT id FROM waterway.bottlenecks
WHERE (bottleneck_id, staging_done) = ($1, true)
  AND $2::tstzrange && validity
`

	// insertBottleneckMaterialSQL inserts every combination (cross join of
	// the two unnest calls) of the bottleneck ids in $1 and the riverbed
	// materials in $2; already existing pairs are skipped.
	insertBottleneckMaterialSQL = `
INSERT INTO waterway.bottlenecks_riverbed_materials (
   bottleneck_id,
   riverbed
) SELECT *
FROM unnest(CAST($1 AS int[])) AS bns,
  unnest(CAST($2 AS varchar[])) AS materials
ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
`

	// bnStageDoneDeleteSQL deletes the bottlenecks that import $1 tracked
	// with the deletion flag set.
	bnStageDoneDeleteSQL = `
DELETE FROM waterway.bottlenecks WHERE id IN (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1
        AND relation = 'waterway.bottlenecks'::regclass
	AND deletion
)`

	// bnStageDoneSQL sets staging_done on the bottlenecks that import $1
	// tracked without the deletion flag.
	bnStageDoneSQL = `
UPDATE waterway.bottlenecks SET staging_done = true
WHERE id IN (
  SELECT key
  FROM import.track_imports
  WHERE import_id = $1
        AND relation = 'waterway.bottlenecks'::regclass
	AND NOT deletion
)`
)

// bnJobCreator is the job creator for bottleneck imports; it is
// registered for BNJobKind when the package is initialised.
type bnJobCreator struct{}

func init() { RegisterJobCreator(BNJobKind, bnJobCreator{}) }

// Description returns the display name of bottleneck imports.
func (bnJobCreator) Description() string {
	return "bottlenecks"
}

// AutoAccept always returns false for bottleneck imports.
func (bnJobCreator) AutoAccept() bool {
	return false
}

// Create returns a new, zero-valued bottleneck import job.
func (bnJobCreator) Create() Job {
	return &Bottleneck{}
}

// Depends returns two lists of tables.  The first names the tables the
// bottleneck import inserts into; the second names further tables it
// references.
// NOTE(review): the exact meaning of the two lists is defined by the
// import queue — confirm there.
func (bnJobCreator) Depends() [2][]string {
	inserted := []string{"bottlenecks", "bottlenecks_riverbed_materials"}
	referenced := []string{
		"gauges",
		"distance_marks_virtual",
		"waterway_axis",
		"waterway_area",
	}
	return [2][]string{inserted, referenced}
}

// StageDone moves the bottlenecks of import id out of the staging area
// within tx.  It first deletes the rows tracked for deletion and then
// marks the remaining tracked rows as staging_done.  The error of the
// first failing statement is returned; the second statement is not run
// when the first one fails.
func (bnJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	if _, err := tx.ExecContext(ctx, bnStageDoneDeleteSQL, id); err != nil {
		return err
	}
	_, err := tx.ExecContext(ctx, bnStageDoneSQL, id)
	return err
}

// CleanUp is part of the job contract; a bottleneck import leaves
// nothing behind that needs cleaning up, so it always returns nil.
func (*Bottleneck) CleanUp() error {
	return nil
}

// rblbRe matches two characters, an underscore and two more characters
// anywhere in the input, capturing both character pairs.
var rblbRe = regexp.MustCompile(`(..)_(..)`)

// splitRBLB splits a combined "RB_LB" value into its two parts using the
// leftmost match of rblbRe.  If s does not match, both results are nil.
func splitRBLB(s string) (*string, *string) {
	if parts := rblbRe.FindStringSubmatch(s); parts != nil {
		right, left := parts[1], parts[2]
		return &right, &left
	}
	return nil, nil
}

// Do runs the bottleneck import: it requests all bottlenecks from the
// SOAP service at bn.URL (with a start date of the zero time) and hands
// them to storeBottlenecks together with bn.Tolerance.
func (bn *Bottleneck) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	fetch := func() ([]*ifbn.BottleNeckType, error) {
		service := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil)

		request := &ifbn.Export_bn_by_isrs{
			Period: &ifbn.RequestedPeriod{Date_start: &time.Time{}},
		}

		response, err := service.Export_bn_by_isrs(request)
		switch {
		case err != nil:
			return nil, err
		case response.Export_bn_by_isrsResult == nil:
			return nil, errors.New(
				"The requested service returned no bottlenecks")
		}
		return response.Export_bn_by_isrsResult.BottleNeckType, nil
	}

	return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance)
}

// bnStmts bundles the statements prepared for a bottleneck import.  They
// are prepared on a connection and bound to the import transaction with
// tx.StmtContext before use.
type bnStmts struct {
	insert           *sql.Stmt
	findExactMatch   *sql.Stmt
	findIntersecting *sql.Stmt
	insertMaterial   *sql.Stmt
	track            *sql.Stmt
}

// slots returns the addresses of all statement fields in a fixed order,
// which is the order used by both prepare and close.
func (bs *bnStmts) slots() [5]**sql.Stmt {
	return [5]**sql.Stmt{
		&bs.insert,
		&bs.findExactMatch,
		&bs.findIntersecting,
		&bs.insertMaterial,
		&bs.track,
	}
}

// close closes every statement that is set and resets its field to nil,
// so it is safe on a partially prepared or already closed bnStmts.
// Errors returned by Close are ignored (best effort).
func (bs *bnStmts) close() {
	for _, slot := range bs.slots() {
		if *slot == nil {
			continue
		}
		(*slot).Close()
		*slot = nil
	}
}

// prepare prepares all statements on conn in slot order and stops at the
// first failure, returning its error.  Statements prepared before the
// failure stay in place and are released by close.
func (bs *bnStmts) prepare(ctx context.Context, conn *sql.Conn) error {
	queries := [5]string{
		insertBottleneckSQL,
		findExactMatchBottleneckSQL,
		findIntersectingBottleneckSQL,
		insertBottleneckMaterialSQL,
		trackImportDeletionSQL,
	}
	for i, slot := range bs.slots() {
		var err error
		if *slot, err = conn.PrepareContext(ctx, queries[i]); err != nil {
			return err
		}
	}
	return nil
}

// storeBottlenecks obtains bottlenecks through fetch and stores each of
// them with storeBottleneck inside a single transaction on conn, which is
// committed at the end.  It returns an UnchangedError when no bottleneck
// was inserted; otherwise it returns a summary listing the bottleneck_ids
// of the inserted bottlenecks.  The reported duration includes fetching.
func storeBottlenecks(
	ctx context.Context,
	fetch func() ([]*ifbn.BottleNeckType, error),
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
	tolerance float64,
) (interface{}, error) {
	began := time.Now()

	bottlenecks, err := fetch()
	if err != nil {
		return nil, err
	}
	feedback.Info("Found %d bottlenecks for import", len(bottlenecks))

	var stmts bnStmts
	defer stmts.close()
	if err = stmts.prepare(ctx, conn); err != nil {
		return nil, err
	}

	var inserted []string
	// Ids of superseded bottlenecks already tracked for deletion, shared
	// across all bottlenecks of this import to avoid duplicate tracking.
	seen := make(map[int64]bool)

	feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	for _, b := range bottlenecks {
		err = storeBottleneck(
			tx, ctx, importID, feedback, b,
			&inserted, seen, tolerance,
			&stmts,
		)
		if err != nil {
			return nil, err
		}
	}

	if err = tx.Commit(); err != nil {
		return nil, err
	}

	if len(inserted) == 0 {
		return nil, UnchangedError("No new bottlenecks inserted")
	}

	feedback.Info("Storing %d bottlenecks took %s", len(inserted), time.Since(began))
	feedback.Info("Import of bottlenecks was successful")

	return &struct {
		Bottlenecks []string `json:"bottlenecks"`
	}{
		Bottlenecks: inserted,
	}, nil
}

// storeBottleneck stores the single bottleneck bn within tx.
//
// The validity period is read from bn.AdditionalData (keys
// "Valid_from_date" and "Valid_to_date", RFC 3339).  Without additional
// data it is assumed to start at 1970-01-01 UTC with no upper bound.
//
// The bottleneck is skipped (nil is returned) when:
//   - additional data exists but has no start date (a warning is issued),
//   - an identical, already staged bottleneck exists ("unchanged"), or
//   - inserting it or its riverbed materials inside the savepoint fails
//     (a warning is issued).
//
// Otherwise the ids of the new rows, and the ids of existing bottlenecks
// with the same bottleneck_id and an intersecting validity, are written
// via bs.track.  The superseded ones are flagged for deletion, each at
// most once thanks to seenOldBnIds.  bn.Bottleneck_id is then appended
// to *nids.
//
// A non-nil error is returned for unparsable validity dates and for
// database failures outside the savepoint.
func storeBottleneck(
	tx *sql.Tx,
	ctx context.Context,
	importID int64,
	feedback Feedback,
	bn *ifbn.BottleNeckType,
	nids *[]string,
	seenOldBnIds map[int64]bool,
	tolerance float64,
	bs *bnStmts,
) error {
	feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)

	var tfrom, tto pgtype.Timestamptz
	var uBound pgtype.BoundType

	if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil {
		// This is a workaround for the fact that most BN data does not
		// contain the optional validity information.
		//
		// The current solution makes every change in the data an
		// update, which might not be the best solution.  Alternative
		// approaches could include using the Date Info of the data as
		// start date, and maybe even inspecting the details of changed
		// data for hints whether an update or historisation with a new
		// version is advisable.
		//
		// Nevertheless, the current solution "just works" for the
		// time being and reflects the upstream systems...  --  sw
		feedback.Info("No validity information, assuming infinite validity.")
		tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))
		uBound = pgtype.Unbounded
	} else {
		const (
			fromKey = "Valid_from_date"
			toKey   = "Valid_to_date"
		)
		fromTo := make(map[string]time.Time)
		for _, kv := range bn.AdditionalData.KeyValuePair {
			k := string(kv.Key)
			if k != fromKey && k != toKey {
				continue
			}
			t, err := time.Parse(time.RFC3339, kv.Value)
			if err != nil {
				return err
			}
			fromTo[k] = t
		}

		from, ok := fromTo[fromKey]
		if !ok {
			feedback.Warn("Missing start date")
			return nil
		}
		tfrom.Set(from)

		if to, ok := fromTo[toKey]; ok {
			tto.Set(to)
			uBound = pgtype.Exclusive
			feedback.Info("Valid from %s to %s",
				from.Format(common.TimeFormat),
				to.Format(common.TimeFormat))
		} else {
			uBound = pgtype.Unbounded
			feedback.Info("Valid from %s",
				from.Format(common.TimeFormat))
		}
	}

	// Lower bound inclusive; upper bound exclusive, or unbounded when no
	// end date is known (tto is then left unset).
	validity := pgtype.Tstzrange{
		Lower:     tfrom,
		Upper:     tto,
		LowerType: pgtype.Inclusive,
		UpperType: uBound,
		Status:    pgtype.Present,
	}

	rb, lb := splitRBLB(bn.Rb_lb)

	var revisitingTime *int
	if bn.Revisiting_time != nil {
		// Parse the trimmed value: whitespace-only input counts as
		// missing, and surrounding whitespace must not make an otherwise
		// valid number fail to convert.
		if s := strings.TrimSpace(*bn.Revisiting_time); s != "" {
			if i, err := strconv.Atoi(s); err == nil {
				revisitingTime = &i
			} else {
				feedback.Warn(
					"Cannot convert given revisiting time '%s' to number of months",
					*bn.Revisiting_time)
			}
		}
	}

	var limiting, country string

	if bn.Limiting_factor != nil {
		limiting = string(*bn.Limiting_factor)
	}

	if bn.Responsible_country != nil {
		country = string(*bn.Responsible_country)
	}

	var materials []string
	if bn.Riverbed != nil {
		for _, material := range bn.Riverbed.Material {
			if material != nil {
				materials = append(materials, string(*material))
			}
		}
	}

	// Check if a bottleneck identical to the one we would insert already
	// exists:
	var old int64
	err := tx.StmtContext(ctx, bs.findExactMatch).QueryRowContext(
		ctx,
		bn.Bottleneck_id,
		&validity,
		bn.Fk_g_fid,
		bn.From_ISRS, bn.To_ISRS,
		country,
		limiting,
		bn.Date_Info,
		bn.Source,
	).Scan(&old)
	switch {
	case err == sql.ErrNoRows:
		// We don't have a matching old one.
	case err != nil:
		return err
	default:
		// We could check if the materials are also matching -- but per
		// specification the Date_Info would have to change on that kind of
		// change anyway.  So actually we are already checking more in depth
		// than required.
		feedback.Info("unchanged")
		return nil
	}

	// Additional information, only of interest when we change something,
	// so it can be used for debugging if something goes wrong...
	feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS)

	// If the new bottleneck intersects with the validity of existing ones
	// with the same bottleneck_id, we consider this an update and mark the
	// old data for deletion.  Don't worry about materials, they will be
	// deleted via cascading.
	//
	// The result set is drained and closed inside the func literal, so no
	// cursor stays open while further statements run on tx.
	oldBnIds, err := func() ([]int64, error) {
		rows, err := tx.StmtContext(ctx, bs.findIntersecting).QueryContext(
			ctx, bn.Bottleneck_id, &validity,
		)
		if err != nil {
			return nil, err
		}
		defer rows.Close()

		var ids []int64
		for rows.Next() {
			var id int64
			if err := rows.Scan(&id); err != nil {
				return nil, err
			}
			ids = append(ids, id)
		}
		return ids, rows.Err()
	}()
	if err != nil {
		return err
	}

	switch {
	case len(oldBnIds) == 1:
		feedback.Info("Bottelneck '%s' "+
			"with intersecting validity already exists: "+
			"UPDATING", bn.Bottleneck_id)
	case len(oldBnIds) > 1:
		// This case is unexpected and should only happen when historic
		// data in the bottleneck service was changed subsequently...
		// We handle it gracefully anyway, but warn.
		feedback.Warn("More than one Bottelneck '%s' "+
			"with intersecting validity already exists: "+
			"REPLACING all of them!", bn.Bottleneck_id)
	}
	// We write the tracking information for the deletion of superseded
	// bottlenecks to the database later -- AFTER the new bottleneck was
	// created successfully.  That way we don't change the database when
	// an error arises while inserting the new data.

	var bnIds []int64
	// Add new BN data:
	savepoint := Savepoint(ctx, tx, "insert_bottlenck")

	err = savepoint(func() error {
		rows, err := tx.StmtContext(ctx, bs.insert).QueryContext(ctx,
			bn.Bottleneck_id,
			&validity,
			bn.Fk_g_fid,
			bn.OBJNAM,
			bn.NOBJNM,
			bn.From_ISRS, bn.To_ISRS,
			rb,
			lb,
			country,
			revisitingTime,
			limiting,
			bn.Date_Info,
			bn.Source,
			tolerance)
		if err != nil {
			return err
		}
		defer rows.Close()
		for rows.Next() {
			var nid int64
			if err := rows.Scan(&nid); err != nil {
				return err
			}
			bnIds = append(bnIds, nid)
		}
		if err := rows.Err(); err != nil {
			return err
		}

		// Add new riverbed materials, if any.
		if len(bnIds) == 0 || len(materials) == 0 {
			return nil
		}
		var (
			pgBnIds     pgtype.Int8Array
			pgMaterials pgtype.VarcharArray
		)
		pgBnIds.Set(bnIds)
		pgMaterials.Set(materials)

		_, err = tx.StmtContext(ctx, bs.insertMaterial).ExecContext(ctx,
			&pgBnIds,
			&pgMaterials,
		)
		return err
	})
	if err != nil {
		// The message goes in as an argument, never as the format string:
		// database errors may contain '%' characters.
		feedback.Warn("%s", pgxutils.ReadableError{err}.Error())
		return nil
	}

	// Now that adding BNs to staging was successful, write import tracking
	// information to the database.  The tracking statement is bound to tx
	// once and reused for all rows.
	track := tx.StmtContext(ctx, bs.track)
	for _, oldID := range oldBnIds {
		// Two new bottlenecks may intersect with the same old one, so
		// ids already tracked for deletion in this import are skipped.
		if seenOldBnIds[oldID] {
			continue
		}
		if _, err := track.ExecContext(
			ctx, importID, "waterway.bottlenecks", oldID, true,
		); err != nil {
			return err
		}
		seenOldBnIds[oldID] = true
	}
	for _, nid := range bnIds {
		if _, err := track.ExecContext(
			ctx, importID, "waterway.bottlenecks", nid, false,
		); err != nil {
			return err
		}
	}

	*nids = append(*nids, bn.Bottleneck_id)
	return nil
}