view pkg/imports/fa.go @ 2248:cce158db02b0

Input area as multipolygons to generate area from stretch. Doing so is more resilient against invalid geometries and gives more plausible results if tributaries are involved (i.e. it does not include the adjacent area of the tributary in the result).
author Tom Gottfried <tom@intevation.de>
date Wed, 13 Feb 2019 16:48:52 +0100
parents 137addc77b1b
children cfc523c70e90
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/soap/ifaf"
)

// FairwayAvailability is the import job that fetches fairway availability
// data from an IFAF SOAP service and stores it in the database.
type FairwayAvailability struct {
	// URL is the address of the IFAF SOAP service to query.
	URL string `json:"url"`
	// Insecure is handed to ifaf.NewFairwayAvailabilityService together
	// with URL. Presumably, when true, the TLS certificate of the service
	// is not verified (TODO confirm in package ifaf).
	Insecure bool `json:"insecure"`
}

// uniqueFairwayAvailability identifies a stored fairway availability by its
// textual bottleneck ID together with its surdat value. It is used as a map
// key, so all of its fields must stay comparable.
//
// NOTE(review): == on time.Time also compares the Location (and monotonic
// reading), so two keys for the same instant only match when both Surdat
// values carry the same Location — confirm that values scanned from the
// database and values decoded from SOAP agree.
type uniqueFairwayAvailability struct {
	BottleneckId string    // textual bottleneck ID (waterway.bottlenecks.bottleneck_id)
	Surdat       time.Time // surdat of waterway.fairway_availability
}

// FAJobKind is the import queue type identifier of fairway availability
// imports.
const FAJobKind JobKind = "fa"

const (
	// listBottlenecksSQL selects the IDs of all bottlenecks whose responsible
	// country is the current user's country and whose staging is done.
	// NOTE(review): ORDER BY sorts by the database collation, which is not
	// necessarily Go's byte-wise string order.
	listBottlenecksSQL = `
SELECT
  bottleneck_id
FROM waterway.bottlenecks
WHERE responsible_country = users.current_user_country()
  AND staging_done = true
ORDER BY bottleneck_id
`

	// latestMeasureDateSQL selects the most recent measure_date over all
	// effective fairway availabilities (of all bottlenecks). It yields no
	// row when the table is empty.
	latestMeasureDateSQL = `
SELECT
	measure_date
FROM waterway.effective_fairway_availability
ORDER BY measure_date DESC LIMIT 1
`
	// listFairwayAvailabilitySQL lists every stored fairway availability
	// with its database id, the textual bottleneck ID and its surdat.
	listFairwayAvailabilitySQL = `
SELECT
  fa.id,
  bn.bottleneck_id,
  fa.surdat
FROM waterway.fairway_availability fa
JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id
`
	// insertFASQL inserts a fairway availability and returns its id.
	// $2 is the textual bottleneck ID, resolved to waterway.bottlenecks.id
	// by a sub-select.
	// NOTE(review): the sub-select errors if bottleneck_id is not unique in
	// waterway.bottlenecks — confirm.
	insertFASQL = `
INSERT INTO waterway.fairway_availability (
  position_code,
  bottleneck_id,
  surdat,
  critical,
  date_info,
  source_organization
) VALUES (
  $1,
  (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2),
  $3,
  $4,
  $5,
  $6
)
RETURNING id`

	// insertBnPdfsSQL stores a bottleneck PDF reference for the fairway
	// availability with id $1. Rows conflicting with the primary key are
	// silently skipped.
	insertBnPdfsSQL = `
INSERT INTO waterway.bottleneck_pdfs (
  fairway_availability_id,
  profile_pdf_filename,
  profile_pdf_url,
  pdf_generation_date,
  source_organization
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5
) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`
	// insertEFASQL stores an effective fairway availability for the fairway
	// availability with id $1. $3 is the name of a level of service, which
	// is resolved through levels_of_service. Rows conflicting with the
	// primary key are silently skipped.
	insertEFASQL = `
INSERT INTO waterway.effective_fairway_availability (
  fairway_availability_id,
  measure_date,
  level_of_service,
  available_depth_value,
  available_width_value,
  water_level_value,
  measure_type,
  source_organization,
  forecast_generation_time,
  value_lifetime
) VALUES (
  $1,
  $2,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $3),
  $4,
  $5,
  $6,
  $7,
  $8,
  $9,
  $10
) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`
	// insertFAVSQL stores a reference value for the fairway availability
	// with id $1. $2 is a level of service name resolved through
	// levels_of_service. The shallowest spot is built by ST_MakePoint($6, $7),
	// and PostGIS expects x first: $6 must be the longitude and $7 the
	// latitude. Rows conflicting with the primary key are silently skipped.
	insertFAVSQL = `
INSERT INTO waterway.fa_reference_values (
  fairway_availability_id,
  level_of_service,
  fairway_depth,
  fairway_width,
  fairway_radius,
  shallowest_spot
) VALUES (
  $1,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $2),
  $3,
  $4,
  $5,
  ST_MakePoint($6, $7)::geography
)ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING`
)

// faJobCreator describes the fairway availability import kind and creates
// its jobs. It is registered under FAJobKind via RegisterJobCreator in init.
type faJobCreator struct{}

func init() { RegisterJobCreator(FAJobKind, faJobCreator{}) }

// Description returns the human readable name of this import kind.
func (faJobCreator) Description() string { return "fairway availability" }

// Create returns a new, not yet configured fairway availability job.
func (faJobCreator) Create() Job { return &FairwayAvailability{} }

// Depends names the database tables used by this import; every one of them
// appears in the SQL statements of this file. A fresh slice is returned on
// each call.
func (faJobCreator) Depends() []string {
	tables := []string{
		"bottlenecks",
		"fairway_availability",
		"bottleneck_pdfs",
		"effective_fairway_availability",
		"fa_reference_values",
		"levels_of_service",
	}
	return tables
}

// AutoAccept returns true for this import kind (presumably meaning that its
// results need no manual review).
func (faJobCreator) AutoAccept() bool { return true }

// StageDone is meant to move imported fairway availabilities out of the
// staging area. Currently it does nothing and always returns nil.
func (faJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil }

// CleanUp is a no-op for fairway availability imports: the job keeps no
// temporary resources of its own, so it always returns nil.
func (*FairwayAvailability) CleanUp() error {
	return nil
}

// bottlenecks is a list of textual bottleneck IDs. contains uses binary
// search, so the list must be sorted in ascending byte-wise order (the
// order produced by sort.Strings).
type bottlenecks []string

// contains reports whether bn is an element of the sorted list bns.
func (bns bottlenecks) contains(bn string) bool {
	// First index whose element is not less than bn.
	pos := sort.Search(len(bns), func(i int) bool { return bns[i] >= bn })
	if pos >= len(bns) {
		return false
	}
	return bns[pos] == bn
}

// loadBottleneckCountries loads the IDs of all bottlenecks of the current
// user's country whose staging is done. The result serves as the filter of
// the SOAP request and for membership tests via bottlenecks.contains.
//
// The list is sorted with sort.Strings before it is returned. contains
// relies on sort.SearchStrings and therefore on Go's byte-wise order. The
// ORDER BY of the query, however, follows the database collation, and
// locale-aware collations can order strings differently (e.g. by ignoring
// punctuation such as '_' or '-' in a first pass). That would make the
// binary search miss bottlenecks that are in the list.
func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) {

	// Get available bottlenecks from database for use as filter in SOAP request
	rows, err := tx.QueryContext(ctx, listBottlenecksSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var bns bottlenecks

	for rows.Next() {
		var bn string
		if err := rows.Scan(&bn); err != nil {
			return nil, err
		}
		bns = append(bns, bn)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	// Establish the byte-wise order contains depends on, independent of
	// the database collation.
	sort.Strings(bns)

	return bns, nil
}

// loadFairwayAvailabilities reads every fairway availability already
// stored in the database. It returns their ids indexed by (textual
// bottleneck ID, surdat).
//
// NOTE(review): the time.Time part of the key is compared with ==, which
// includes the Location. Keys built elsewhere only hit these entries if
// they carry the same Location — confirm.
func loadFairwayAvailabilities(ctx context.Context, tx *sql.Tx) (map[uniqueFairwayAvailability]int64, error) {
	rows, err := tx.QueryContext(ctx, listFairwayAvailabilitySQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	byKey := make(map[uniqueFairwayAvailability]int64)
	for rows.Next() {
		var (
			id  int64
			key uniqueFairwayAvailability
		)
		if err := rows.Scan(&id, &key.BottleneckId, &key.Surdat); err != nil {
			return nil, err
		}
		byKey[key] = id
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return byKey, nil
}

// latestDate returns the newest measure_date stored in
// waterway.effective_fairway_availability. If that table has no rows yet,
// it falls back to a timestamp five days in the past, so that the first
// import only requests the recent days. In the fallback only the Time field
// of the result is set.
func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) {
	var newest pgtype.Timestamp
	err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&newest)
	if err == sql.ErrNoRows {
		// Fill Database with data of the last 5 days. Change this to a more useful value.
		return pgtype.Timestamp{Time: time.Now().AddDate(0, 0, -5)}, nil
	}
	if err != nil {
		return pgtype.Timestamp{}, err
	}
	return newest, nil
}

// storeFairwayAvailability runs one fairway availability import inside a
// single transaction on conn:
//
//  1. it loads the bottleneck IDs of the current user's country,
//  2. it calls fetch with them to obtain the data to import,
//  3. it stores the data via doForFAs, reusing fairway availabilities
//     that are already in the database,
//  4. it commits.
//
// If no entry was processed, the transaction is rolled back and an
// UnchangedError is returned. On success a summary holding the processed
// bottleneck IDs is returned.
func storeFairwayAvailability(
	ctx context.Context,
	conn *sql.Conn,
	feedback Feedback,
	fetch func(context.Context, *sql.Tx, bottlenecks) ([]*ifaf.FairwayAvailability, error),
) (interface{}, error) {

	began := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Undoes everything on early return; a no-op after a successful Commit.
	defer tx.Rollback()

	var (
		bns      bottlenecks
		fetched  []*ifaf.FairwayAvailability
		existing map[uniqueFairwayAvailability]int64
	)
	if bns, err = loadBottleneckCountries(ctx, tx); err != nil {
		return nil, err
	}
	if fetched, err = fetch(ctx, tx, bns); err != nil {
		return nil, err
	}
	if existing, err = loadFairwayAvailabilities(ctx, tx); err != nil {
		return nil, err
	}

	processed, err := doForFAs(ctx, bns, existing, fetched, tx, feedback)
	if err != nil {
		return nil, fmt.Errorf("Error processing data: %v", err)
	}
	if len(processed) == 0 {
		feedback.Info("No new fairway availablity data found")
		return nil, UnchangedError("No new fairway availablity data found")
	}
	feedback.Info("Processed %d fairway availabilities", len(processed))

	if err := tx.Commit(); err != nil {
		feedback.Info(
			"Importing fairway availabilities failed after %s", time.Since(began))
		return nil, err
	}
	feedback.Info(
		"Importing fairway availabilities successfully took %s", time.Since(began))

	// TODO: needs to be filled more useful.
	return &struct {
		FairwayAvailabilities []string `json:"fairwayAvailabilities"`
	}{
		FairwayAvailabilities: processed,
	}, nil
}

// doForFAs writes the fetched fairway availabilities fas to the database
// within tx and reports progress via feedback.
//
// For every entry:
//   - entries whose bottleneck is not in bnIds are skipped with a warning;
//   - the fairway availability row is looked up in fairwayAvailabilities by
//     (bottleneck ID, surdat) and only inserted when missing; the map is
//     updated with the new id so later entries with the same key reuse it;
//   - the attached bottleneck PDFs, effective fairway availabilities and
//     reference values are inserted. Their statements use
//     ON CONFLICT ... DO NOTHING, so the logged counts only include rows
//     that were actually added (when the driver can report it).
//
// It returns the bottleneck ID of every processed entry, one element per
// entry, so IDs may repeat.
func doForFAs(
	ctx context.Context,
	bnIds bottlenecks,
	fairwayAvailabilities map[uniqueFairwayAvailability]int64,
	fas []*ifaf.FairwayAvailability,
	tx *sql.Tx,
	feedback Feedback,
) ([]string, error) {

	insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL)
	if err != nil {
		return nil, err
	}
	defer insertFAStmt.Close()
	insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL)
	if err != nil {
		return nil, err
	}
	defer insertBnPdfsStmt.Close()
	insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL)
	if err != nil {
		return nil, err
	}
	defer insertEFAStmt.Close()
	insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL)
	if err != nil {
		return nil, err
	}
	defer insertFAVStmt.Close()

	// countAffected returns the number of rows res reports as affected.
	// If the driver cannot tell, the statement counts as one row.
	countAffected := func(res sql.Result) int {
		n, rerr := res.RowsAffected()
		if rerr != nil {
			return 1
		}
		return int(n)
	}

	var faIDs []string
	feedback.Info("Found %d fairway availabilities", len(fas))
	for _, faRes := range fas {
		if !bnIds.contains(faRes.Bottleneck_id) {
			feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id)
			continue
		}
		uniqueFa := uniqueFairwayAvailability{
			BottleneckId: faRes.Bottleneck_id,
			Surdat:       faRes.SURDAT,
		}
		faID, found := fairwayAvailabilities[uniqueFa]
		if !found {
			err := insertFAStmt.QueryRowContext(
				ctx,
				faRes.POSITION,
				faRes.Bottleneck_id,
				faRes.SURDAT,
				faRes.Critical,
				faRes.Date_Info,
				faRes.Source,
			).Scan(&faID)
			if err != nil {
				return nil, err
			}
			fairwayAvailabilities[uniqueFa] = faID
		}
		feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
		faIDs = append(faIDs, faRes.Bottleneck_id)

		if faRes.Bottleneck_PDFs != nil {
			bnPdfCount := 0
			for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo {
				res, err := insertBnPdfsStmt.ExecContext(
					ctx,
					faID,
					bnPdfs.ProfilePdfFilename,
					bnPdfs.ProfilePdfURL,
					bnPdfs.PDF_Generation_Date,
					bnPdfs.Source,
				)
				if err != nil {
					return nil, err
				}
				bnPdfCount += countAffected(res)
			}
			feedback.Info("Add %d Pdfs", bnPdfCount)
		}

		if faRes.Effective_fairway_availability != nil {
			efaCount := 0
			for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability {
				// Pass NULL for a missing level of service instead of
				// dereferencing a nil pointer, which would panic and abort
				// the whole import.
				var los interface{}
				if efa.Level_of_Service != nil {
					los = string(*efa.Level_of_Service)
				}
				// An absent forecast generation time is stored as NULL.
				fgt := efa.Forecast_generation_time
				if fgt.Status == pgtype.Undefined {
					fgt = pgtype.Timestamp{
						Status: pgtype.Null,
					}
				}
				res, err := insertEFAStmt.ExecContext(
					ctx,
					faID,
					efa.Measure_date,
					los,
					efa.Available_depth_value,
					efa.Available_width_value,
					efa.Water_level_value,
					efa.Measure_type,
					efa.Source,
					fgt.Get(),
					efa.Value_lifetime,
				)
				if err != nil {
					return nil, err
				}
				efaCount += countAffected(res)
			}
			feedback.Info("Add %d Effective Fairway Availability", efaCount)
		}

		if faRes.Reference_values != nil {
			rvCount := 0
			for _, fav := range faRes.Reference_values.ReferenceValue {
				res, err := insertFAVStmt.ExecContext(
					ctx,
					faID,
					fav.Level_of_Service,
					fav.Fairway_depth,
					fav.Fairway_width,
					fav.Fairway_radius,
					// The shallowest spot is built with ST_MakePoint($6, $7),
					// which takes x (longitude) before y (latitude).
					fav.Shallowest_spot_Lon,
					fav.Shallowest_spot_Lat,
				)
				if err != nil {
					return nil, err
				}
				rvCount += countAffected(res)
			}
			feedback.Info("Add %d Reference Values", rvCount)
		}
	}
	return faIDs, nil
}

// Do executes the fairway availability import. It asks the IFAF SOAP
// service at fa.URL for the fairway availabilities of the bottlenecks of
// the current user's country, covering the period from the latest stored
// measure date until now, and stores the answer via
// storeFairwayAvailability. importID is not used by this import.
func (fa *FairwayAvailability) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// fetch is invoked by storeFairwayAvailability inside its transaction
	// with the bottleneck IDs to request.
	fetch := func(
		ctx context.Context,
		tx *sql.Tx,
		bns bottlenecks,
	) ([]*ifaf.FairwayAvailability, error) {

		since, err := latestDate(ctx, tx)
		if err != nil {
			return nil, err
		}

		client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)

		req := &ifaf.Get_bottleneck_fa{
			Bottleneck_id: &ifaf.ArrayOfString{String: bns},
			Period: &ifaf.RequestedPeriod{
				Date_start: since.Time,
				Date_end:   time.Now(),
			},
		}

		resp, err := client.Get_bottleneck_fa(req)
		if err != nil {
			return nil, err
		}

		result := resp.Get_bottleneck_faResult
		if result == nil {
			return nil, errors.New("no fairway availabilities found")
		}
		return result.FairwayAvailability, nil
	}

	return storeFairwayAvailability(ctx, conn, feedback, fetch)
}