view pkg/imports/bn.go @ 1902:c4af342be999

Use waterway area as basis for bottleneck area generation.
author Sascha Wilde <wilde@intevation.de>
date Fri, 18 Jan 2019 16:11:08 +0100
parents 807569b08513
children 32c56e6c089a
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"regexp"
	"strconv"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/soap/ifbn"
)

// Bottleneck is an import job to import
// bottlenecks from an IFBN SOAP service.
type Bottleneck struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// BNJobKind is the import queue type identifier.
const BNJobKind JobKind = "bn"

const (
	hasBottleneckSQL = `
SELECT true FROM waterway.bottlenecks WHERE bottleneck_id = $1`

	insertBottleneckSQL = `
INSERT INTO waterway.bottlenecks (
  bottleneck_id,
  fk_g_fid,
  objnam,
  nobjnm,
  stretch,
  area,
  rb,
  lb,
  responsible_country,
  revisiting_time,
  limiting,
  date_info,
  source_organization
) VALUES(
  $1,
  isrs_fromText($2),
  $3,
  $4,
  isrsrange(isrs_fromText($5), isrs_fromText($6)),
  ISRSrange_area(
    isrsrange(isrs_fromText($5), isrs_fromText($6)),
    (SELECT ST_Union(CAST(area AS geometry))
        FROM waterway.waterway_area)),
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13
)
RETURNING id`
)

type bnJobCreator struct{}

func init() {
	RegisterJobCreator(BNJobKind, bnJobCreator{})
}

func (bnJobCreator) Description() string { return "bottlenecks" }

func (bnJobCreator) AutoAccept() bool { return false }

func (bnJobCreator) Create(_ JobKind, data string) (Job, error) {
	bn := new(Bottleneck)
	if err := common.FromJSONString(data, bn); err != nil {
		return nil, err
	}
	return bn, nil
}

func (bnJobCreator) Depends() []string {
	return []string{
		"gauges",
		"bottlenecks",
	}
}

const (
	bnStageDoneSQL = `
UPDATE waterway.bottlenecks SET staging_done = true
WHERE id IN (
  SELECT key from waterway.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.bottlenecks'::regclass)`
)

// StageDone moves the imported bottleneck out of the staging area.
func (bnJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	_, err := tx.ExecContext(ctx, bnStageDoneSQL, id)
	return err
}

// CleanUp of a bottleneck import is a NOP.
func (*Bottleneck) CleanUp() error { return nil }

var rblbRe = regexp.MustCompile(`(..)_(..)`)

func splitRBLB(s string) (string, string) {
	m := rblbRe.FindStringSubmatch(s)
	if len(m) == 0 {
		return "", ""
	}
	return m[1], m[2]
}

func revisitingTime(s string) int {
	v, err := strconv.Atoi(s)
	if err != nil {
		v = 0
	}
	return v
}

// Do executes the actual bottleneck import.
func (bn *Bottleneck) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {
	client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil)

	req := &ifbn.Export_bn_by_isrs{}

	resp, err := client.Export_bn_by_isrs(req)
	if err != nil {
		feedback.Error("%v", err)
		return nil, err
	}

	if resp.Export_bn_by_isrsResult == nil {
		err := errors.New("no Bottlenecks found")
		feedback.Error("%v", err)
		return nil, err
	}

	bns := resp.Export_bn_by_isrsResult.BottleNeckType
	feedback.Info("Found %d bottlenecks for import", len(bns))

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	var hasStmt, insertStmt, trackStmt *sql.Stmt

	for _, x := range []struct {
		sql  string
		stmt **sql.Stmt
	}{
		{hasBottleneckSQL, &hasStmt},
		{insertBottleneckSQL, &insertStmt},
		{trackImportSQL, &trackStmt},
	} {
		var err error
		if *x.stmt, err = tx.PrepareContext(ctx, x.sql); err != nil {
			return nil, err
		}
		defer (*x.stmt).Close()
	}

	var nids []string

	start := time.Now()

nextBN:
	for _, bn := range bns {

		var found bool
		err := hasStmt.QueryRowContext(ctx, bn.Bottleneck_id).Scan(&found)
		switch {
		case err == sql.ErrNoRows:
			// This is good.
		case err != nil:
			return nil, err
		case found:
			// TODO: Deep comparison database vs. SOAP.
			continue nextBN
		}

		rb, lb := splitRBLB(bn.Rb_lb)

		var limiting, country string

		if bn.Limiting_factor != nil {
			limiting = string(*bn.Limiting_factor)
		}

		if bn.Responsible_country != nil {
			country = string(*bn.Responsible_country)
		}

		var nid int64

		err = insertStmt.QueryRowContext(
			ctx,
			bn.Bottleneck_id,
			bn.Fk_g_fid,
			bn.OBJNAM,
			bn.NOBJNM,
			bn.From_ISRS, bn.To_ISRS,
			rb,
			lb,
			country,
			revisitingTime(bn.Revisiting_time),
			limiting,
			bn.Date_Info,
			bn.Source,
		).Scan(&nid)
		if err != nil {
			return nil, err
		}
		nids = append(nids, bn.Bottleneck_id)
		if _, err := trackStmt.ExecContext(
			ctx, importID, "waterway.bottlenecks", nid,
		); err != nil {
			return nil, err
		}
		feedback.Info("Inserted '%s' into database", bn.OBJNAM)
	}
	if len(nids) == 0 {
		feedback.Error("No new bottlenecks found")
		return nil, errors.New("No new bottlenecks found")
	}

	feedback.Info("Storing %d bottlenecks took %s", len(nids), time.Since(start))
	if err = tx.Commit(); err == nil {
		feedback.Info("Import of bottlenecks was successful")
	}

	summary := struct {
		Bottlenecks []string `json:"bottlenecks"`
	}{
		Bottlenecks: nids,
	}
	return &summary, err
}