view pkg/imports/fa.go @ 3681:c9e1848a516a

Handle violations of responsibility areas gracefully for the BN import. Like many others, this is an error that should only affect the specific failing bottleneck and not the whole import.
author Sascha Wilde <wilde@intevation.de>
date Tue, 18 Jun 2019 12:43:01 +0200
parents db87f34805fb
children 1eb39e9e8ec2 7fb75deff16b
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/soap/ifaf"
)

// FairwayAvailability is the configuration of an import job which
// fetches fairway availability data from an IFAF SOAP service and
// stores it in the database.
type FairwayAvailability struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure is handed to the SOAP client together with URL.
	// It indicates whether HTTPS traffic should skip the validation
	// of certificates.
	Insecure bool `json:"insecure"`
}

// uniqueFairwayAvailability identifies a fairway availability by the
// external ID of its bottleneck and its SURDAT time stamp. It serves as
// the key of the map from already stored fairway availabilities to
// their database IDs.
//
// NOTE(review): as a map key the time.Time value is compared with ==,
// which also takes the location into account. Confirm that the values
// scanned from the database and the ones received via SOAP are
// comparable this way.
type uniqueFairwayAvailability struct {
	// BottleneckId is the external bottleneck_id (not the database ID).
	BottleneckId string
	// Surdat is the SURDAT value of the fairway availability
	// (presumably the survey date; confirm against the IFAF schema).
	Surdat       time.Time
}

// FAJobKind is the import queue type identifier
// of fairway availability imports.
const FAJobKind JobKind = "fa"

const (
	// listBottlenecksSQL lists the distinct external IDs of all
	// bottlenecks which have left staging and belong to the country
	// of the current user.
	listBottlenecksSQL = `
SELECT DISTINCT
  bottleneck_id
FROM waterway.bottlenecks
WHERE responsible_country = users.current_user_country()
  AND staging_done = true
ORDER BY bottleneck_id
`

	// latestMeasureDateSQL fetches the most recent measure date of all
	// stored effective fairway availabilities. It yields no row
	// if the table is empty.
	latestMeasureDateSQL = `
SELECT
	measure_date
FROM waterway.effective_fairway_availability
ORDER BY measure_date DESC LIMIT 1
`
	// listFairwayAvailabilitySQL lists the database ID, the external
	// bottleneck ID and the surdat of every stored fairway availability.
	listFairwayAvailabilitySQL = `
SELECT
  fa.id,
  bn.bottleneck_id,
  fa.surdat
FROM waterway.fairway_availability fa
JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id
`
	// insertFASQL inserts a fairway availability and returns its new
	// database ID. $2 is the external bottleneck ID which is resolved
	// to the bottleneck version with the highest validity.
	insertFASQL = `
INSERT INTO waterway.fairway_availability (
  position_code,
  bottleneck_id,
  surdat,
  critical,
  date_info,
  source_organization
) VALUES (
  $1,
  -- Always associate fairway availability data to newest bottleneck
  -- version to prevent problems in analysis over longer time periods
  (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2
     ORDER BY validity DESC FETCH FIRST ROW ONLY),
  $3,
  $4,
  $5,
  $6
)
RETURNING id`

	// insertBnPdfsSQL inserts a PDF reference for the fairway
	// availability $1. Rows violating the constraint
	// bottleneck_pdfs_pkey are silently skipped.
	insertBnPdfsSQL = `
INSERT INTO waterway.bottleneck_pdfs (
  fairway_availability_id,
  profile_pdf_filename,
  profile_pdf_url,
  pdf_generation_date,
  source_organization
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5
) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`
	// insertEFASQL inserts an effective fairway availability for the
	// fairway availability $1. $3 is the name of the level of service
	// which is resolved via the table levels_of_service. Rows violating
	// the constraint effective_fairway_availability_pkey are silently
	// skipped.
	insertEFASQL = `
INSERT INTO waterway.effective_fairway_availability (
  fairway_availability_id,
  measure_date,
  level_of_service,
  available_depth_value,
  available_width_value,
  water_level_value,
  measure_type,
  source_organization,
  forecast_generation_time,
  value_lifetime
) VALUES (
  $1,
  $2,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $3),
  $4,
  $5,
  $6,
  $7,
  $8,
  $9,
  $10
) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`
	// insertFAVSQL inserts a reference value for the fairway
	// availability $1. $2 is the name of the level of service which is
	// resolved via the table levels_of_service. $6 and $7 are handed to
	// ST_MakePoint as x and y in this order; PostGIS expects the
	// longitude as x and the latitude as y. Rows violating the
	// constraint fa_reference_values_pkey are silently skipped.
	insertFAVSQL = `
INSERT INTO waterway.fa_reference_values (
  fairway_availability_id,
  level_of_service,
  fairway_depth,
  fairway_width,
  fairway_radius,
  shallowest_spot
) VALUES (
  $1,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $2),
  $3,
  $4,
  $5,
  ST_MakePoint($6, $7)::geography
)ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING`
)

// faJobCreator is the stateless job creator
// registered for fairway availability imports.
type faJobCreator struct{}

// init registers the fairway availability job creator
// under the job kind FAJobKind.
func init() {
	RegisterJobCreator(FAJobKind, faJobCreator{})
}

// Description returns the human readable name
// of this kind of import: "fairway availability".
func (faJobCreator) Description() string {
	return "fairway availability"
}

// Create returns a new, zero valued fairway availability import job.
func (faJobCreator) Create() Job {
	return &FairwayAvailability{}
}

// Depends returns two lists of table names. The first one names the
// tables the SQL statements of this import insert into, the second one
// the tables they only read from.
func (faJobCreator) Depends() [2][]string {
	var tables [2][]string
	tables[0] = []string{
		"effective_fairway_availability",
		"fa_reference_values",
		"bottleneck_pdfs",
		"fairway_availability",
	}
	tables[1] = []string{
		"bottlenecks",
		"levels_of_service",
	}
	return tables
}

// AutoAccept always reports true for fairway availability imports.
// NOTE(review): presumably this means that the imported data needs no
// manual review; confirm against the job creator interface.
func (faJobCreator) AutoAccept() bool { return true }

// StageDone is meant to move the imported fairway availabilities
// out of the staging area. Currently it does nothing, ignores all of
// its arguments and always returns nil.
func (faJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp of a fairway availabilities import is a NOP.
// It always returns nil.
func (*FairwayAvailability) CleanUp() error { return nil }

// bottlenecks is a list of external bottleneck IDs. To be usable with
// contains it has to be sorted in ascending order.
type bottlenecks []string

// contains reports whether bn is an element of bns. It performs a
// binary search and therefore relies on bns being sorted in ascending
// order according to Go's string comparison.
func (bns bottlenecks) contains(bn string) bool {
	pos := sort.Search(len(bns), func(i int) bool { return bns[i] >= bn })
	if pos == len(bns) {
		return false
	}
	return bns[pos] == bn
}

// loadBottleneckCountries loads the external IDs of the bottlenecks
// selected by listBottlenecksSQL, i.e. the ones which have left staging
// and belong to the country of the current user.
//
// The returned list is sorted in ascending order according to Go's
// string comparison, so it can be used with bottlenecks.contains.
// Any error of the query or of scanning its rows is returned as is.
func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) {

	// Get available bottlenecks from database for use as filter in SOAP request
	rows, err := tx.QueryContext(ctx, listBottlenecksSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var bns bottlenecks

	for rows.Next() {
		var bn string
		if err = rows.Scan(&bn); err != nil {
			return nil, err
		}
		bns = append(bns, bn)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}

	// The ORDER BY of the query follows the collation of the database,
	// which does not need to agree with the byte-wise order the binary
	// search in contains is based on. Sort again to be on the safe side.
	sort.Strings(bns)

	return bns, nil
}

// loadFairwayAvailabilities loads all fairway availabilities listed by
// listFairwayAvailabilitySQL and returns a map from their external
// bottleneck ID and surdat to their database ID. If more than one row
// shares the same key the ID of the last one wins.
// Any error of the query or of scanning its rows is returned as is.
func loadFairwayAvailabilities(ctx context.Context, tx *sql.Tx) (map[uniqueFairwayAvailability]int64, error) {
	rows, err := tx.QueryContext(ctx, listFairwayAvailabilitySQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	known := make(map[uniqueFairwayAvailability]int64)
	for rows.Next() {
		var (
			dbID int64
			key  uniqueFairwayAvailability
		)
		// Scan bottleneck ID and surdat straight into the map key.
		if err := rows.Scan(&dbID, &key.BottleneckId, &key.Surdat); err != nil {
			return nil, err
		}
		known[key] = dbID
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}
	return known, nil
}

// latestDate returns the result of latestMeasureDateSQL, the most
// recent measure date of the stored effective fairway availabilities.
// If the query yields no row a time stamp five days before now is
// returned, of which only the Time field is set.
// Any other error is returned together with a zero time stamp.
func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) {
	var newest pgtype.Timestamp

	err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&newest)
	if err == nil {
		return newest, nil
	}
	if err != sql.ErrNoRows {
		return pgtype.Timestamp{}, err
	}

	// Nothing stored yet: fill the database with the data of the
	// last 5 days. Change this to a more useful value.
	return pgtype.Timestamp{Time: time.Now().AddDate(0, 0, -5)}, nil
}

// storeFairwayAvailability runs a complete fairway availability import
// inside a single transaction on conn.
//
// It loads the bottlenecks of the current user's country, hands them
// together with the transaction to fetch to receive the fairway
// availabilities to import, loads the already stored fairway
// availabilities and stores the received data with doForFAs.
//
// If nothing was processed an UnchangedError is returned and the
// transaction is rolled back. Otherwise the transaction is committed
// and a summary listing the processed bottleneck IDs is returned.
// Progress is reported via feedback.
func storeFairwayAvailability(
	ctx context.Context,
	conn *sql.Conn,
	feedback Feedback,
	fetch func(context.Context, *sql.Tx, bottlenecks) ([]*ifaf.FairwayAvailability, error),
) (interface{}, error) {

	began := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Roll back on every return path; the result is deliberately ignored.
	defer tx.Rollback()

	ownBottlenecks, err := loadBottleneckCountries(ctx, tx)
	if err != nil {
		return nil, err
	}

	received, err := fetch(ctx, tx, ownBottlenecks)
	if err != nil {
		return nil, err
	}

	stored, err := loadFairwayAvailabilities(ctx, tx)
	if err != nil {
		return nil, err
	}

	processed, err := doForFAs(ctx, ownBottlenecks, stored, received, tx, feedback)
	if err != nil {
		return nil, fmt.Errorf("Error processing data: %v", err)
	}

	if len(processed) == 0 {
		const nothingNew = "No new fairway availablity data found"
		feedback.Info(nothingNew)
		return nil, UnchangedError(nothingNew)
	}
	feedback.Info("Processed %d fairway availabilities", len(processed))

	if err := tx.Commit(); err != nil {
		feedback.Info(
			"Importing fairway availabilities failed after %s", time.Since(began))
		return nil, err
	}
	feedback.Info(
		"Importing fairway availabilities successfully took %s", time.Since(began))

	// TODO: The summary needs to be filled with more useful content.
	return &struct {
		FairwayAvailabilities []string `json:"fairwayAvailabilities"`
	}{
		FairwayAvailabilities: processed,
	}, nil
}

// doForFAs stores the fairway availabilities fas inside of the
// transaction tx.
//
// Entries of bottlenecks which are not contained in bnIds are skipped
// with a warning. For all others the database ID of the fairway
// availability is looked up in fairwayAvailabilities. If it is not
// known yet a new fairway availability is inserted and its ID is added
// to the map. Afterwards the PDFs, the effective fairway availabilities
// and the reference values of the entry are inserted. Rows which
// already exist are skipped by the insert statements. Effective
// fairway availabilities without a level of service are skipped with
// a warning.
//
// It returns the bottleneck IDs of the processed entries, one per
// entry, so IDs may show up more than once. The first failing
// database operation aborts the processing and its error is returned.
func doForFAs(
	ctx context.Context,
	bnIds bottlenecks,
	fairwayAvailabilities map[uniqueFairwayAvailability]int64,
	fas []*ifaf.FairwayAvailability,
	tx *sql.Tx,
	feedback Feedback,
) ([]string, error) {

	insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL)
	if err != nil {
		return nil, err
	}
	defer insertFAStmt.Close()
	insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL)
	if err != nil {
		return nil, err
	}
	defer insertBnPdfsStmt.Close()
	insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL)
	if err != nil {
		return nil, err
	}
	defer insertEFAStmt.Close()
	insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL)
	if err != nil {
		return nil, err
	}
	defer insertFAVStmt.Close()

	var faIDs []string
	var faID int64
	feedback.Info("Found %d fairway availabilities", len(fas))
	for _, faRes := range fas {
		if !bnIds.contains(faRes.Bottleneck_id) {
			feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id)
			continue
		}
		uniqueFa := uniqueFairwayAvailability{
			BottleneckId: faRes.Bottleneck_id,
			Surdat:       faRes.SURDAT,
		}
		var found bool
		if faID, found = fairwayAvailabilities[uniqueFa]; !found {
			// Not stored yet: insert it and remember the new ID, so
			// further entries with the same key re-use it.
			err = insertFAStmt.QueryRowContext(
				ctx,
				faRes.POSITION,
				faRes.Bottleneck_id,
				faRes.SURDAT,
				faRes.Critical,
				faRes.Date_Info,
				faRes.Source,
			).Scan(&faID)
			if err != nil {
				return nil, err
			}
			fairwayAvailabilities[uniqueFa] = faID
		}
		feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
		faIDs = append(faIDs, faRes.Bottleneck_id)
		if faRes.Bottleneck_PDFs != nil {
			bnPdfCount := 0
			for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo {
				res, err := insertBnPdfsStmt.ExecContext(
					ctx,
					faID,
					bnPdfs.ProfilePdfFilename,
					bnPdfs.ProfilePdfURL,
					bnPdfs.PDF_Generation_Date,
					bnPdfs.Source,
				)
				if err != nil {
					return nil, err
				}
				// If the number of affected rows is not available
				// the row is counted as added.
				affected, err := res.RowsAffected()
				if err == nil {
					bnPdfCount += int(affected)
				} else {
					bnPdfCount++
				}
			}
			feedback.Info("Add %d Pdfs", bnPdfCount)
		}
		if faRes.Effective_fairway_availability != nil {
			efaCount := 0
			for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability {
				los := efa.Level_of_Service
				if los == nil {
					// Dereferencing a missing level of service would
					// panic. Only skip the affected entry.
					feedback.Warn(
						"Effective fairway availability of bottleneck %s "+
							"has no level of service. Skipped.",
						faRes.Bottleneck_id)
					continue
				}
				// An undefined forecast generation time
				// is stored as NULL.
				fgt := efa.Forecast_generation_time
				if efa.Forecast_generation_time.Status == pgtype.Undefined {
					fgt = pgtype.Timestamp{
						Status: pgtype.Null,
					}
				}
				res, err := insertEFAStmt.ExecContext(
					ctx,
					faID,
					efa.Measure_date,
					string(*los),
					efa.Available_depth_value,
					efa.Available_width_value,
					efa.Water_level_value,
					efa.Measure_type,
					efa.Source,
					fgt.Get(),
					efa.Value_lifetime,
				)
				if err != nil {
					return nil, err
				}
				// If the number of affected rows is not available
				// the row is counted as added.
				affected, err := res.RowsAffected()
				if err == nil {
					efaCount += int(affected)
				} else {
					efaCount++
				}
			}
			feedback.Info("Add %d Effective Fairway Availability", efaCount)
		}

		if faRes.Reference_values != nil {
			rvCount := 0
			for _, fav := range faRes.Reference_values.ReferenceValue {
				// The statement builds the shallowest spot with
				// ST_MakePoint($6, $7), which expects x (longitude)
				// first and y (latitude) second.
				res, err := insertFAVStmt.ExecContext(
					ctx,
					faID,
					fav.Level_of_Service,
					fav.Fairway_depth,
					fav.Fairway_width,
					fav.Fairway_radius,
					fav.Shallowest_spot_Lon,
					fav.Shallowest_spot_Lat,
				)
				if err != nil {
					return nil, err
				}
				// If the number of affected rows is not available
				// the row is counted as added.
				affected, err := res.RowsAffected()
				if err == nil {
					rvCount += int(affected)
				} else {
					rvCount++
				}
			}
			feedback.Info("Add %d Reference Values", rvCount)
		}
	}
	return faIDs, nil
}

// Do executes the actual fairway availability import.
//
// It requests the fairway availabilities of the bottlenecks of the
// current user's country from the SOAP service at fa.URL. The requested
// period starts at the date delivered by latestDate and ends now. The
// received data is stored with storeFairwayAvailability, whose result
// is returned. A response without a result is treated as an error.
// importID is not used. ctx is not handed to the SOAP client.
func (fa *FairwayAvailability) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// fromService fetches the data from the SOAP service.
	fromService := func(
		ctx context.Context,
		tx *sql.Tx, bns bottlenecks,
	) ([]*ifaf.FairwayAvailability, error) {

		since, err := latestDate(ctx, tx)
		if err != nil {
			return nil, err
		}

		service := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)

		request := &ifaf.Get_bottleneck_fa{
			Bottleneck_id: &ifaf.ArrayOfString{String: bns},
			Period: &ifaf.RequestedPeriod{
				Date_start: since.Time,
				Date_end:   time.Now(),
			},
		}

		response, err := service.Get_bottleneck_fa(request)
		if err != nil {
			return nil, err
		}

		if result := response.Get_bottleneck_faResult; result != nil {
			return result.FairwayAvailability, nil
		}
		return nil, errors.New("no fairway availabilities found")
	}

	return storeFairwayAvailability(ctx, conn, feedback, fromService)
}