view pkg/imports/fa.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in the SR export to extract information that was in the meta JSON but is now only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100
parents 4bc14bea3fc9
children 2dd155cc95ec
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018,2019,2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/soap/ifaf"
)

// FairwayAvailability is the import job that fetches fairway availability
// data from an IFAF SOAP service and stores it in the database.
type FairwayAvailability struct {
	// URL is the endpoint of the IFAF SOAP service to query.
	URL string `json:"url"`
	// Insecure is handed to the SOAP client and controls whether the
	// certificates of HTTPS connections get validated
	// (presumably true disables the validation — confirm in ifaf).
	Insecure bool `json:"insecure"`
}

const (
	// FAJobKind identifies fairway availability imports in the import queue.
	FAJobKind JobKind = "fa"

	// maxHistoryDays bounds how far back in time this import looks: only
	// bottlenecks valid within that many days are queried, and data is
	// never requested for a start date further in the past.
	maxHistoryDays = 7
)

// SQL statements used by the fairway availability import.
const (
	// listBottlenecksSQL lists the distinct ids of the bottlenecks that the
	// country of the current database user is responsible for, which have
	// finished staging and whose validity overlaps the range passed as $1.
	listBottlenecksSQL = `
SELECT DISTINCT
  bottleneck_id
FROM waterway.bottlenecks
WHERE responsible_country = (
    SELECT country FROM users.list_users WHERE username = current_user)
  AND staging_done = true
  AND validity && $1
`

	// latestMeasureDateSQL yields, for the bottleneck ids in $1, the earliest
	// of the per-bottleneck latest effective measure_dates. Because the CASE
	// has no ELSE branch, it yields NULL when not every id in $1 occurs in
	// waterway.fairway_availability.
	// NOTE(review): the array_agg subselect aggregates over the whole
	// fairway_availability table on every call.
	latestMeasureDateSQL = `
SELECT CASE WHEN (SELECT array_agg(DISTINCT bottleneck_id) @> $1::varchar[]
                    FROM waterway.fairway_availability)
       THEN (SELECT min(x.m) FROM
              (SELECT max(efa.measure_date) AS m
                FROM waterway.fairway_availability fa,
                     waterway.effective_fairway_availability efa
                WHERE fa.bottleneck_id = ANY($1)
                  AND efa.fairway_availability_id = fa.id
                GROUP BY fa.bottleneck_id) AS x)
       END
`

	// insertFASQL upserts a fairway availability identified by
	// (bottleneck_id, surdat) and returns its id. On conflict, the position,
	// critical, date_info and source_organization columns are overwritten.
	insertFASQL = `
INSERT INTO waterway.fairway_availability (
  position,
  bottleneck_id,
  surdat,
  critical,
  date_info,
  source_organization
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5,
  $6
) ON CONFLICT (bottleneck_id, surdat) DO UPDATE SET
  position = EXCLUDED.position,
  critical = EXCLUDED.critical,
  date_info = EXCLUDED.date_info,
  source_organization = EXCLUDED.source_organization
RETURNING id`

	// insertBnPdfsSQL inserts a bottleneck profile PDF reference for the
	// fairway availability $1. Rows conflicting with bottleneck_pdfs_pkey
	// are skipped.
	insertBnPdfsSQL = `
INSERT INTO waterway.bottleneck_pdfs (
  fairway_availability_id,
  profile_pdf_filename,
  profile_pdf_url,
  pdf_generation_date,
  source_organization
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5
) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`

	// insertEFASQL inserts an effective fairway availability for the
	// fairway availability $1. The level of service is looked up by its
	// name ($3) in levels_of_service. Rows conflicting with the primary
	// key are skipped.
	insertEFASQL = `
INSERT INTO waterway.effective_fairway_availability (
  fairway_availability_id,
  measure_date,
  level_of_service,
  available_depth_value,
  available_width_value,
  water_level_value,
  measure_type,
  source_organization,
  forecast_generation_time,
  value_lifetime
) VALUES (
  $1,
  $2,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $3),
  $4,
  $5,
  $6,
  $7,
  $8,
  $9,
  $10
) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`

	// insertFAVSQL inserts a reference value for the fairway availability
	// $1, looking up the level of service by its name ($2). The shallowest
	// spot is built as ST_MakePoint($6, $7)::geography. Rows conflicting
	// with fa_reference_values_pkey are skipped.
	// NOTE(review): ST_MakePoint takes (x, y), i.e. (longitude, latitude)
	// for geography, but doForFAs binds Shallowest_spot_Lat to $6 and
	// Shallowest_spot_Lon to $7 — verify the coordinate order.
	insertFAVSQL = `
INSERT INTO waterway.fa_reference_values (
  fairway_availability_id,
  level_of_service,
  fairway_depth,
  fairway_width,
  fairway_radius,
  shallowest_spot
) VALUES (
  $1,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $2),
  $3,
  $4,
  $5,
  ST_MakePoint($6, $7)::geography
)ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING`
)

// Description summarizes this import by the URL of the SOAP service it
// queries. The argument is ignored and no error is ever returned.
func (fa *FairwayAvailability) Description(_ []string) (string, error) {
	url := fa.URL
	return url, nil
}

// faJobCreator is registered for FAJobKind and creates
// FairwayAvailability import jobs.
type faJobCreator struct{}

// init registers the fairway availability import under FAJobKind.
func init() {
	RegisterJobCreator(FAJobKind, faJobCreator{})
}

// Description returns a human readable name for this kind of import.
func (faJobCreator) Description() string {
	return "fairway availability"
}

// Create returns a new, zero-valued FairwayAvailability job.
func (faJobCreator) Create() Job {
	return &FairwayAvailability{}
}

// Depends returns two table lists: first the tables this import inserts
// into, then the tables it only reads from.
// NOTE(review): how the import framework uses the two lists is not
// visible here.
func (faJobCreator) Depends() [2][]string {
	inserted := []string{
		"effective_fairway_availability",
		"fa_reference_values",
		"bottleneck_pdfs",
		"fairway_availability",
	}
	read := []string{"bottlenecks", "levels_of_service"}
	return [2][]string{inserted, read}
}

// AutoAccept always returns true for fairway availability imports
// (presumably meaning they are accepted without manual review).
func (faJobCreator) AutoAccept() bool {
	return true
}

// StageDone does nothing for fairway availability imports.
func (faJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
	return nil
}

// CleanUp does nothing for a fairway availability import.
func (*FairwayAvailability) CleanUp() error {
	return nil
}

// bottlenecks is a list of bottleneck ids. It must be sorted ascending
// (as by sort.Strings) for contains to give correct answers.
type bottlenecks []string

// contains reports whether bn is in the sorted list, using binary search.
func (bns bottlenecks) contains(bn string) bool {
	pos := sort.Search(len(bns), func(i int) bool { return bns[i] >= bn })
	if pos == len(bns) {
		return false
	}
	return bns[pos] == bn
}

// loadBottleneckCountries returns the ids of the bottlenecks that the
// current user's country is responsible for and that were valid at some
// point during the last maxHistoryDays. They serve as the filter of the
// SOAP request. The result is sorted, as required by bottlenecks.contains.
func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) {
	// Validity window: [now - maxHistoryDays, infinity).
	var since pgtype.Timestamptz
	// The error is deliberately ignored; setting a time.Time is presumably
	// infallible (NOTE(review): confirm against pgtype).
	_ = since.Set(time.Now().AddDate(0, 0, -maxHistoryDays))
	validity := pgtype.Tstzrange{
		Lower:     since,
		LowerType: pgtype.Inclusive,
		UpperType: pgtype.Unbounded,
		Status:    pgtype.Present,
	}

	rows, err := tx.QueryContext(ctx, listBottlenecksSQL, validity)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result bottlenecks
	for rows.Next() {
		var id string
		if err := rows.Scan(&id); err != nil {
			return nil, err
		}
		result = append(result, id)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	sort.Strings(result)
	return result, nil
}

// latestDate returns the point in time from which new fairway availability
// data should be requested for the bottlenecks bns.
//
// It takes the earliest of the latest measure_dates stored per bottleneck,
// not the overall latest one. That way no data is missed if one of the
// bottlenecks was not active for some time.
//
// The result is never older than maxHistoryDays before now. The request
// range is limited because a service may otherwise fail; e.g. the AT
// service has an upper limit of 10000 results. The same lower bound is
// returned when nothing usable is stored, i.e. when the query yields NULL
// because not every bottleneck has data yet.
//
// FIXME: the better solution would be to detect such errors dynamically
// and implement some kind of chunking...
func latestDate(
	ctx context.Context,
	tx *sql.Tx,
	bns bottlenecks,
) (pgtype.Timestamp, error) {
	var pgbns pgtype.TextArray
	// Hand over the plain []string so the conversion does not depend on
	// pgtype recognizing the named bottlenecks type.
	if err := pgbns.Set([]string(bns)); err != nil {
		return pgtype.Timestamp{}, err
	}

	var date pgtype.Timestamp
	err := tx.QueryRowContext(ctx, latestMeasureDateSQL, &pgbns).Scan(&date)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// The query is expected to always return exactly one row;
		// treat a missing row like a NULL result.
		date = pgtype.Timestamp{Status: pgtype.Null}
	case err != nil:
		return pgtype.Timestamp{}, err
	}

	// Clamp to the lower bound of the request range. A single cutoff is
	// used for both the comparison and the replacement value, so the
	// result is max(date, cutoff). A NULL date also falls back to it.
	cutoff := time.Now().AddDate(0, 0, -maxHistoryDays)
	if date.Status != pgtype.Present || date.Time.Before(cutoff) {
		date = pgtype.Timestamp{Time: cutoff, Status: pgtype.Present}
	}
	return date, nil
}

// storeFairwayAvailability runs one fairway availability import within a
// single transaction on conn. It:
//
//   - loads the relevant bottlenecks (see loadBottleneckCountries),
//   - calls fetch to obtain the fairway availabilities for them, and
//   - stores the results via doForFAs.
//
// If no fairway availability got stored, the transaction is rolled back
// and an UnchangedError is returned. Otherwise the transaction is
// committed, and the returned summary lists the bottleneck ids of the
// stored fairway availabilities.
func storeFairwayAvailability(
	ctx context.Context,
	conn *sql.Conn,
	feedback Feedback,
	fetch func(context.Context, *sql.Tx, bottlenecks) ([]*ifaf.FairwayAvailability, error),
) (interface{}, error) {

	start := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Discards all work on early returns; after a successful Commit this
	// Rollback is a no-op (it just returns sql.ErrTxDone).
	defer tx.Rollback()

	bns, err := loadBottleneckCountries(ctx, tx)
	if err != nil {
		return nil, err
	}

	fas, err := fetch(ctx, tx, bns)
	if err != nil {
		return nil, err
	}

	faids, err := doForFAs(ctx, bns, fas, tx, feedback)
	if err != nil {
		// %w (instead of %v) keeps the cause available to errors.Is and
		// errors.As, e.g. to detect context cancellation. The message
		// text is unchanged.
		return nil, fmt.Errorf("Error processing data: %w", err)
	}
	if len(faids) == 0 {
		// Nothing stored: the deferred Rollback discards the transaction.
		feedback.Info("No new fairway availablity data found")
		return nil, UnchangedError("No new fairway availablity data found")
	}
	feedback.Info("Processed %d fairway availabilities", len(faids))

	if err = tx.Commit(); err != nil {
		feedback.Info(
			"Importing fairway availabilities failed after %s", time.Since(start))
		return nil, err
	}
	feedback.Info(
		"Importing fairway availabilities successfully took %s", time.Since(start))

	// TODO: needs to be filled more useful.
	summary := struct {
		FairwayAvailabilities []string `json:"fairwayAvailabilities"`
	}{
		FairwayAvailabilities: faids,
	}
	return &summary, nil
}

// defaultLOS returns the level of service los points to, or LOS3 if los
// is nil (no explicit level of service given).
func defaultLOS(los *ifaf.LosEnum) ifaf.LosEnum {
	if los != nil {
		return *los
	}
	return ifaf.LosEnumLOS3
}

// doForFAs stores the fetched fairway availabilities fas within tx.
//
// Entries whose bottleneck is not contained in bnIds are skipped with a
// warning. For every other entry, the fairway availability itself is
// upserted. Its bottleneck PDFs, effective fairway availabilities and
// reference values are then inserted; existing rows are skipped by the
// ON CONFLICT ... DO NOTHING clauses. Progress is reported via feedback.
//
// The result holds the bottleneck id of every stored fairway
// availability, one entry per stored FA, so ids may repeat.
func doForFAs(
	ctx context.Context,
	bnIds bottlenecks,
	fas []*ifaf.FairwayAvailability,
	tx *sql.Tx,
	feedback Feedback,
) ([]string, error) {

	var faStmt, pdfStmt, efaStmt, refStmt *sql.Stmt
	var err error

	if faStmt, err = tx.PrepareContext(ctx, insertFASQL); err != nil {
		return nil, err
	}
	defer faStmt.Close()
	if pdfStmt, err = tx.PrepareContext(ctx, insertBnPdfsSQL); err != nil {
		return nil, err
	}
	defer pdfStmt.Close()
	if efaStmt, err = tx.PrepareContext(ctx, insertEFASQL); err != nil {
		return nil, err
	}
	defer efaStmt.Close()
	if refStmt, err = tx.PrepareContext(ctx, insertFAVSQL); err != nil {
		return nil, err
	}
	defer refStmt.Close()

	var stored []string
	var id int64

	feedback.Info("Found %d fairway availabilities", len(fas))
	for _, fa := range fas {
		bn := fa.Bottleneck_id

		// FIXME: This check is probably unnecessary, as the DB
		//   constraints already enforce it...  [sw]
		if !bnIds.contains(bn) {
			feedback.Warn("Bottleneck %s not found in database.", bn)
			continue
		}

		if err = faStmt.QueryRowContext(
			ctx,
			fa.POSITION,
			bn,
			fa.SURDAT,
			fa.Critical,
			fa.Date_Info,
			fa.Source,
		).Scan(&id); err != nil {
			return nil, err
		}
		feedback.Info("Processing for Bottleneck %s", bn)
		stored = append(stored, bn)

		if pdfs := fa.Bottleneck_PDFs; pdfs != nil {
			added := 0
			for _, pdf := range pdfs.PdfInfo {
				res, err := pdfStmt.ExecContext(
					ctx,
					id,
					pdf.ProfilePdfFilename,
					pdf.ProfilePdfURL,
					pdf.PDF_Generation_Date,
					pdf.Source,
				)
				if err != nil {
					return nil, err
				}
				added += faAffectedRows(res)
			}
			feedback.Info("Add %d Pdfs", added)
		}

		if efas := fa.Effective_fairway_availability; efas != nil {
			added := 0
			for _, efa := range efas.EffectiveFairwayAvailability {
				// A forecast generation time that was never set
				// (status Undefined) is stored as NULL.
				genTime := efa.Forecast_generation_time
				if genTime.Status == pgtype.Undefined {
					genTime = pgtype.Timestamp{Status: pgtype.Null}
				}
				res, err := efaStmt.ExecContext(
					ctx,
					id,
					efa.Measure_date,
					string(defaultLOS(efa.Level_of_Service)),
					efa.Available_depth_value,
					efa.Available_width_value,
					efa.Water_level_value,
					efa.Measure_type,
					efa.Source,
					genTime.Get(),
					efa.Value_lifetime,
				)
				if err != nil {
					return nil, err
				}
				added += faAffectedRows(res)
			}
			feedback.Info("Add %d Effective Fairway Availability", added)
		}

		if refs := fa.Reference_values; refs != nil {
			added := 0
			for _, ref := range refs.ReferenceValue {
				// NOTE(review): insertFAVSQL builds ST_MakePoint($6, $7),
				// i.e. (x, y) = (longitude, latitude) for geography, but the
				// latitude is bound to $6 here — verify the coordinate order.
				res, err := refStmt.ExecContext(
					ctx,
					id,
					string(defaultLOS(ref.Level_of_Service)),
					ref.Fairway_depth,
					ref.Fairway_width,
					ref.Fairway_radius,
					ref.Shallowest_spot_Lat,
					ref.Shallowest_spot_Lon,
				)
				if err != nil {
					return nil, err
				}
				added += faAffectedRows(res)
			}
			feedback.Info("Add %d Reference Values", added)
		}
	}
	return stored, nil
}

// faAffectedRows reports how many rows res touched. If the driver cannot
// tell, the statement is counted as one row.
func faAffectedRows(res sql.Result) int {
	n, err := res.RowsAffected()
	if err != nil {
		return 1
	}
	return int(n)
}

// Do executes the actual fairway availability import. It requests the
// fairway availabilities of the relevant bottlenecks from the IFAF SOAP
// service at fa.URL, starting at the date computed by latestDate, and
// stores them via storeFairwayAvailability. importID is not used.
func (fa *FairwayAvailability) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {
	return storeFairwayAvailability(ctx, conn, feedback, func(
		ctx context.Context,
		tx *sql.Tx,
		bns bottlenecks,
	) ([]*ifaf.FairwayAvailability, error) {
		feedback.Info("Requesting data for: %v", bns)

		from, err := latestDate(ctx, tx, bns)
		if err != nil {
			return nil, err
		}
		feedback.Info("Requesting data starting from %s", from.Time)

		client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)
		req := &ifaf.Get_bottleneck_fa{
			Bottleneck_id: &ifaf.ArrayOfString{String: bns},
			Period:        &ifaf.RequestedPeriod{Date_start: from.Time, Date_end: time.Now()},
		}
		resp, err := client.Get_bottleneck_fa(req)
		if err != nil {
			return nil, err
		}

		result := resp.Get_bottleneck_faResult
		if result == nil {
			return nil, errors.New("no fairway availabilities found")
		}
		return result.FairwayAvailability, nil
	})
}