view pkg/imports/fa.go @ 2198:4db1fa4f049c

Fairway availabilty import: Fixed row query leak.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 12 Feb 2019 14:55:09 +0100
parents e57ba9585aaa
children 64147a137e0a
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/soap/ifaf"
)

// FairwayAvailability imports fairway availability data
// from an IFAF SOAP service.
type FairwayAvailability struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

type uniqueFairwayAvailability struct {
	BottleneckId string
	Surdat       time.Time
}

// FAJobKind is import queue type identifier.
const FAJobKind JobKind = "fa"

const (
	listBottlenecksSQL = `
SELECT
  bottleneck_id,
  responsible_country
FROM waterway.bottlenecks
WHERE responsible_country = users.current_user_country()
  AND staging_done = true
`
	latestMeasureDateSQL = `
SELECT
	measure_date
FROM waterway.effective_fairway_availability
ORDER BY measure_date DESC LIMIT 1
`
	listFairwayAvailabilitySQL = `
SELECT
  fa.id,
  bn.bottleneck_id,
  fa.surdat
FROM waterway.fairway_availability fa
JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id
`
	insertFASQL = `
INSERT INTO waterway.fairway_availability (
  position_code,
  bottleneck_id,
  surdat,
  critical,
  date_info,
  source_organization
) VALUES (
  $1,
  (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2),
  $3,
  $4,
  $5,
  $6
)
RETURNING id`

	insertBnPdfsSQL = `
INSERT INTO waterway.bottleneck_pdfs (
  fairway_availability_id,
  profile_pdf_filename,
  profile_pdf_url,
  pdf_generation_date,
  source_organization
) VALUES (
  $1,
  $2,
  $3,
  $4,
  $5
) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`
	insertEFASQL = `
INSERT INTO waterway.effective_fairway_availability (
  fairway_availability_id,
  measure_date,
  level_of_service,
  available_depth_value,
  available_width_value,
  water_level_value,
  measure_type,
  source_organization,
  forecast_generation_time,
  value_lifetime
) VALUES (
  $1,
  $2,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $3),
  $4,
  $5,
  $6,
  $7,
  $8,
  $9,
  $10
) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`
	insertFAVSQL = `
INSERT INTO waterway.fa_reference_values (
  fairway_availability_id,
  level_of_service,
  fairway_depth,
  fairway_width,
  fairway_radius,
  shallowest_spot
) VALUES (
  $1,
  (SELECT
    level_of_service
  FROM levels_of_service
  WHERE name = $2),
  $3,
  $4,
  $5,
  ST_MakePoint($6, $7)::geography
)ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING`
)

type faJobCreator struct{}

func init() {
	RegisterJobCreator(FAJobKind, faJobCreator{})
}

func (faJobCreator) Description() string {
	return "fairway availability"
}

func (faJobCreator) Create() Job { return new(FairwayAvailability) }

func (faJobCreator) Depends() []string {
	return []string{
		"bottlenecks",
		"fairway_availability",
		"bottleneck_pdfs",
		"effective_fairway_availability",
		"fa_reference_values",
		"levels_of_service",
	}
}

func (faJobCreator) AutoAccept() bool { return true }

// StageDone moves the imported fairway availablities out of the staging area.
// Currently doing nothing.
func (faJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp of a fairway availablities import is a NOP.
func (*FairwayAvailability) CleanUp() error { return nil }

type bottleneckCountry struct {
	ID                 string
	ResponsibleCountry string
}

func loadBottleneckCountries(ctx context.Context, conn *sql.Conn) ([]bottleneckCountry, error) {

	// Get available bottlenecks from database for use as filter in SOAP request
	rows, err := conn.QueryContext(ctx, listBottlenecksSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var bottlenecks []bottleneckCountry

	for rows.Next() {
		var bn bottleneckCountry
		if err = rows.Scan(
			&bn.ID,
			&bn.ResponsibleCountry,
		); err != nil {
			return nil, err
		}
		bottlenecks = append(bottlenecks, bn)
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return bottlenecks, nil
}

func loadFairwayAvailabilities(ctx context.Context, conn *sql.Conn) (map[uniqueFairwayAvailability]int64, error) {
	rows, err := conn.QueryContext(ctx, listFairwayAvailabilitySQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()
	fairwayAvailabilities := map[uniqueFairwayAvailability]int64{}
	for rows.Next() {
		var id int64
		var bnId string
		var sd time.Time
		if err = rows.Scan(
			&id,
			&bnId,
			&sd,
		); err != nil {
			return nil, err
		}
		key := uniqueFairwayAvailability{
			BottleneckId: bnId,
			Surdat:       sd,
		}
		fairwayAvailabilities[key] = id
	}
	if err = rows.Err(); err != nil {
		return nil, err
	}
	return fairwayAvailabilities, nil
}

func latestDate(ctx context.Context, conn *sql.Conn) (pgtype.Timestamp, error) {
	var date pgtype.Timestamp
	err := conn.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date)
	switch {
	case err == sql.ErrNoRows:
		date = pgtype.Timestamp{
			// Fill Database with data of the last 5 days. Change this to a more useful value.
			Time: time.Now().AddDate(0, 0, -5),
		}
	case err != nil:
		return pgtype.Timestamp{}, err
	}
	return date, nil
}

// Do executes the actual fairway availability import.
func (fa *FairwayAvailability) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	bottlenecks, err := loadBottleneckCountries(ctx, conn)
	if err != nil {
		return nil, err
	}

	fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, conn)
	if err != nil {
		return nil, err
	}

	latest, err := latestDate(ctx, conn)
	if err != nil {
		return nil, err
	}

	faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latest, conn, feedback)
	if err != nil {
		feedback.Error("Error processing data: %s", err)
	}
	if len(faids) == 0 {
		feedback.Info("No new fairway availablity data found")
		return nil, nil
	}
	feedback.Info("Processed %d fairway availabilities", len(faids))
	// TODO: needs to be filled more useful.
	summary := struct {
		FairwayAvailabilities []string `json:"fairwayAvailabilities"`
	}{
		FairwayAvailabilities: faids,
	}
	return &summary, err
}

func (fa *FairwayAvailability) doForFAs(
	ctx context.Context,
	bottlenecks []bottleneckCountry,
	fairwayAvailabilities map[uniqueFairwayAvailability]int64,
	latestDate pgtype.Timestamp,
	conn *sql.Conn,
	feedback Feedback,
) ([]string, error) {
	start := time.Now()

	client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil)

	bnIds := make([]string, len(bottlenecks))
	for i := range bottlenecks {
		bnIds[i] = bottlenecks[i].ID
	}

	var period ifaf.RequestedPeriod
	period.Date_start = latestDate.Time
	period.Date_end = time.Now()

	ids := ifaf.ArrayOfString{
		String: bnIds,
	}

	req := &ifaf.Get_bottleneck_fa{
		Bottleneck_id: &ids,
		Period:        &period,
	}
	resp, err := client.Get_bottleneck_fa(req)
	if err != nil {
		return nil, err
	}

	if resp.Get_bottleneck_faResult == nil {
		return nil, errors.New("no fairway availabilities found")
	}

	result := resp.Get_bottleneck_faResult

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL)
	if err != nil {
		return nil, err
	}
	defer insertFAStmt.Close()
	insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL)
	if err != nil {
		return nil, err
	}
	defer insertBnPdfsStmt.Close()
	insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL)
	if err != nil {
		return nil, err
	}
	defer insertEFAStmt.Close()
	insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL)
	if err != nil {
		return nil, err
	}
	defer insertFAVStmt.Close()

	var faIDs []string
	var faID int64
	feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability))
	for _, faRes := range result.FairwayAvailability {
		uniqueFa := uniqueFairwayAvailability{
			BottleneckId: faRes.Bottleneck_id,
			Surdat:       faRes.SURDAT,
		}
		var found bool
		if faID, found = fairwayAvailabilities[uniqueFa]; !found {
			err = insertFAStmt.QueryRowContext(
				ctx,
				faRes.POSITION,
				faRes.Bottleneck_id,
				faRes.SURDAT,
				faRes.Critical,
				faRes.Date_Info,
				faRes.Source,
			).Scan(&faID)
			if err != nil {
				return nil, err
			}
			fairwayAvailabilities[uniqueFa] = faID
		}
		feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
		faIDs = append(faIDs, faRes.Bottleneck_id)
		if faRes.Bottleneck_PDFs != nil {
			bnPdfCount := 0
			for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo {
				res, err := insertBnPdfsStmt.ExecContext(
					ctx,
					faID,
					bnPdfs.ProfilePdfFilename,
					bnPdfs.ProfilePdfURL,
					bnPdfs.PDF_Generation_Date,
					bnPdfs.Source,
				)
				if err != nil {
					return nil, err
				}
				affected, err := res.RowsAffected()
				if err == nil {
					bnPdfCount += int(affected)
				} else {
					bnPdfCount++
				}
			}
			feedback.Info("Add %d Pdfs", bnPdfCount)
		}
		if faRes.Effective_fairway_availability != nil {
			efaCount := 0
			for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability {
				los := efa.Level_of_Service
				fgt := efa.Forecast_generation_time
				if efa.Forecast_generation_time.Status == pgtype.Undefined {
					fgt = pgtype.Timestamp{
						Status: pgtype.Null,
					}
				}
				res, err := insertEFAStmt.ExecContext(
					ctx,
					faID,
					efa.Measure_date,
					string(*los),
					efa.Available_depth_value,
					efa.Available_width_value,
					efa.Water_level_value,
					efa.Measure_type,
					efa.Source,
					fgt.Get(),
					efa.Value_lifetime,
				)
				if err != nil {
					return nil, err
				}
				affected, err := res.RowsAffected()
				if err == nil {
					efaCount += int(affected)
				} else {
					efaCount++
				}
			}
			feedback.Info("Add %d Effective Fairway Availability", efaCount)
		}

		if faRes.Reference_values != nil {
			rvCount := 0
			for _, fav := range faRes.Reference_values.ReferenceValue {
				res, err := insertFAVStmt.ExecContext(
					ctx,
					faID,
					fav.Level_of_Service,
					fav.Fairway_depth,
					fav.Fairway_width,
					fav.Fairway_radius,
					fav.Shallowest_spot_Lat,
					fav.Shallowest_spot_Lon,
				)
				if err != nil {
					return nil, err
				}
				affected, err := res.RowsAffected()
				if err == nil {
					rvCount += int(affected)
				} else {
					rvCount++
				}
			}
			feedback.Info("Add %d Reference Values", rvCount)
		}
	}
	feedback.Info("Storing fairway availabilities took %s", time.Since(start))
	if err = tx.Commit(); err == nil {
		feedback.Info("Import of fairway availabilities was successful")
	}

	return faIDs, nil
}