view pkg/imports/fa.go @ 2006:35acb7f9ae0c

Do anything else before expectedly failing role creation Creating roles during database setup expectedly fails in case there already is another gemma database in the cluster. Doing it at the end of the transaction ensures it does not hide errors in other commands in the script. In passing, add the default admin via the designated view to ensure it will become a correctly set up application user.
author Tom Gottfried <tom@intevation.de>
date Thu, 24 Jan 2019 17:23:43 +0100
parents 295c82c5bc3e
children 25967829cf00
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/ifaf"
)

// FairwayAvailability imports fairway availability data
// from an IFAF SOAP service.
type FairwayAvailability struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// FAJobKind is import queue type identifier.
const FAJobKind JobKind = "fa"

const (
	listBottlenecksSQL = `
SELECT
  bottleneck_id,
  responsible_country
FROM waterway.bottlenecks
WHERE responsible_country = users.current_user_country()
  AND staging_done = true
`
	latestMeasureDateSQL = `
SELECT
	measure_date
FROM waterway.effective_fairway_availability
ORDER BY measure_date DESC LIMIT 1
`
	listFairwayAvailabilitySQL = `
SELECT
  fa.id,
  bn.bottleneck_id,
  fa.surdat
FROM waterway.fairway_availability fa
JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id
`
	insertFASQL = `
INSERT INTO waterway.fairway_availability (
  position_code,
  bottleneck_id,
  surdat,
  critical,
  date_info,
  source_organization
) VALUES (
  $1,
  (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2),
  $3,
  $4,
  $5,
  $6
)
RETURNING id`

	insertBnPdfsSQL = `
INSERT INTO waterway.bottleneck_pdfs (
  fairway_availability_id,
  profile_pdf_filename,
  profile_pdf_url,
  pdf_generation_date,
  source_organization
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5
) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`
	insertEFASQL = `
INSERT INTO waterway.effective_fairway_availability (
  fairway_availability_id,
  measure_date,
  level_of_service,
  available_depth_value,
  available_width_value,
  water_level_value,
  measure_type,
  source_organization,
  forecast_generation_time,
  value_lifetime
) VALUES (
  $1,
  $2,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $3),
  $4,
  $5,
  $6,
  $7,
  $8,
  $9,
  $10
) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`
	insertFAVSQL = `
INSERT INTO waterway.fa_reference_values (
  fairway_availability_id,
  level_of_service,
  fairway_depth,
  fairway_width,
  fairway_radius,
  shallowest_spot
) VALUES (
  $1,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $2),
  $3,
  $4,
  $5,
  ST_MakePoint($6, $7)::geography
)ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING`
)

type faJobCreator struct{}

func init() {
	RegisterJobCreator(FAJobKind, faJobCreator{})
}

func (faJobCreator) Description() string {
	return "fairway availability"
}

func (faJobCreator) Create(_ JobKind, data string) (Job, error) {
	fa := new(FairwayAvailability)
	if err := common.FromJSONString(data, fa); err != nil {
		return nil, err
	}
	return fa, nil
}

func (faJobCreator) Depends() []string {
	return []string{
		"bottlenecks",
		"fairway_availability",
		"bottleneck_pdfs",
		"effective_fairway_availability",
		"fa_reference_values",
		"levels_of_service",
	}
}

func (faJobCreator) AutoAccept() bool { return true }

// StageDone moves the imported fairway availablities out of the staging area.
// Currently doing nothing.
func (faJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp of a fairway availablities import is a NOP.
func (*FairwayAvailability) CleanUp() error { return nil }

// Do executes the actual fairway availability import.
func (fa *FairwayAvailability) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// Get available bottlenecks from database for use as filter in SOAP request
	var rows *sql.Rows

	rows, err := conn.QueryContext(ctx, listBottlenecksSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	bottlenecks := []models.Bottleneck{}

	for rows.Next() {
		var bn models.Bottleneck
		if err = rows.Scan(
			&bn.ID,
			&bn.ResponsibleCountry,
		); err != nil {
			return nil, err
		}
		bottlenecks = append(bottlenecks, bn)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}

	var faRows *sql.Rows
	faRows, err = conn.QueryContext(ctx, listFairwayAvailabilitySQL)
	if err != nil {
		return nil, err
	}
	fairwayAvailabilities := map[models.UniqueFairwayAvailability]int64{}
	for faRows.Next() {
		var id int64
		var bnId string
		var sd time.Time
		if err = faRows.Scan(
			&id,
			&bnId,
			&sd,
		); err != nil {
			return nil, err
		}
		key := models.UniqueFairwayAvailability{
			BottleneckId: bnId,
			Surdat:       sd,
		}
		fairwayAvailabilities[key] = id
	}
	if err = faRows.Err(); err != nil {
		return nil, err
	}

	var latestDate pgtype.Timestamp
	err = conn.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&latestDate)
	switch {
	case err == sql.ErrNoRows:
		latestDate = pgtype.Timestamp{
			// Fill Database with data of the last 5 days. Change this to a more useful value.
			Time: time.Now().AddDate(0, 0, -5),
		}
	case err != nil:
		return nil, err
	}

	faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latestDate, conn, feedback)
	if err != nil {
		feedback.Error("Error processing data: %s", err)
	}
	if len(faids) == 0 {
		feedback.Info("No new fairway availablity data found")
		return nil, nil
	}
	feedback.Info("Processed %d fairway availabilities", len(faids))
	// TODO: needs to be filled more useful.
	summary := struct {
		FairwayAvailabilities []string `json:"fairwayAvailabilities"`
	}{
		FairwayAvailabilities: faids,
	}
	return &summary, err
}

func (fa *FairwayAvailability) doForFAs(
	ctx context.Context,
	bottlenecks []models.Bottleneck,
	fairwayAvailabilities map[models.UniqueFairwayAvailability]int64,
	latestDate pgtype.Timestamp,
	conn *sql.Conn,
	feedback Feedback,
) ([]string, error) {
	start := time.Now()

	client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)

	var bnIds []string
	for _, bn := range bottlenecks {
		bnIds = append(bnIds, bn.ID)
	}
	var period ifaf.RequestedPeriod
	period.Date_start = latestDate.Time
	period.Date_end = time.Now()

	ids := ifaf.ArrayOfString{
		String: bnIds,
	}

	req := &ifaf.Get_bottleneck_fa{
		Bottleneck_id: &ids,
		Period:        &period,
	}
	resp, err := client.Get_bottleneck_fa(req)
	if err != nil {
		feedback.Error("%v", err)
		return nil, err
	}

	if resp.Get_bottleneck_faResult == nil {
		err := errors.New("no fairway availabilities found")
		return nil, err
	}

	result := resp.Get_bottleneck_faResult

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL)
	if err != nil {
		return nil, err
	}
	defer insertFAStmt.Close()
	insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL)
	if err != nil {
		return nil, err
	}
	defer insertBnPdfsStmt.Close()
	insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL)
	if err != nil {
		return nil, err
	}
	defer insertEFAStmt.Close()
	insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL)
	if err != nil {
		return nil, err
	}
	defer insertFAVStmt.Close()

	var faIDs []string
	var faID int64
	feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability))
	for _, faRes := range result.FairwayAvailability {
		uniqueFa := models.UniqueFairwayAvailability{
			BottleneckId: faRes.Bottleneck_id,
			Surdat:       faRes.SURDAT,
		}
		var found bool
		if faID, found = fairwayAvailabilities[uniqueFa]; !found {
			err = insertFAStmt.QueryRowContext(
				ctx,
				faRes.POSITION,
				faRes.Bottleneck_id,
				faRes.SURDAT,
				faRes.Critical,
				faRes.Date_Info,
				faRes.Source,
			).Scan(&faID)
			if err != nil {
				return nil, err
			}
			fairwayAvailabilities[uniqueFa] = faID
		}
		feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
		faIDs = append(faIDs, faRes.Bottleneck_id)
		if faRes.Bottleneck_PDFs != nil {
			bnPdfCount := 0
			for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo {
				res, err := insertBnPdfsStmt.ExecContext(
					ctx,
					faID,
					bnPdfs.ProfilePdfFilename,
					bnPdfs.ProfilePdfURL,
					bnPdfs.PDF_Generation_Date,
					bnPdfs.Source,
				)
				if err != nil {
					return nil, err
				}
				affected, err := res.RowsAffected()
				if err == nil {
					bnPdfCount += int(affected)
				} else {
					bnPdfCount++
				}
			}
			feedback.Info("Add %d Pdfs", bnPdfCount)
		}
		if faRes.Effective_fairway_availability != nil {
			efaCount := 0
			for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability {
				los := efa.Level_of_Service
				fgt := efa.Forecast_generation_time
				if efa.Forecast_generation_time.Status == pgtype.Undefined {
					fgt = pgtype.Timestamp{
						Status: pgtype.Null,
					}
				}
				res, err := insertEFAStmt.ExecContext(
					ctx,
					faID,
					efa.Measure_date,
					string(*los),
					efa.Available_depth_value,
					efa.Available_width_value,
					efa.Water_level_value,
					efa.Measure_type,
					efa.Source,
					fgt.Get(),
					efa.Value_lifetime,
				)
				if err != nil {
					return nil, err
				}
				affected, err := res.RowsAffected()
				if err == nil {
					efaCount += int(affected)
				} else {
					efaCount++
				}
			}
			feedback.Info("Add %d Effective Fairway Availability", efaCount)
		}

		if faRes.Reference_values != nil {
			rvCount := 0
			for _, fav := range faRes.Reference_values.ReferenceValue {
				res, err := insertFAVStmt.ExecContext(
					ctx,
					faID,
					fav.Level_of_Service,
					fav.Fairway_depth,
					fav.Fairway_width,
					fav.Fairway_radius,
					fav.Shallowest_spot_Lat,
					fav.Shallowest_spot_Lon,
				)
				if err != nil {
					return nil, err
				}
				affected, err := res.RowsAffected()
				if err == nil {
					rvCount += int(affected)
				} else {
					rvCount++
				}
			}
			feedback.Info("Add %d Reference Values", rvCount)
		}
	}
	feedback.Info("Storing fairway availabilities took %s", time.Since(start))
	if err = tx.Commit(); err == nil {
		feedback.Info("Import of fairway availabilities was successful")
	}

	return faIDs, nil
}