view pkg/imports/bn.go @ 3648:0ec5c8ec1e44

Avoid empty validity time ranges. An entry that is not valid at any point in time makes no sense. Furthermore, multiple such entries would violate a UNIQUE constraint. Since an UPDATE can now change validity time ranges, do all the adjustments for that regardless of whether an INSERT or an UPDATE happens.
author Tom Gottfried <tom@intevation.de>
date Wed, 12 Jun 2019 18:26:26 +0200
parents 02951a62e8c6
children 2a079d0a71c1
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"regexp"
	"strconv"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/soap/ifbn"
	"github.com/jackc/pgx/pgtype"
)

// Bottleneck is an import job to import
// bottlenecks from an IFBN SOAP service.
// Its exported fields form the configuration of the job
// and carry the JSON names given in their tags.
type Bottleneck struct {
	// URL is the URL of the SOAP service
	// the bottlenecks are fetched from.
	URL string `json:"url"`
	// Tolerance used for axis snapping. It is handed
	// to the INSERT and UPDATE statements of the import.
	Tolerance float64 `json:"tolerance"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	// The value is passed on to the SOAP client unchanged
	// (presumably true disables the validation -- TODO confirm).
	Insecure bool `json:"insecure"`
}

// BNJobKind is the import queue type identifier
// under which the bottleneck import is registered.
const BNJobKind JobKind = "bn"

// SQL statements used by the bottleneck import. The comments list the
// positional parameters in the order the statements expect them.
const (
	// hasBottleneckSQL marks the not yet erased versions of the
	// bottleneck with bottleneck_id $1 as erased unless their validity
	// is contained in the new validity $2. It yields a single boolean
	// which is true if at least one version was marked as erased or if
	// no version of the bottleneck exists at all. The import uses this
	// flag to choose between inserting a new version and updating the
	// existing one.
	hasBottleneckSQL = `
WITH upd AS (
  UPDATE waterway.bottlenecks SET
    erased = true
  WHERE bottleneck_id = $1
    AND NOT erased
    -- Don't touch old entry if new validity contains old: will be updated
    AND NOT validity <@ $2
  RETURNING 1
)
-- Decide whether a new version will be INSERTed
SELECT EXISTS(SELECT 1 FROM upd)
  OR NOT EXISTS(SELECT 1 FROM waterway.bottlenecks WHERE bottleneck_id = $1)
`

	// insertBottleneckSQL inserts a new version of a bottleneck and
	// returns its id.
	//
	// Parameters: $1 bottleneck_id, $2 validity, $3 gauge location
	// (ISRS text), $4 objnam, $5 nobjnm, $6 and $7 the two ISRS bounds
	// of the stretch (sorted by the statement itself), $8 rb, $9 lb,
	// $10 responsible_country, $11 revisiting_time, $12 limiting,
	// $13 date_info, $14 source_organization, $15 second argument of
	// ISRSrange_axis (the import passes its snapping tolerance here).
	//
	// gauge_validity is taken from the gauge at the given location
	// whose validity contains the lower bound of $2; if there is none
	// an unbounded range is stored.
	insertBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
INSERT INTO waterway.bottlenecks (
  bottleneck_id,
  validity,
  gauge_location,
  gauge_validity,
  objnam,
  nobjnm,
  stretch,
  area,
  rb,
  lb,
  responsible_country,
  revisiting_time,
  limiting,
  date_info,
  source_organization
) VALUES (
  $1,
  $2,
  isrs_fromText($3),
  COALESCE(
    (SELECT validity FROM waterway.gauges
       WHERE location = isrs_fromText($3)
         AND validity @> lower(CAST($2 AS tstzrange))),
    tstzrange(NULL, NULL)),
  $4,
  $5,
  (SELECT r FROM r),
  ISRSrange_area(
    ISRSrange_axis((SELECT r FROM r),
                   $15),
    (SELECT ST_Collect(CAST(area AS geometry))
        FROM waterway.waterway_area)),
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14
)
RETURNING id`

	// fixBNValiditySQL removes the new validity $2 from the validity of
	// all erased versions of the bottleneck with bottleneck_id $1 that
	// overlap with $2.
	//
	// NOTE(review): PostgreSQL's range difference raises an error if
	// the result would consist of two disjoint ranges (new validity
	// strictly inside an old one) and yields an empty range if the old
	// validity is fully covered -- confirm both cases are handled or
	// cannot occur.
	fixBNValiditySQL = `
UPDATE waterway.bottlenecks SET
   -- Set enddate of old entry to new startdate in case of overlap:
  validity = validity - $2
WHERE bottleneck_id = $1
  AND validity && $2
  AND erased
`

	// updateBottleneckSQL overwrites the not erased version of the
	// bottleneck with bottleneck_id $1 if the new date_info $12 is
	// more recent than the stored one and returns the id of the
	// updated row. No row is returned if nothing was updated.
	//
	// Parameters: $1 bottleneck_id, $2 gauge location (ISRS text),
	// $3 objnam, $4 nobjnm, $5 and $6 the two ISRS bounds of the
	// stretch, $7 rb, $8 lb, $9 responsible_country,
	// $10 revisiting_time, $11 limiting, $12 date_info,
	// $13 source_organization, $14 second argument of ISRSrange_axis
	// (the import passes its snapping tolerance here), $15 validity.
	//
	// NOTE(review): gauge_validity is looked up with lower(b.validity),
	// which in an UPDATE refers to the validity stored before this
	// statement, although validity itself is set to $15 in the same
	// statement. The INSERT uses the new validity for this lookup --
	// confirm whether $15 should be used here as well.
	updateBottleneckSQL = `
WITH
bounds (b) AS (VALUES (isrs_fromText($5)), (isrs_fromText($6))),
r AS (SELECT isrsrange(
    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
UPDATE waterway.bottlenecks b SET
  gauge_location = isrs_fromtext($2),
  gauge_validity = COALESCE(
    (SELECT validity FROM waterway.gauges g
       WHERE g.location = isrs_fromText($2)
         AND g.validity @> lower(b.validity)),
    tstzrange(NULL, NULL)),
  objnam = $3,
  nobjnm = $4,
  stretch = (SELECT r FROM r),
  area = ISRSrange_area(
    ISRSrange_axis((SELECT r FROM r), $14),
    (SELECT ST_Collect(CAST(area AS geometry))
        FROM waterway.waterway_area)),
  rb = $7,
  lb = $8,
  responsible_country = $9,
  revisiting_time = $10,
  limiting = $11,
  date_info = $12,
  source_organization = $13,
  validity = $15
WHERE bottleneck_id = $1
  AND NOT erased
  AND $12 > date_info
RETURNING id
`

	// deleteBottleneckMaterialSQL deletes all riverbed materials of the
	// bottleneck version $1 that are not contained in the array $2 and
	// returns the deleted materials. The import passes the id returned
	// by the UPDATE as $1.
	deleteBottleneckMaterialSQL = `
DELETE FROM waterway.bottlenecks_riverbed_materials
WHERE bottleneck_id = $1
  AND riverbed <> ALL($2)
RETURNING riverbed
`

	// insertBottleneckMaterialSQL stores the riverbed material $2 for
	// the bottleneck version $1. Already stored combinations are
	// silently skipped.
	insertBottleneckMaterialSQL = `
INSERT INTO waterway.bottlenecks_riverbed_materials (
   bottleneck_id,
   riverbed
) VALUES (
   $1,
   $2
) ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
`
)

// bnJobCreator is the stateless job creator of the bottleneck import.
type bnJobCreator struct{}

func init() {
	RegisterJobCreator(BNJobKind, bnJobCreator{})
}

// Description returns a short name
// for the kind of data this job imports.
func (bnJobCreator) Description() string {
	const description = "bottlenecks"
	return description
}

// AutoAccept always reports false for bottleneck imports
// (presumably meaning they are not accepted
// without a review -- TODO confirm).
func (bnJobCreator) AutoAccept() bool {
	const autoAccept = false
	return autoAccept
}

// Create returns a new bottleneck import
// job with all settings at their zero values.
func (bnJobCreator) Create() Job {
	return &Bottleneck{}
}

// Depends returns two lists of relation names the bottleneck import
// depends on. The first list names the relations this import writes to,
// the second one further relations (presumably the ones which are only
// read -- TODO confirm against the users of Depends).
func (bnJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{
		"bottlenecks",
		"bottlenecks_riverbed_materials",
	}
	deps[1] = []string{
		"gauges",
		"distance_marks_virtual",
		"waterway_axis",
		"waterway_area",
	}
	return deps
}

const (
	// bnStageDoneSQL sets staging_done for all bottlenecks which are
	// tracked in import.track_imports for the import with id $1.
	bnStageDoneSQL = `
UPDATE waterway.bottlenecks SET staging_done = true
WHERE id IN (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.bottlenecks'::regclass)`
)

// StageDone moves the bottlenecks of the import with the given id
// out of the staging area by flagging them as staging_done.
// The statement is executed in the given transaction; its
// error, if any, is returned unchanged.
func (bnJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	if _, err := tx.ExecContext(ctx, bnStageDoneSQL, id); err != nil {
		return err
	}
	return nil
}

// CleanUp has nothing to tidy up after a
// bottleneck import and always returns nil.
func (*Bottleneck) CleanUp() error {
	return nil
}

// rblbRe matches two arbitrary characters, an underscore and two
// more arbitrary characters. The expression is not anchored, so the
// first such occurrence anywhere in the input is found.
var rblbRe = regexp.MustCompile(`(..)_(..)`)

// splitRBLB splits a combined value like "AT_SK" into the part before
// and the part after the underscore. Both results are nil if s
// does not contain the expected pattern.
func splitRBLB(s string) (*string, *string) {
	parts := rblbRe.FindStringSubmatch(s)
	if parts == nil {
		return nil, nil
	}
	first, second := parts[1], parts[2]
	return &first, &second
}

// Do executes the actual bottleneck import: The bottlenecks are
// requested from the configured SOAP service and stored using the
// given database connection. Progress and problems are reported
// via feedback. The result is the one of storeBottlenecks.
func (bn *Bottleneck) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// The download is deferred into a closure
	// which is invoked by storeBottlenecks.
	load := func() ([]*ifbn.BottleNeckType, error) {
		service := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil)

		result, err := service.Export_bn_by_isrs(&ifbn.Export_bn_by_isrs{})
		if err != nil {
			return nil, err
		}

		if list := result.Export_bn_by_isrsResult; list != nil {
			return list.BottleNeckType, nil
		}

		return nil, errors.New(
			"The requested service returned no bottlenecks")
	}

	return storeBottlenecks(ctx, load, importID, conn, feedback, bn.Tolerance)
}

// storeBottlenecks fetches the bottlenecks with the given fetch function
// and stores them one by one in the database.
//
// All needed statements are prepared once on conn and closed when the
// function returns. The first error of fetching, preparing or storing
// aborts the import. If not a single bottleneck was stored an
// UnchangedError is returned. On success the result is a pointer to a
// struct listing the ids of the stored bottlenecks.
func storeBottlenecks(
	ctx context.Context,
	fetch func() ([]*ifbn.BottleNeckType, error),
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
	tolerance float64,
) (interface{}, error) {
	begin := time.Now()

	bottlenecks, err := fetch()
	if err != nil {
		return nil, err
	}

	feedback.Info("Found %d bottlenecks for import", len(bottlenecks))

	var (
		has, insert, fixValidity, update      *sql.Stmt
		deleteMaterial, insertMaterial, track *sql.Stmt
	)

	// Prepare the statements in a fixed order and
	// hand each one to the variable it belongs to.
	toPrepare := []struct {
		query string
		dst   **sql.Stmt
	}{
		{hasBottleneckSQL, &has},
		{insertBottleneckSQL, &insert},
		{fixBNValiditySQL, &fixValidity},
		{updateBottleneckSQL, &update},
		{deleteBottleneckMaterialSQL, &deleteMaterial},
		{insertBottleneckMaterialSQL, &insertMaterial},
		{trackImportSQL, &track},
	}
	for i := range toPrepare {
		stmt, err := conn.PrepareContext(ctx, toPrepare[i].query)
		if err != nil {
			return nil, err
		}
		// Closed when storeBottlenecks returns.
		defer stmt.Close()
		*toPrepare[i].dst = stmt
	}

	feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)

	// Ids of the bottlenecks which were actually stored.
	var imported []string

	for _, bottleneck := range bottlenecks {
		err := storeBottleneck(
			ctx, importID, conn, feedback, bottleneck, &imported, tolerance,
			has, insert, fixValidity, update,
			deleteMaterial, insertMaterial, track)
		if err != nil {
			return nil, err
		}
	}
	if len(imported) == 0 {
		return nil, UnchangedError("No new bottlenecks inserted")
	}

	feedback.Info("Storing %d bottlenecks took %s", len(imported), time.Since(begin))
	feedback.Info("Import of bottlenecks was successful")

	return &struct {
		Bottlenecks []string `json:"bottlenecks"`
	}{
		Bottlenecks: imported,
	}, nil
}

// storeBottleneck stores a single bottleneck in a transaction of its own.
//
// The validity of the bottleneck is taken from the additional key/value
// pairs "Valid_from_date" and "Valid_to_date" (RFC 3339). Depending on
// the outcome of hasStmt either a new version is inserted or the
// existing one is updated. After that the validity of erased versions
// overlapping the new validity is adjusted, the riverbed materials are
// synchronized and the stored row is tracked for the import importID.
// Only after a successful commit the bottleneck id is appended to nids.
//
// Problems which concern only this bottleneck (missing validity
// information, errors of the has/insert/update/fix-validity statements,
// an unchanged bottleneck) are reported via feedback and make the
// function return nil, so the caller can go on with the next bottleneck.
// A non-nil error is returned if a date cannot be parsed, if the
// transaction cannot be started, rolled back or committed, or if
// removing obsolete materials or tracking the import fails.
func storeBottleneck(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
	bn *ifbn.BottleNeckType,
	nids *[]string,
	tolerance float64,
	hasStmt, insertStmt, fixValidityStmt, updateStmt,
	deleteMaterialStmt, insertMaterialStmt, trackStmt *sql.Stmt,
) error {
	feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)

	if bn.AdditionalData == nil || bn.AdditionalData.KeyValuePair == nil {
		feedback.Warn("Missing validity information")
		return nil
	}
	const (
		fromKey = "Valid_from_date"
		toKey   = "Valid_to_date"
	)
	// Collect the validity dates; all other keys are ignored.
	fromTo := make(map[string]time.Time)
	for _, kv := range bn.AdditionalData.KeyValuePair {
		k := string(kv.Key)
		if k != fromKey && k != toKey {
			continue
		}
		t, err := time.Parse(time.RFC3339, kv.Value)
		if err != nil {
			return err
		}
		fromTo[k] = t
	}

	var tfrom, tto pgtype.Timestamptz
	if t, ok := fromTo[fromKey]; ok {
		tfrom.Set(t)
	} else {
		feedback.Warn("Missing start date")
		return nil
	}
	// Without an end date the validity is open-ended.
	var uBound pgtype.BoundType
	if t, ok := fromTo[toKey]; ok {
		tto.Set(t)
		uBound = pgtype.Exclusive
	} else {
		uBound = pgtype.Unbounded
	}
	validity := pgtype.Tstzrange{
		Lower:     tfrom,
		Upper:     tto,
		LowerType: pgtype.Inclusive,
		UpperType: uBound,
		Status:    pgtype.Present,
	}

	rb, lb := splitRBLB(bn.Rb_lb)

	var revisitingTime *int
	if bn.Revisiting_time != nil {
		// Parse the trimmed value: strconv.Atoi does not accept
		// surrounding white space, so " 12 " would be rejected otherwise.
		if trimmed := strings.TrimSpace(*bn.Revisiting_time); len(trimmed) > 0 {
			i, err := strconv.Atoi(trimmed)
			if err != nil {
				feedback.Warn(
					"Cannot convert given revisiting time '%s' to number of months",
					*bn.Revisiting_time)
			} else {
				revisitingTime = &i
			}
		}
	}

	var limiting, country string

	if bn.Limiting_factor != nil {
		limiting = string(*bn.Limiting_factor)
	}

	if bn.Responsible_country != nil {
		country = string(*bn.Responsible_country)
	}

	var materials []string
	if bn.Riverbed != nil {
		for _, material := range bn.Riverbed.Material {
			if material != nil {
				materials = append(materials, string(*material))
			}
		}
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return err
	}
	// Safety net for all early returns; after a
	// successful commit this rollback has no effect.
	defer tx.Rollback()

	var isNew bool
	var nid int64
	err = tx.StmtContext(ctx, hasStmt).QueryRowContext(ctx,
		bn.Bottleneck_id,
		validity,
	).Scan(&isNew)
	switch {
	case err != nil:
		// Error texts are passed as arguments and not as format
		// strings, as they may contain '%' characters.
		feedback.Warn("%s", handleError(err).Error())
		if err2 := tx.Rollback(); err2 != nil {
			return err2
		}
		return nil
	case isNew:
		err = tx.StmtContext(ctx, insertStmt).QueryRowContext(
			ctx,
			bn.Bottleneck_id,
			&validity,
			bn.Fk_g_fid,
			bn.OBJNAM,
			bn.NOBJNM,
			bn.From_ISRS, bn.To_ISRS,
			rb,
			lb,
			country,
			revisitingTime,
			limiting,
			bn.Date_Info,
			bn.Source,
			tolerance,
		).Scan(&nid)
		if err != nil {
			feedback.Warn("%s", handleError(err).Error())
			return nil
		}
		feedback.Info("insert new version")
	case !isNew:
		// try to update
		err := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
			bn.Bottleneck_id,
			bn.Fk_g_fid,
			bn.OBJNAM,
			bn.NOBJNM,
			bn.From_ISRS, bn.To_ISRS,
			rb,
			lb,
			country,
			revisitingTime,
			limiting,
			bn.Date_Info,
			bn.Source,
			tolerance,
			&validity,
		).Scan(&nid)
		switch {
		case err == sql.ErrNoRows:
			// The UPDATE did not touch any row.
			feedback.Info("unchanged")
			if err := tx.Rollback(); err != nil {
				return err
			}
			return nil
		case err != nil:
			feedback.Warn("%s", handleError(err).Error())
			if err := tx.Rollback(); err != nil {
				return err
			}
			return nil
		default:
			feedback.Info("update")

			// Remove obsolete riverbed materials
			var pgMaterials pgtype.VarcharArray
			pgMaterials.Set(materials)
			mtls, err := tx.StmtContext(ctx,
				deleteMaterialStmt).QueryContext(ctx,
				nid,
				&pgMaterials,
			)
			if err != nil {
				return err
			}
			defer mtls.Close()
			for mtls.Next() {
				var delMat string
				if err := mtls.Scan(&delMat); err != nil {
					return err
				}
				feedback.Warn("Removed riverbed material %s", delMat)
			}
			if err := mtls.Err(); err != nil {
				return err
			}
		}
	}

	// Set end of validity of old version to start of new version
	// in case of overlap
	if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx,
		bn.Bottleneck_id,
		validity,
	); err != nil {
		feedback.Warn("%s", handleError(err).Error())
		if err2 := tx.Rollback(); err2 != nil {
			return err2
		}
		return nil
	}

	// Insert riverbed materials. A failing insert is only reported.
	// NOTE(review): PostgreSQL normally refuses further statements in a
	// transaction after a failed one, so the tracking below would fail
	// as well in that case -- confirm.
	for _, mat := range materials {
		if _, err := tx.StmtContext(ctx,
			insertMaterialStmt).ExecContext(
			ctx, nid, mat); err != nil {
			feedback.Warn("Failed to insert riverbed material '%s'", mat)
			feedback.Warn("%s", handleError(err).Error())
		}
	}

	if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
		ctx, importID, "waterway.bottlenecks", nid,
	); err != nil {
		return err
	}
	if err = tx.Commit(); err != nil {
		return err
	}
	*nids = append(*nids, bn.Bottleneck_id)
	return nil
}