view pkg/imports/fa.go @ 2130:f3aabc05f9b2

Fix constraints on waterway profiles. staging_done in the UNIQUE constraint had no effect, because the exclusion constraint prevented two rows with equal location and validity anyhow. Adding staging_done to the exclusion constraint makes the UNIQUE constraint check only a corner case of what the exclusion constraint checks. Thus, remove the UNIQUE constraint. Casting staging_done to int is needed because there is no appropriate operator class for booleans. Casting to smallint or even bit would have been better (i.e. should result in a smaller index size), but that would additionally have required creating such a CAST.
author Tom Gottfried <tom@intevation.de>
date Wed, 06 Feb 2019 15:42:32 +0100
parents aeff01c6edec
children b868cb653c4d
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/soap/ifaf"
)

// FairwayAvailability is the import job that fetches fairway availability
// data from an IFAF SOAP service and stores it in the database.
// Its fields are filled from the JSON job description (see the json tags).
type FairwayAvailability struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure is handed to the SOAP client together with URL.
	// NOTE(review): presumably true disables TLS certificate
	// validation for HTTPS traffic — confirm in the ifaf package.
	Insecure bool `json:"insecure"`
}

// uniqueFairwayAvailability is the map key used to recognise fairway
// availabilities which are already stored in the database: the external
// bottleneck identifier combined with the surdat timestamp.
type uniqueFairwayAvailability struct {
	// BottleneckId is the bottleneck_id of waterway.bottlenecks
	// respectively the Bottleneck_id delivered by the SOAP service.
	BottleneckId string
	// Surdat is the surdat value of the fairway availability.
	Surdat time.Time
}

// FAJobKind is the import queue type identifier
// of the fairway availability import.
const FAJobKind JobKind = "fa"

// SQL statements used by the fairway availability import.
const (
	// listBottlenecksSQL selects identifier and responsible country of
	// all bottlenecks which have left the staging area and for which the
	// country of the current user is responsible.
	listBottlenecksSQL = `
SELECT
  bottleneck_id,
  responsible_country
FROM waterway.bottlenecks
WHERE responsible_country = users.current_user_country()
  AND staging_done = true
`
	// latestMeasureDateSQL selects the most recent measure date of all
	// stored effective fairway availabilities. It yields no row if the
	// table is empty.
	latestMeasureDateSQL = `
SELECT
	measure_date
FROM waterway.effective_fairway_availability
ORDER BY measure_date DESC LIMIT 1
`
	// listFairwayAvailabilitySQL lists the database id of every stored
	// fairway availability together with the external bottleneck
	// identifier and the surdat value.
	listFairwayAvailabilitySQL = `
SELECT
  fa.id,
  bn.bottleneck_id,
  fa.surdat
FROM waterway.fairway_availability fa
JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id
`
	// insertFASQL stores a new fairway availability and returns its
	// generated id. $2 is the external bottleneck identifier which is
	// resolved to the internal bottleneck id by a sub-select.
	insertFASQL = `
INSERT INTO waterway.fairway_availability (
  position_code,
  bottleneck_id,
  surdat,
  critical,
  date_info,
  source_organization
) VALUES (
  $1,
  (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2),
  $3,
  $4,
  $5,
  $6
)
RETURNING id`

	// insertBnPdfsSQL stores a PDF reference for a fairway availability.
	// Rows violating the constraint bottleneck_pdfs_pkey are skipped
	// silently.
	insertBnPdfsSQL = `
INSERT INTO waterway.bottleneck_pdfs (
  fairway_availability_id,
  profile_pdf_filename,
  profile_pdf_url,
  pdf_generation_date,
  source_organization
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5
) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`
	// insertEFASQL stores an effective fairway availability. $3 is the
	// name of a level of service which is resolved via levels_of_service.
	// Rows violating the constraint effective_fairway_availability_pkey
	// are skipped silently.
	insertEFASQL = `
INSERT INTO waterway.effective_fairway_availability (
  fairway_availability_id,
  measure_date,
  level_of_service,
  available_depth_value,
  available_width_value,
  water_level_value,
  measure_type,
  source_organization,
  forecast_generation_time,
  value_lifetime
) VALUES (
  $1,
  $2,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $3),
  $4,
  $5,
  $6,
  $7,
  $8,
  $9,
  $10
) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`
	// insertFAVSQL stores the reference values of a fairway availability.
	// $2 is the name of a level of service which is resolved via
	// levels_of_service. $6 and $7 are handed to ST_MakePoint as its
	// first and second argument. Rows violating the constraint
	// fa_reference_values_pkey are skipped silently.
	insertFAVSQL = `
INSERT INTO waterway.fa_reference_values (
  fairway_availability_id,
  level_of_service,
  fairway_depth,
  fairway_width,
  fairway_radius,
  shallowest_spot
) VALUES (
  $1,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $2),
  $3,
  $4,
  $5,
  ST_MakePoint($6, $7)::geography
)ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING`
)

// faJobCreator is the stateless job creator which is
// registered for FAJobKind in this file's init function.
type faJobCreator struct{}

func init() {
	RegisterJobCreator(FAJobKind, faJobCreator{})
}

// Description returns the human readable name of this kind of import.
func (faJobCreator) Description() string {
	const description = "fairway availability"
	return description
}

// Create builds a fairway availability import job from its
// JSON serialized configuration. The job kind is ignored.
// A decoding error is returned unchanged together with a nil job.
func (faJobCreator) Create(_ JobKind, data string) (Job, error) {
	var job FairwayAvailability
	err := common.FromJSONString(data, &job)
	if err != nil {
		return nil, err
	}
	return &job, nil
}

// Depends lists the names of the database tables
// this import depends on.
func (faJobCreator) Depends() []string {
	tables := make([]string, 0, 6)
	tables = append(tables,
		"bottlenecks",
		"fairway_availability",
		"bottleneck_pdfs",
		"effective_fairway_availability",
		"fa_reference_values",
		"levels_of_service",
	)
	return tables
}

// AutoAccept always returns true.
// NOTE(review): presumably this lets the import queue accept fairway
// availability imports without a manual review step — confirm at the caller.
func (faJobCreator) AutoAccept() bool {
	return true
}

// StageDone is meant to move the imported fairway availabilities
// out of the staging area. Currently it does nothing and always
// returns nil; all arguments are ignored.
func (faJobCreator) StageDone(_ context.Context, _ *sql.Tx, _ int64) error {
	return nil
}

// CleanUp is a NOP for a fairway availabilities import:
// it always returns nil.
func (*FairwayAvailability) CleanUp() error {
	return nil
}

// bottleneckCountry is one row of the listBottlenecksSQL query.
type bottleneckCountry struct {
	// ID is the bottleneck_id column of waterway.bottlenecks.
	ID string
	// ResponsibleCountry is the responsible_country column
	// of waterway.bottlenecks.
	ResponsibleCountry string
}

// Do executes the actual fairway availability import.
//
// Through conn it loads the bottlenecks selected by listBottlenecksSQL,
// the already stored fairway availabilities (keyed by bottleneck
// identifier and surdat) and the newest measure date of the effective
// fairway availabilities. If no measure date is stored yet, the
// requested period starts five days ago. Fetching the data from the
// service and storing it is delegated to doForFAs.
//
// It returns nil, nil if doForFAs processed nothing, otherwise a pointer
// to a summary struct listing the identifiers reported by doForFAs.
// Every error of the database or of doForFAs is returned to the caller,
// so a failed import is never reported as a successful one.
func (fa *FairwayAvailability) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// Get available bottlenecks from database for use as filter in SOAP request
	rows, err := conn.QueryContext(ctx, listBottlenecksSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	bottlenecks := []bottleneckCountry{}

	for rows.Next() {
		var bn bottleneckCountry
		if err = rows.Scan(
			&bn.ID,
			&bn.ResponsibleCountry,
		); err != nil {
			return nil, err
		}
		bottlenecks = append(bottlenecks, bn)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}

	// Load the fairway availabilities already known to the database
	// to avoid inserting them a second time.
	faRows, err := conn.QueryContext(ctx, listFairwayAvailabilitySQL)
	if err != nil {
		return nil, err
	}
	// Without this the result set stays open if Scan fails below.
	defer faRows.Close()

	fairwayAvailabilities := map[uniqueFairwayAvailability]int64{}
	for faRows.Next() {
		var id int64
		var bnID string
		var sd time.Time
		if err = faRows.Scan(
			&id,
			&bnID,
			&sd,
		); err != nil {
			return nil, err
		}
		key := uniqueFairwayAvailability{
			BottleneckId: bnID,
			Surdat:       sd,
		}
		fairwayAvailabilities[key] = id
	}
	if err = faRows.Err(); err != nil {
		return nil, err
	}

	var latestDate pgtype.Timestamp
	err = conn.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&latestDate)
	switch {
	case err == sql.ErrNoRows:
		latestDate = pgtype.Timestamp{
			// Fill Database with data of the last 5 days. Change this to a more useful value.
			Time: time.Now().AddDate(0, 0, -5),
		}
	case err != nil:
		return nil, err
	}

	faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latestDate, conn, feedback)
	if err != nil {
		// doForFAs returns no identifiers on failure. Falling through
		// here would report "no new data" and hide the error.
		feedback.Error("Error processing data: %s", err)
		return nil, err
	}
	if len(faids) == 0 {
		feedback.Info("No new fairway availablity data found")
		return nil, nil
	}
	feedback.Info("Processed %d fairway availabilities", len(faids))
	// TODO: needs to be filled more useful.
	summary := struct {
		FairwayAvailabilities []string `json:"fairwayAvailabilities"`
	}{
		FairwayAvailabilities: faids,
	}
	return &summary, nil
}

// doForFAs requests the fairway availabilities of the given bottlenecks
// for the period from latestDate until now from the SOAP service and
// stores them in the database using a single transaction on conn.
//
// fairwayAvailabilities maps already stored fairway availabilities to
// their database ids; entries of the response found in this map are not
// inserted again, newly inserted ones are added to the map. The PDFs,
// effective fairway availabilities and reference values of every entry
// are inserted with statements that silently skip conflicting rows.
//
// It returns the bottleneck identifier of every processed entry of the
// response (one per entry, so identifiers may repeat). On any error,
// including a failing commit, nil and the error are returned and the
// transaction is rolled back.
func (fa *FairwayAvailability) doForFAs(
	ctx context.Context,
	bottlenecks []bottleneckCountry,
	fairwayAvailabilities map[uniqueFairwayAvailability]int64,
	latestDate pgtype.Timestamp,
	conn *sql.Conn,
	feedback Feedback,
) ([]string, error) {
	start := time.Now()

	client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)

	bnIDs := make([]string, 0, len(bottlenecks))
	for _, bn := range bottlenecks {
		bnIDs = append(bnIDs, bn.ID)
	}
	var period ifaf.RequestedPeriod
	period.Date_start = latestDate.Time
	period.Date_end = time.Now()

	ids := ifaf.ArrayOfString{
		String: bnIDs,
	}

	req := &ifaf.Get_bottleneck_fa{
		Bottleneck_id: &ids,
		Period:        &period,
	}
	resp, err := client.Get_bottleneck_fa(req)
	if err != nil {
		feedback.Error("%v", err)
		return nil, err
	}

	if resp.Get_bottleneck_faResult == nil {
		return nil, errors.New("no fairway availabilities found")
	}

	result := resp.Get_bottleneck_faResult

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// After a successful Commit this is a no-op.
	defer tx.Rollback()

	insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL)
	if err != nil {
		return nil, err
	}
	defer insertFAStmt.Close()
	insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL)
	if err != nil {
		return nil, err
	}
	defer insertBnPdfsStmt.Close()
	insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL)
	if err != nil {
		return nil, err
	}
	defer insertEFAStmt.Close()
	insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL)
	if err != nil {
		return nil, err
	}
	defer insertFAVStmt.Close()

	var faIDs []string
	var faID int64
	feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability))
	for _, faRes := range result.FairwayAvailability {
		uniqueFa := uniqueFairwayAvailability{
			BottleneckId: faRes.Bottleneck_id,
			Surdat:       faRes.SURDAT,
		}
		// Only insert fairway availabilities not yet known; remember the
		// new id so that duplicates within one response are stored once.
		var found bool
		if faID, found = fairwayAvailabilities[uniqueFa]; !found {
			err = insertFAStmt.QueryRowContext(
				ctx,
				faRes.POSITION,
				faRes.Bottleneck_id,
				faRes.SURDAT,
				faRes.Critical,
				faRes.Date_Info,
				faRes.Source,
			).Scan(&faID)
			if err != nil {
				return nil, err
			}
			fairwayAvailabilities[uniqueFa] = faID
		}
		feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
		faIDs = append(faIDs, faRes.Bottleneck_id)
		if faRes.Bottleneck_PDFs != nil {
			bnPdfCount := 0
			for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo {
				res, err := insertBnPdfsStmt.ExecContext(
					ctx,
					faID,
					bnPdfs.ProfilePdfFilename,
					bnPdfs.ProfilePdfURL,
					bnPdfs.PDF_Generation_Date,
					bnPdfs.Source,
				)
				if err != nil {
					return nil, err
				}
				// If the driver cannot tell the number of affected rows
				// the row is counted as inserted.
				affected, err := res.RowsAffected()
				if err == nil {
					bnPdfCount += int(affected)
				} else {
					bnPdfCount++
				}
			}
			feedback.Info("Add %d Pdfs", bnPdfCount)
		}
		if faRes.Effective_fairway_availability != nil {
			efaCount := 0
			for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability {
				// The level of service is dereferenced below. A value
				// missing in the response must fail the import with an
				// error instead of crashing it with a nil dereference.
				los := efa.Level_of_Service
				if los == nil {
					return nil, errors.New(
						"effective fairway availability without level of service")
				}
				// An undefined forecast generation time is stored as NULL.
				fgt := efa.Forecast_generation_time
				if efa.Forecast_generation_time.Status == pgtype.Undefined {
					fgt = pgtype.Timestamp{
						Status: pgtype.Null,
					}
				}
				res, err := insertEFAStmt.ExecContext(
					ctx,
					faID,
					efa.Measure_date,
					string(*los),
					efa.Available_depth_value,
					efa.Available_width_value,
					efa.Water_level_value,
					efa.Measure_type,
					efa.Source,
					fgt.Get(),
					efa.Value_lifetime,
				)
				if err != nil {
					return nil, err
				}
				affected, err := res.RowsAffected()
				if err == nil {
					efaCount += int(affected)
				} else {
					efaCount++
				}
			}
			feedback.Info("Add %d Effective Fairway Availability", efaCount)
		}

		if faRes.Reference_values != nil {
			rvCount := 0
			for _, fav := range faRes.Reference_values.ReferenceValue {
				// NOTE(review): insertFAVSQL hands the last two arguments
				// to ST_MakePoint($6, $7), which expects x (longitude)
				// before y (latitude). Passing Lat before Lon looks
				// swapped — confirm against the service data and existing
				// rows before changing the order.
				res, err := insertFAVStmt.ExecContext(
					ctx,
					faID,
					fav.Level_of_Service,
					fav.Fairway_depth,
					fav.Fairway_width,
					fav.Fairway_radius,
					fav.Shallowest_spot_Lat,
					fav.Shallowest_spot_Lon,
				)
				if err != nil {
					return nil, err
				}
				affected, err := res.RowsAffected()
				if err == nil {
					rvCount += int(affected)
				} else {
					rvCount++
				}
			}
			feedback.Info("Add %d Reference Values", rvCount)
		}
	}
	feedback.Info("Storing fairway availabilities took %s", time.Since(start))
	// A failing commit means nothing was stored: report it instead of
	// pretending the listed entries were imported.
	if err = tx.Commit(); err != nil {
		return nil, err
	}
	feedback.Info("Import of fairway availabilities was successful")

	return faIDs, nil
}