view pkg/imports/fa.go @ 5711:2dd155cc95ec revive-cleanup

Fix all revive issue (w/o machine generated stuff).
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 20 Feb 2024 22:22:57 +0100
parents f2204f91d286
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018,2019,2020 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/soap/ifaf"
)

// FairwayAvailability imports fairway availability data
// from an IFAF SOAP service.
type FairwayAvailability struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure controls the validation of certificates for HTTPS traffic
	// to URL; it is handed to ifaf.NewFairwayAvailabilityService.
	// NOTE(review): judging by the name, true presumably disables the
	// certificate validation — confirm in package ifaf.
	Insecure bool `json:"insecure"`
}

const (
	// FAJobKind is the import queue type identifier of
	// fairway availability imports.
	FAJobKind JobKind = "fa"

	// maxHistoryDays is the number of days to look back when
	// selecting bottlenecks and requesting availability data.
	maxHistoryDays = 7
)

// SQL statements used by the fairway availability import.
const (
	// listBottlenecksSQL selects the IDs of the bottlenecks which the
	// country of the current database user (looked up in users.list_users)
	// is responsible for, which have finished staging and whose validity
	// overlaps the time range $1.
	listBottlenecksSQL = `
SELECT DISTINCT
  bottleneck_id
FROM waterway.bottlenecks
WHERE responsible_country = (
    SELECT country FROM users.list_users WHERE username = current_user)
  AND staging_done = true
  AND validity && $1
`

	// latestMeasureDateSQL takes an array of bottleneck IDs as $1.
	// If every one of them already has rows in
	// waterway.fairway_availability, it yields the earliest of the
	// per-bottleneck latest effective measure dates. Otherwise the CASE
	// has no matching branch (and no ELSE), so the result is NULL.
	latestMeasureDateSQL = `
SELECT CASE WHEN (SELECT array_agg(DISTINCT bottleneck_id) @> $1::varchar[]
                    FROM waterway.fairway_availability)
       THEN (SELECT min(x.m) FROM
              (SELECT max(efa.measure_date) AS m
                FROM waterway.fairway_availability fa,
                     waterway.effective_fairway_availability efa
                WHERE fa.bottleneck_id = ANY($1)
                  AND efa.fairway_availability_id = fa.id
                GROUP BY fa.bottleneck_id) AS x)
       END
`

	// insertFASQL upserts a waterway.fairway_availability row. An
	// existing row with the same (bottleneck_id, surdat) is updated in
	// place. The id of the inserted or updated row is returned.
	insertFASQL = `
INSERT INTO waterway.fairway_availability (
  position,
  bottleneck_id,
  surdat,
  critical,
  date_info,
  source_organization
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5,
  $6
) ON CONFLICT (bottleneck_id, surdat) DO UPDATE SET
  position = EXCLUDED.position,
  critical = EXCLUDED.critical,
  date_info = EXCLUDED.date_info,
  source_organization = EXCLUDED.source_organization
RETURNING id`

	// insertBnPdfsSQL adds a bottleneck PDF reference to the fairway
	// availability with id $1. Rows conflicting with the primary key
	// bottleneck_pdfs_pkey are silently skipped.
	insertBnPdfsSQL = `
INSERT INTO waterway.bottleneck_pdfs (
  fairway_availability_id,
  profile_pdf_filename,
  profile_pdf_url,
  pdf_generation_date,
  source_organization
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5
) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`

	// insertEFASQL adds an effective fairway availability measurement
	// to the fairway availability with id $1. $3 is the name of a level
	// of service, resolved via levels_of_service; an unknown name yields
	// NULL. Rows conflicting with the primary key are silently skipped.
	insertEFASQL = `
INSERT INTO waterway.effective_fairway_availability (
  fairway_availability_id,
  measure_date,
  level_of_service,
  available_depth_value,
  available_width_value,
  water_level_value,
  measure_type,
  source_organization,
  forecast_generation_time,
  value_lifetime
) VALUES (
  $1,
  $2,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $3),
  $4,
  $5,
  $6,
  $7,
  $8,
  $9,
  $10
) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`

	// insertFAVSQL adds a reference value for a level of service (given
	// by name in $2) to the fairway availability with id $1. Rows
	// conflicting with the primary key are silently skipped.
	// The shallowest spot is built as ST_MakePoint($6, $7), so $6 is x
	// and $7 is y.
	// NOTE(review): for geography x is the longitude, but doForFAs binds
	// Shallowest_spot_Lat to $6 — confirm the intended axis order.
	insertFAVSQL = `
INSERT INTO waterway.fa_reference_values (
  fairway_availability_id,
  level_of_service,
  fairway_depth,
  fairway_width,
  fairway_radius,
  shallowest_spot
) VALUES (
  $1,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $2),
  $3,
  $4,
  $5,
  ST_MakePoint($6, $7)::geography
)ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING`
)

// Description summarizes this import by returning the URL of the
// SOAP service it talks to. The argument is not used and the
// returned error is always nil.
func (fa *FairwayAvailability) Description([]string) (string, error) {
	serviceURL := fa.URL
	return serviceURL, nil
}

// faJobCreator creates and describes fairway availability import jobs.
// It carries no state, hence the empty struct.
type faJobCreator struct{}

// init registers the fairway availability job creator under FAJobKind.
func init() {
	var creator faJobCreator
	RegisterJobCreator(FAJobKind, creator)
}

// Description returns the human readable name of this kind of import.
func (faJobCreator) Description() string {
	const name = "fairway availability"
	return name
}

// Create returns a fresh, zero-valued FairwayAvailability job.
func (faJobCreator) Create() Job {
	return &FairwayAvailability{}
}

// Depends lists the database tables involved in this import.
// NOTE(review): the first slice holds the tables written by the SQL in
// this file, the second those which are only read; presumably that is
// the contract of the two-element result — confirm against JobCreator.
func (faJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{
		"effective_fairway_availability", "fa_reference_values",
		"bottleneck_pdfs", "fairway_availability",
	}
	deps[1] = []string{"bottlenecks", "levels_of_service"}
	return deps
}

// AutoAccept always returns true for fairway availability imports.
func (faJobCreator) AutoAccept() bool {
	return true
}

// StageDone is a NOP for fairway availability imports: it ignores all
// of its arguments and always returns nil.
func (faJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
	return nil
}

// CleanUp is a NOP for fairway availability imports
// and always returns nil.
func (*FairwayAvailability) CleanUp() error {
	return nil
}

// bottlenecks is a list of bottleneck IDs. To make contains work it
// has to be sorted in ascending order (e.g. with sort.Strings).
type bottlenecks []string

// contains reports whether bn is an element of bns using binary search.
// The result is only meaningful if bns is sorted.
func (bns bottlenecks) contains(bn string) bool {
	// Smallest index whose element is not less than bn.
	pos := sort.Search(len(bns), func(i int) bool { return bns[i] >= bn })
	if pos >= len(bns) {
		return false
	}
	return bns[pos] == bn
}

// loadBottleneckCountries loads the IDs of the bottlenecks which the
// country of the current database user is responsible for. Only
// bottlenecks that have finished staging and whose validity overlaps the
// range from maxHistoryDays ago onward are returned. The IDs are used as
// a filter for the SOAP request.
//
// Despite its name the function returns bottleneck IDs, not countries.
// The result is sorted, as required by bottlenecks.contains.
func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) {

	// Build the range [now - maxHistoryDays, unbounded) directly instead
	// of via Timestamptz.Set, whose error result was silently ignored.
	from := pgtype.Timestamptz{
		Time:   time.Now().AddDate(0, 0, -maxHistoryDays),
		Status: pgtype.Present,
	}
	trange := pgtype.Tstzrange{
		Lower:     from,
		LowerType: pgtype.Inclusive,
		UpperType: pgtype.Unbounded,
		Status:    pgtype.Present,
	}

	rows, err := tx.QueryContext(ctx, listBottlenecksSQL, trange)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var bns bottlenecks

	for rows.Next() {
		var bn string
		if err := rows.Scan(&bn); err != nil {
			return nil, err
		}
		bns = append(bns, bn)
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	// bottlenecks MUST be sorted according to the sort package's
	// expectations for contains() to work.
	sort.Strings(bns)
	return bns, nil
}

// latestDate returns the date from which on fairway availability data
// should be requested for the bottlenecks bns.
//
// It is the earliest of the latest measure_dates stored for these
// bottlenecks. The latest of all dates is deliberately not used, so that
// no data is missed if one of the bottlenecks was not active for some time.
//
// The result is never older than maxHistoryDays before now. If no usable
// date is stored (the query yields NULL or no row) or the stored date is
// older, that limit is returned instead. Otherwise the request might fail,
// e.g. the AT service has an upper limit of 10000 results.
//
// FIXME: the better solution would be to detect such errors and
// dynamically implement some kind of chunking...
func latestDate(
	ctx context.Context,
	tx *sql.Tx,
	bns bottlenecks,
) (pgtype.Timestamp, error) {
	// Oldest date we are willing to request data from.
	cutoff := pgtype.Timestamp{
		Time:   time.Now().AddDate(0, 0, -maxHistoryDays),
		Status: pgtype.Present,
	}

	// Convert explicitly to []string so Set sees the plain slice type
	// rather than the named bottlenecks type.
	var pgbns pgtype.TextArray
	if err := pgbns.Set([]string(bns)); err != nil {
		return pgtype.Timestamp{}, err
	}

	var date pgtype.Timestamp
	err := tx.QueryRowContext(ctx, latestMeasureDateSQL, &pgbns).Scan(&date)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		return cutoff, nil
	case err != nil:
		return pgtype.Timestamp{}, err
	}

	// NULL means not every bottleneck has data yet; fall back to the
	// cutoff in that case and whenever the stored date is older.
	if date.Status != pgtype.Present || date.Time.Before(cutoff.Time) {
		return cutoff, nil
	}
	return date, nil
}

// storeFairwayAvailability runs a complete fairway availability import in
// a single transaction on conn:
//  1. load the bottlenecks of the current user's country,
//  2. fetch the matching availabilities via fetch,
//  3. store them with doForFAs and commit.
//
// If no record was processed, an UnchangedError is returned and the
// transaction is rolled back. On success a summary is returned that lists
// the bottleneck IDs of the processed records.
func storeFairwayAvailability(
	ctx context.Context,
	conn *sql.Conn,
	feedback Feedback,
	fetch func(context.Context, *sql.Tx, bottlenecks) ([]*ifaf.FairwayAvailability, error),
) (interface{}, error) {

	start := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Rolling back after a successful Commit only returns sql.ErrTxDone,
	// so deferring it is safe on every path.
	defer tx.Rollback()

	bns, err := loadBottleneckCountries(ctx, tx)
	if err != nil {
		return nil, err
	}

	fas, err := fetch(ctx, tx, bns)
	if err != nil {
		return nil, err
	}

	faids, err := doForFAs(ctx, bns, fas, tx, feedback)
	if err != nil {
		// %w keeps the cause inspectable via errors.Is / errors.As.
		return nil, fmt.Errorf("Error processing data: %w", err)
	}
	if len(faids) == 0 {
		feedback.Info("No new fairway availablity data found")
		return nil, UnchangedError("No new fairway availablity data found")
	}
	feedback.Info("Processed %d fairway availabilities", len(faids))

	if err = tx.Commit(); err != nil {
		feedback.Info(
			"Importing fairway availabilities failed after %s", time.Since(start))
		return nil, err
	}
	feedback.Info(
		"Importing fairway availabilities successfully took %s", time.Since(start))

	// TODO: needs to be filled more useful.
	summary := struct {
		FairwayAvailabilities []string `json:"fairwayAvailabilities"`
	}{
		FairwayAvailabilities: faids,
	}
	return &summary, nil
}

// defaultLOS returns the level of service los points to. When no
// explicit level of service is given (los is nil), it falls back
// to ifaf.LosEnumLOS3.
func defaultLOS(los *ifaf.LosEnum) ifaf.LosEnum {
	if los != nil {
		return *los
	}
	return ifaf.LosEnumLOS3
}

// doForFAs stores the fairway availability records fas inside tx.
//
// Records whose bottleneck is not in bnIDs are skipped with a warning.
// For every other record the fairway_availability row is upserted. Its
// bottleneck PDFs, effective fairway availabilities and reference values
// are then inserted; conflicting rows are skipped by the SQL statements
// themselves. Progress is reported through feedback.
//
// The returned slice holds the bottleneck ID of every processed record,
// one entry per record. It does NOT hold fairway_availability row IDs.
// The first database error aborts processing and is returned as is.
func doForFAs(
	ctx context.Context,
	bnIDs bottlenecks,
	fas []*ifaf.FairwayAvailability,
	tx *sql.Tx,
	feedback Feedback,
) ([]string, error) {

	// Prepare all statements up front, in this order. Each successfully
	// prepared statement is closed when the function returns.
	var stmts [4]*sql.Stmt
	for i, query := range [...]string{
		insertFASQL, insertBnPdfsSQL, insertEFASQL, insertFAVSQL,
	} {
		stmt, err := tx.PrepareContext(ctx, query)
		if err != nil {
			return nil, err
		}
		defer stmt.Close()
		stmts[i] = stmt
	}
	insertFA, insertPdf, insertEFA, insertFAV := stmts[0], stmts[1], stmts[2], stmts[3]

	// rowsOf returns the number of rows res reports as affected.
	// If the driver cannot tell, the row is counted as one.
	rowsOf := func(res sql.Result) int {
		n, err := res.RowsAffected()
		if err != nil {
			return 1
		}
		return int(n)
	}

	feedback.Info("Found %d fairway availabilities", len(fas))

	var processed []string
	for _, faRes := range fas {
		bnID := faRes.Bottleneck_id

		// FIXME: The following test is probably unnecessary as already
		//   done by DB constraints...  [sw]
		if !bnIDs.contains(bnID) {
			feedback.Warn("Bottleneck %s not found in database.", bnID)
			continue
		}

		var faID int64
		if err := insertFA.QueryRowContext(
			ctx,
			faRes.POSITION,
			bnID,
			faRes.SURDAT,
			faRes.Critical,
			faRes.Date_Info,
			faRes.Source,
		).Scan(&faID); err != nil {
			return nil, err
		}
		feedback.Info("Processing for Bottleneck %s", bnID)
		processed = append(processed, bnID)

		if pdfs := faRes.Bottleneck_PDFs; pdfs != nil {
			count := 0
			for _, pdf := range pdfs.PdfInfo {
				res, err := insertPdf.ExecContext(
					ctx,
					faID,
					pdf.ProfilePdfFilename,
					pdf.ProfilePdfURL,
					pdf.PDF_Generation_Date,
					pdf.Source,
				)
				if err != nil {
					return nil, err
				}
				count += rowsOf(res)
			}
			feedback.Info("Add %d Pdfs", count)
		}

		if efas := faRes.Effective_fairway_availability; efas != nil {
			count := 0
			for _, efa := range efas.EffectiveFairwayAvailability {
				// An undefined forecast generation time is stored as NULL.
				fgt := efa.Forecast_generation_time
				if fgt.Status == pgtype.Undefined {
					fgt = pgtype.Timestamp{Status: pgtype.Null}
				}
				res, err := insertEFA.ExecContext(
					ctx,
					faID,
					efa.Measure_date,
					string(defaultLOS(efa.Level_of_Service)),
					efa.Available_depth_value,
					efa.Available_width_value,
					efa.Water_level_value,
					efa.Measure_type,
					efa.Source,
					fgt.Get(),
					efa.Value_lifetime,
				)
				if err != nil {
					return nil, err
				}
				count += rowsOf(res)
			}
			feedback.Info("Add %d Effective Fairway Availability", count)
		}

		if rvs := faRes.Reference_values; rvs != nil {
			count := 0
			for _, rv := range rvs.ReferenceValue {
				// NOTE(review): insertFAVSQL builds ST_MakePoint($6, $7);
				// PostGIS expects (x, y) = (longitude, latitude), but the
				// latitude is bound to $6 here. Confirm the axis order.
				res, err := insertFAV.ExecContext(
					ctx,
					faID,
					string(defaultLOS(rv.Level_of_Service)),
					rv.Fairway_depth,
					rv.Fairway_width,
					rv.Fairway_radius,
					rv.Shallowest_spot_Lat,
					rv.Shallowest_spot_Lon,
				)
				if err != nil {
					return nil, err
				}
				count += rowsOf(res)
			}
			feedback.Info("Add %d Reference Values", count)
		}
	}
	return processed, nil
}

// Do executes the actual fairway availability import. It requests the
// availabilities of the relevant bottlenecks from the IFAF SOAP service
// at fa.URL and stores them via storeFairwayAvailability. The job ID
// argument is not used.
func (fa *FairwayAvailability) Do(
	ctx context.Context,
	_ int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// fetchFromService asks the SOAP service for the data of bns, from
	// the date determined by latestDate up to now.
	fetchFromService := func(
		ctx context.Context,
		tx *sql.Tx,
		bns bottlenecks,
	) ([]*ifaf.FairwayAvailability, error) {
		feedback.Info("Requesting data for: %v", bns)

		from, err := latestDate(ctx, tx, bns)
		if err != nil {
			return nil, err
		}
		feedback.Info("Requesting data starting from %s", from.Time)

		client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)

		period := ifaf.RequestedPeriod{
			Date_start: from.Time,
			Date_end:   time.Now(),
		}
		resp, err := client.Get_bottleneck_fa(&ifaf.Get_bottleneck_fa{
			Bottleneck_id: &ifaf.ArrayOfString{String: bns},
			Period:        &period,
		})
		if err != nil {
			return nil, err
		}

		result := resp.Get_bottleneck_faResult
		if result == nil {
			return nil, errors.New("no fairway availabilities found")
		}
		return result.FairwayAvailability, nil
	}

	return storeFairwayAvailability(ctx, conn, feedback, fetchFromService)
}