view pkg/imports/fa.go @ 1662:d8ca44615bfc

Implemented first version of fairway availability import.
author Raimund Renkert <raimund.renkert@intevation.de>
date Fri, 21 Dec 2018 15:56:28 +0100
parents
children 10e3dd3b9363
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
package imports

import (
	"context"
	"database/sql"
	"errors"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/ifaf"
	"github.com/jackc/pgx/pgtype"
)

type FairwayAvailability struct {
	URL      string `json:"url"`
	Insecure bool   `json:"insecure"`
}

const FAJobKind JobKind = "fa"

const (
	listBottlenecksSQL = `
SELECT
	bottleneck_id,
	responsible_country
FROM waterway.bottlenecks
WHERE responsible_country = users.current_user_country()
	AND staging_done = true
`
	insertFASQL = `
INSERT INTO waterway.fairway_availability (
  position_code,
  bottleneck_id,
  surdat,
  critical,
  date_info,
  source_organization
) VALUES (
  $1,
  (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2),
  $3,
  $4,
  $5,
  $6
)
RETURNING id`

	insertBnPdfsSQL = `
INSERT INTO waterway.bottleneck_pdfs (
	fairway_availability_id,
	profile_pdf_filename,
	profile_pdf_url,
	pdf_generation_date,
	source_organization
) VALUES (
	$1,
	$2,
	$3,
	$4,
	$5
)`
	insertEFASQL = `
INSERT INTO waterway.effective_fairway_availability (
	fairway_availability_id,
	measure_date,
	level_of_service,
	available_depth_value,
	available_width_value,
	water_level_value,
	measure_type,
	source_organization,
	forecast_generation_time,
	value_lifetime
) VALUES (
	$1,
	$2,
	(SELECT
		level_of_service
	FROM levels_of_service
	WHERE name = $3),
	$4,
	$5,
	$6,
	$7,
	$8,
	$9,
	$10
)`
	insertFAVSQL = `
INSERT INTO waterway.fa_reference_values (
	fairway_availability_id,
	level_of_service,
	fairway_depth,
	fairway_width,
	fairway_radius,
	shallowest_spot
) VALUES (
	$1,
	(SELECT
		level_of_service
	FROM levels_of_service
	WHERE name = $2),
	$3,
	$4,
	$5,
	ST_MakePoint($6, $7)::geography
)`
)

type faJobCreator struct{}

func init() {
	RegisterJobCreator(FAJobKind, faJobCreator{})
}

func (faJobCreator) Description() string {
	return "fairway availability"
}

func (faJobCreator) Create(_ JobKind, data string) (Job, error) {
	fa := new(FairwayAvailability)
	if err := common.FromJSONString(data, fa); err != nil {
		return nil, err
	}
	return fa, nil
}

func (faJobCreator) Depends() []string {
	return []string{
		"bottlenecks",
		"fairway_availability",
		"bottleneck_pdfs",
		"effective_fairway_availability",
		"fa_reference_values",
		"levels_of_service",
	}
}

// StageDone moves the imported fairway availablities out of the staging area.
// Currently doing nothing.
func (faJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	return nil
}

// CleanUp of a fairway availablities import is a NOP.
func (fa *FairwayAvailability) CleanUp() error { return nil }

// Do executes the actual fairway availability import.
func (fa *FairwayAvailability) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// Get available bottlenecks from database for use as filter in SOAP request
	var rows *sql.Rows

	rows, err := conn.QueryContext(ctx, listBottlenecksSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	bottlenecks := []models.Bottleneck{}

	for rows.Next() {
		var bn models.Bottleneck
		if err = rows.Scan(
			&bn.ID,
			&bn.ResponsibleCountry,
		); err != nil {
			return nil, err
		}
		bottlenecks = append(bottlenecks, bn)
	}

	if err = rows.Err(); err != nil {
		return nil, err
	}

	faids, err := fa.doForFAs(ctx, bottlenecks, conn, feedback)
	if err != nil {
		feedback.Error("Error processing data: %s", err)
	}
	if len(faids) == 0 {
		feedback.Info("No new fairway availablity data found")
		return nil, nil
	}
	feedback.Info("Processed %d of %d bottlenecks", len(faids), len(bottlenecks))
	// TODO: needs to be filled more useful.
	summary := struct {
		FairwayAvailabilities []string `json:"fairwayAvailabilities"`
	}{
		FairwayAvailabilities: faids,
	}
	return &summary, err
}

func (fa *FairwayAvailability) doForFAs(
	ctx context.Context,
	bottlenecks []models.Bottleneck,
	conn *sql.Conn,
	feedback Feedback,
) ([]string, error) {
	start := time.Now()

	client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)

	var bnIds []string
	for _, bn := range bottlenecks {
		bnIds = append(bnIds, bn.ID)
	}

	ids := ifaf.ArrayOfString{
		String: bnIds,
	}

	// TODO: Filter by period. Period should start after latest measurement date.
	req := &ifaf.Get_bottleneck_fa{
		Bottleneck_id: &ids,
	}
	resp, err := client.Get_bottleneck_fa(req)
	if err != nil {
		feedback.Error("%v", err)
		return nil, err
	}

	if resp.Get_bottleneck_faResult == nil {
		err := errors.New("no fairway availabilities found")
		return nil, err
	}

	result := resp.Get_bottleneck_faResult

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL)
	if err != nil {
		return nil, err
	}
	defer insertFAStmt.Close()
	insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL)
	if err != nil {
		return nil, err
	}
	defer insertBnPdfsStmt.Close()
	insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL)
	if err != nil {
		return nil, err
	}
	defer insertEFAStmt.Close()
	insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL)
	if err != nil {
		return nil, err
	}
	defer insertFAVStmt.Close()

	var faids []string
	var faId int64
	feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability))
	for _, faRes := range result.FairwayAvailability {
		// TODO: high frequent requests lead to "duplicate key value violates unique constraint "fairway_availability_bottleneck_id_surdat_key"
		// in the database. This has to be resolved.
		// All data subsets can also ocure as duplicates!
		err = insertFAStmt.QueryRowContext(
			ctx,
			faRes.POSITION,
			faRes.Bottleneck_id,
			faRes.SURDAT,
			faRes.Critical,
			faRes.Date_Info,
			faRes.Source,
		).Scan(&faId)
		if err != nil {
			return nil, err
		}
		feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
		faids = append(faids, faRes.Bottleneck_id)
		for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo {
			_, err = insertBnPdfsStmt.ExecContext(
				ctx,
				faId,
				bnPdfs.ProfilePdfFilename,
				bnPdfs.ProfilePdfURL,
				bnPdfs.PDF_Generation_Date,
				bnPdfs.Source,
			)
			if err != nil {
				return nil, err
			}
			feedback.Info("Add %d Pdfs", len(faRes.Bottleneck_PDFs.PdfInfo))
		}
		for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability {
			los := efa.Level_of_Service
			fgt := efa.Forecast_generation_time
			if efa.Forecast_generation_time.Status == pgtype.Undefined {
				fgt = pgtype.Timestamp{
					Status: pgtype.Null,
				}
			}
			_, err = insertEFAStmt.ExecContext(
				ctx,
				faId,
				efa.Measure_date,
				string(*los),
				efa.Available_depth_value,
				efa.Available_width_value,
				efa.Water_level_value,
				efa.Measure_type,
				efa.Source,
				fgt,
				efa.Value_lifetime,
			)
			if err != nil {
				return nil, err
			}
			feedback.Info("Add %d Effective Fairway Availability", len(
				faRes.Effective_fairway_availability.EffectiveFairwayAvailability))
		}
		for _, fav := range faRes.Reference_values.ReferenceValue {
			_, err = insertFAVStmt.ExecContext(
				ctx,
				faId,
				fav.Level_of_Service,
				fav.Fairway_depth,
				fav.Fairway_width,
				fav.Fairway_radius,
				fav.Shallowest_spot_Lat,
				fav.Shallowest_spot_Lon,
			)
			if err != nil {
				return nil, err
			}
			feedback.Info("Add %d Reference Values",
				len(faRes.Reference_values.ReferenceValue))
		}
	}
	feedback.Info("Storing fairway availabilities took %s", time.Since(start))
	if err = tx.Commit(); err == nil {
		feedback.Info("Import of fairway availabilities was successful")
	}

	return faids, nil
}