view pkg/imports/fa.go @ 4611:b5aa1eb83bb0 geoserver_sql_views

Add possibility to configure SRS for GeoServer SQL view. Automatic detection of the spatial reference system for SQL views in GeoServer does not always find the correct SRS.
author Tom Gottfried <tom@intevation.de>
date Fri, 06 Sep 2019 11:58:03 +0200
parents c811d5da69bd
children 7128741faeb9
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018,2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/soap/ifaf"
)

// FairwayAvailability imports fairway availability data
// from an IFAF SOAP service.
//
// Both fields carry JSON tags ("url", "insecure") and are handed
// to the SOAP client when the import is executed.
type FairwayAvailability struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates whether HTTPS traffic to the service
	// should skip the validation of certificates.
	Insecure bool `json:"insecure"`
}

const (
	// FAJobKind is the import queue type identifier
	// of fairway availability imports.
	FAJobKind JobKind = "fa"

	// maxHistoryDays is the number of days to look back
	// when requesting data from the service.
	maxHistoryDays = 7
)

const (
	// listBottlenecksSQL lists the distinct IDs of all bottlenecks
	// whose responsible country is the country of the current
	// database user and whose staging is done, ordered by ID.
	listBottlenecksSQL = `
SELECT DISTINCT
  bottleneck_id
FROM waterway.bottlenecks
WHERE responsible_country = (
    SELECT country FROM users.list_users WHERE username = current_user)
  AND staging_done = true
ORDER BY bottleneck_id
`

	// latestMeasureDateSQL fetches the most recent measure date
	// of all stored effective fairway availabilities.
	latestMeasureDateSQL = `
SELECT
	measure_date
FROM waterway.effective_fairway_availability
ORDER BY measure_date DESC LIMIT 1
`

	// insertFASQL stores a fairway availability and returns its id.
	// An already existing row with the same (bottleneck_id, surdat)
	// is updated with the new values instead.
	insertFASQL = `
INSERT INTO waterway.fairway_availability (
  position_code,
  bottleneck_id,
  surdat,
  critical,
  date_info,
  source_organization
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5,
  $6
) ON CONFLICT (bottleneck_id, surdat) DO UPDATE SET
  position_code = EXCLUDED.position_code,
  critical = EXCLUDED.critical,
  date_info = EXCLUDED.date_info,
  source_organization = EXCLUDED.source_organization
RETURNING id`

	// insertBnPdfsSQL stores a bottleneck PDF reference for a
	// fairway availability ($1). Rows violating the primary key
	// constraint are silently skipped.
	insertBnPdfsSQL = `
INSERT INTO waterway.bottleneck_pdfs (
  fairway_availability_id,
  profile_pdf_filename,
  profile_pdf_url,
  pdf_generation_date,
  source_organization
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5
) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`

	// insertEFASQL stores an effective fairway availability for a
	// fairway availability ($1). The level of service is looked up
	// by its name ($3). Rows violating the primary key constraint
	// are silently skipped.
	insertEFASQL = `
INSERT INTO waterway.effective_fairway_availability (
  fairway_availability_id,
  measure_date,
  level_of_service,
  available_depth_value,
  available_width_value,
  water_level_value,
  measure_type,
  source_organization,
  forecast_generation_time,
  value_lifetime
) VALUES (
  $1,
  $2,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $3),
  $4,
  $5,
  $6,
  $7,
  $8,
  $9,
  $10
) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`

	// insertFAVSQL stores the reference values of a fairway
	// availability ($1). The level of service is looked up by its
	// name ($2) and the shallowest spot is built as a point from
	// ($6, $7). Rows violating the primary key constraint are
	// silently skipped.
	insertFAVSQL = `
INSERT INTO waterway.fa_reference_values (
  fairway_availability_id,
  level_of_service,
  fairway_depth,
  fairway_width,
  fairway_radius,
  shallowest_spot
) VALUES (
  $1,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $2),
  $3,
  $4,
  $5,
  ST_MakePoint($6, $7)::geography
)ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING`
)

// faJobCreator is the job creator for fairway availability imports.
type faJobCreator struct{}

// init registers the fairway availability job creator
// under the job kind FAJobKind.
func init() {
	RegisterJobCreator(FAJobKind, faJobCreator{})
}

// Description gives a short, human readable name
// for this kind of import.
func (faJobCreator) Description() string { return "fairway availability" }

// Create returns a fresh, not yet configured
// fairway availability import job.
func (faJobCreator) Create() Job {
	return &FairwayAvailability{}
}

// Depends returns two groups of table names this import depends on.
// The first group names the tables the import inserts into, the
// second group the tables it only reads from.
func (faJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{
		"effective_fairway_availability",
		"fa_reference_values",
		"bottleneck_pdfs",
		"fairway_availability",
	}
	deps[1] = []string{
		"bottlenecks",
		"levels_of_service",
	}
	return deps
}

// AutoAccept always reports true for fairway availability imports.
// NOTE(review): presumably this means the import needs no manual
// review step -- confirm against the import queue.
func (faJobCreator) AutoAccept() bool {
	return true
}

// StageDone is meant to move the imported fairway availabilities
// out of the staging area. Currently it does nothing and always
// returns nil.
func (faJobCreator) StageDone(_ context.Context, _ *sql.Tx, _ int64) error {
	return nil
}

// CleanUp is a no-op for fairway availability imports;
// it always returns nil.
func (*FairwayAvailability) CleanUp() error {
	return nil
}

// bottlenecks is a list of bottleneck IDs. For contains to work
// the list has to be sorted ascending in Go's string order.
type bottlenecks []string

// contains reports whether bn is an element of the list.
// It uses a binary search and therefore relies on the list
// being sorted.
func (bns bottlenecks) contains(bn string) bool {
	// Find the first position holding an ID not smaller than bn.
	pos := sort.Search(len(bns), func(i int) bool {
		return bns[i] >= bn
	})
	if pos >= len(bns) {
		return false
	}
	return bns[pos] == bn
}

// loadBottleneckCountries loads the IDs of the bottlenecks selected by
// listBottlenecksSQL inside the given transaction. Despite its name it
// returns bottleneck IDs, not countries.
//
// The returned list is sorted ascending in Go's string order so that it
// can be searched with bottlenecks.contains. It is nil if no bottleneck
// was found. Any error of the query or of scanning the rows is returned
// unchanged.
func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) {

	// Get available bottlenecks from database for use as filter in SOAP request
	rows, err := tx.QueryContext(ctx, listBottlenecksSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var bns bottlenecks

	for rows.Next() {
		var bn string
		if err = rows.Scan(&bn); err != nil {
			return nil, err
		}
		bns = append(bns, bn)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}

	// The ORDER BY of the query follows the collation of the database,
	// which may differ from Go's byte-wise string order. The binary
	// search in contains needs the latter, so sort again here.
	sort.Strings(bns)

	return bns, nil
}

// latestDate determines the start of the period to be requested from
// the service. This is the most recent measure date stored in the
// database, but never more than maxHistoryDays before now. If nothing
// is stored yet, the start is maxHistoryDays before now, too.
// Errors of the database query other than sql.ErrNoRows are returned
// together with a zero timestamp.
func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) {
	// oldest yields the earliest start allowed:
	// maxHistoryDays before the current time.
	oldest := func() pgtype.Timestamp {
		return pgtype.Timestamp{
			Time: time.Now().AddDate(0, 0, -maxHistoryDays),
		}
	}

	var latest pgtype.Timestamp
	if err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&latest); err != nil {
		if err != sql.ErrNoRows {
			return pgtype.Timestamp{}, err
		}
		// Nothing stored yet: look back as far as allowed.
		latest = oldest()
	}

	// Limit the request range to maxHistoryDays. Otherwise the request
	// might fail, e.g. the AT service has an upper limit of 10000 results.
	//
	//  FIXME: the better solution would be to detect such errors
	//    dynamically and implement some kind of chunking...
	if age := time.Since(latest.Time); age.Hours() > maxHistoryDays*24 {
		latest = oldest()
	}

	return latest, nil
}

// storeFairwayAvailability runs a complete fairway availability import
// inside a single transaction on conn: it loads the known bottlenecks,
// obtains the fairway availabilities via fetch, stores them and commits.
//
// fetch is called with the transaction and the known bottlenecks and has
// to deliver the fairway availabilities to be stored.
//
// On success a summary listing the bottleneck IDs of the processed
// fairway availabilities is returned. If nothing was processed an
// UnchangedError is returned. In every error case the transaction
// is rolled back.
func storeFairwayAvailability(
	ctx context.Context,
	conn *sql.Conn,
	feedback Feedback,
	fetch func(context.Context, *sql.Tx, bottlenecks) ([]*ifaf.FairwayAvailability, error),
) (interface{}, error) {

	began := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Has no effect any more once the transaction was committed.
	defer tx.Rollback()

	known, err := loadBottleneckCountries(ctx, tx)
	if err != nil {
		return nil, err
	}

	received, err := fetch(ctx, tx, known)
	if err != nil {
		return nil, err
	}

	imported, err := doForFAs(ctx, known, received, tx, feedback)
	switch {
	case err != nil:
		return nil, fmt.Errorf("Error processing data: %v", err)
	case len(imported) == 0:
		const nothingNew = "No new fairway availablity data found"
		feedback.Info(nothingNew)
		return nil, UnchangedError(nothingNew)
	}
	feedback.Info("Processed %d fairway availabilities", len(imported))

	if err := tx.Commit(); err != nil {
		feedback.Info(
			"Importing fairway availabilities failed after %s", time.Since(began))
		return nil, err
	}
	feedback.Info(
		"Importing fairway availabilities successfully took %s", time.Since(began))

	// TODO: needs to be filled more useful.
	return &struct {
		FairwayAvailabilities []string `json:"fairwayAvailabilities"`
	}{
		FairwayAvailabilities: imported,
	}, nil
}

// doForFAs stores the given fairway availabilities together with their
// bottleneck PDFs, effective fairway availabilities and reference values
// using the transaction tx.
//
// Fairway availabilities of bottlenecks not contained in bnIds are
// skipped with a warning. Effective fairway availabilities lacking a
// level of service are skipped with a warning, too. Missing (nil)
// sections of a fairway availability are ignored.
//
// It returns the bottleneck IDs of the fairway availabilities which were
// stored, one entry per fairway availability. The first failing database
// operation aborts the processing and its error is returned.
func doForFAs(
	ctx context.Context,
	bnIds bottlenecks,
	fas []*ifaf.FairwayAvailability,
	tx *sql.Tx,
	feedback Feedback,
) ([]string, error) {

	insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL)
	if err != nil {
		return nil, err
	}
	defer insertFAStmt.Close()
	insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL)
	if err != nil {
		return nil, err
	}
	defer insertBnPdfsStmt.Close()
	insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL)
	if err != nil {
		return nil, err
	}
	defer insertEFAStmt.Close()
	insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL)
	if err != nil {
		return nil, err
	}
	defer insertFAVStmt.Close()

	// rowsAdded tells how many rows a statement affected. If the
	// driver is not able to tell, one row is assumed.
	rowsAdded := func(res sql.Result) int {
		if affected, err := res.RowsAffected(); err == nil {
			return int(affected)
		}
		return 1
	}

	var faIDs []string
	var faID int64
	feedback.Info("Found %d fairway availabilities", len(fas))
	for _, faRes := range fas {
		// FIXME: The following test is probably unnecessary as already
		//   done by DB constraints...  [sw]
		if !bnIds.contains(faRes.Bottleneck_id) {
			feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id)
			continue
		}
		// Insert or update the fairway availability itself. The
		// returned id links the dependent rows stored below.
		err = insertFAStmt.QueryRowContext(
			ctx,
			faRes.POSITION,
			faRes.Bottleneck_id,
			faRes.SURDAT,
			faRes.Critical,
			faRes.Date_Info,
			faRes.Source,
		).Scan(&faID)
		if err != nil {
			return nil, err
		}
		feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
		// Note: the bottleneck id is reported, not the database id.
		faIDs = append(faIDs, faRes.Bottleneck_id)
		if faRes.Bottleneck_PDFs != nil {
			bnPdfCount := 0
			for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo {
				res, err := insertBnPdfsStmt.ExecContext(
					ctx,
					faID,
					bnPdfs.ProfilePdfFilename,
					bnPdfs.ProfilePdfURL,
					bnPdfs.PDF_Generation_Date,
					bnPdfs.Source,
				)
				if err != nil {
					return nil, err
				}
				bnPdfCount += rowsAdded(res)
			}
			feedback.Info("Add %d Pdfs", bnPdfCount)
		}
		if faRes.Effective_fairway_availability != nil {
			efaCount := 0
			for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability {
				los := efa.Level_of_Service
				// The level of service is dereferenced below. Without
				// it the record cannot be assigned to a level of
				// service, so skip it instead of panicking.
				if los == nil {
					feedback.Warn(
						"Effective fairway availability without level of service for bottleneck %s skipped.",
						faRes.Bottleneck_id)
					continue
				}
				// Store an undefined forecast generation time as NULL.
				fgt := efa.Forecast_generation_time
				if efa.Forecast_generation_time.Status == pgtype.Undefined {
					fgt = pgtype.Timestamp{
						Status: pgtype.Null,
					}
				}
				res, err := insertEFAStmt.ExecContext(
					ctx,
					faID,
					efa.Measure_date,
					string(*los),
					efa.Available_depth_value,
					efa.Available_width_value,
					efa.Water_level_value,
					efa.Measure_type,
					efa.Source,
					fgt.Get(),
					efa.Value_lifetime,
				)
				if err != nil {
					return nil, err
				}
				efaCount += rowsAdded(res)
			}
			feedback.Info("Add %d Effective Fairway Availability", efaCount)
		}

		if faRes.Reference_values != nil {
			rvCount := 0
			for _, fav := range faRes.Reference_values.ReferenceValue {
				res, err := insertFAVStmt.ExecContext(
					ctx,
					faID,
					fav.Level_of_Service,
					fav.Fairway_depth,
					fav.Fairway_width,
					fav.Fairway_radius,
					fav.Shallowest_spot_Lat,
					fav.Shallowest_spot_Lon,
				)
				if err != nil {
					return nil, err
				}
				rvCount += rowsAdded(res)
			}
			feedback.Info("Add %d Reference Values", rvCount)
		}
	}
	return faIDs, nil
}

// Do executes the actual fairway availability import.
//
// The fairway availabilities of the known bottlenecks are requested
// from the SOAP service at fa.URL for the period from the date
// determined by latestDate until now and are then stored via
// storeFairwayAvailability, whose result is returned.
// importID is not used.
func (fa *FairwayAvailability) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	return storeFairwayAvailability(ctx, conn, feedback,
		func(
			ctx context.Context,
			tx *sql.Tx,
			bns bottlenecks,
		) ([]*ifaf.FairwayAvailability, error) {

			since, err := latestDate(ctx, tx)
			if err != nil {
				return nil, err
			}

			service := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)

			// Ask for all known bottlenecks in the period
			// from the latest stored date until now.
			resp, err := service.Get_bottleneck_fa(&ifaf.Get_bottleneck_fa{
				Bottleneck_id: &ifaf.ArrayOfString{String: bns},
				Period: &ifaf.RequestedPeriod{
					Date_start: since.Time,
					Date_end:   time.Now(),
				},
			})
			if err != nil {
				return nil, err
			}

			if result := resp.Get_bottleneck_faResult; result != nil {
				return result.FairwayAvailability, nil
			}
			return nil, errors.New("no fairway availabilities found")
		})
}