view pkg/imports/wg.go @ 5565:ade07a3f2cfd

Forgot to change the signature of some imports.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 08 Mar 2022 17:38:48 +0100
parents 59a99655f34d
children e1936db6db8e
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/soap/erdms"
)

// WaterwayGauge is a Job to load gauge data from
// a specified NTS service and to store it in the database.
// The fields are the job's configuration and are (de)serialized as JSON.
type WaterwayGauge struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Username is the username used to authenticate.
	Username string `json:"username"`
	// Password is the password used to authenticate.
	Password string `json:"password"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	// NOTE(review): presumably true disables certificate validation;
	// the flag is only passed on to getRisData here - confirm there.
	Insecure bool `json:"insecure"`
}

// Description returns the URL of the configured SOAP service as a short
// summary of this import. The string slice argument is ignored and the
// returned error is always nil.
func (wg *WaterwayGauge) Description([]string) (string, error) {
	serviceURL := wg.URL
	return serviceURL, nil
}

// WGJobKind is the unique name of this import job type.
// It is the kind under which the job creator for waterway gauges
// is registered in init.
const WGJobKind JobKind = "wg"

// wgJobCreator is the stateless job creator for waterway gauge imports.
type wgJobCreator struct{}

// init registers the waterway gauge job creator under WGJobKind.
func init() { RegisterJobCreator(WGJobKind, wgJobCreator{}) }

// Description returns a human readable name for this kind of import.
func (wgJobCreator) Description() string {
	const description = "waterway gauges"
	return description
}

// AutoAccept always reports true for waterway gauge imports.
func (wgJobCreator) AutoAccept() bool {
	return true
}

// Create returns a fresh, zero-valued WaterwayGauge job.
func (wgJobCreator) Create() Job {
	return &WaterwayGauge{}
}

// Depends returns the names of the database tables this import depends on,
// split into two groups.
// NOTE(review): the meaning of the two groups is defined by the job creator
// contract, which is not visible here - confirm before relying on it.
func (wgJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauges_reference_water_levels", "gauges"}
	deps[1] = []string{"depth_references"}
	return deps
}

// StageDone is a no-op because gauges are not staged.
// All arguments are ignored and the result is always nil.
func (wgJobCreator) StageDone(
	context.Context,
	*sql.Tx,
	int64,
	Feedback,
) error {
	return nil
}

// CleanUp is a no-op: a gauge import leaves nothing behind
// that would need to be removed. It always returns nil.
func (*WaterwayGauge) CleanUp() error {
	return nil
}

// SQL statements used by the waterway gauge import.
const (
	// eraseObsoleteGaugesSQL marks all not yet erased gauges of the
	// countries in $1 whose ISRS code is not contained in $2 as erased,
	// cuts their validity at the current time and returns the ISRS codes
	// of the affected gauges.
	eraseObsoleteGaugesSQL = `
UPDATE waterway.gauges SET erased = true, validity = validity - '[now,)'
WHERE NOT erased
  AND (location).country_code = ANY($1)
  AND isrs_astext(location) <> ALL($2)
RETURNING isrs_astext(location)
`

	// eraseGaugeSQL marks the current version of the gauge with ISRS
	// code $1 as erased unless its validity is contained in the new
	// validity $2. It yields true if a new version has to be inserted:
	// either an old version was erased or no version of the gauge
	// exists at all.
	eraseGaugeSQL = `
WITH upd AS (
  UPDATE waterway.gauges SET
    erased = true
  WHERE isrs_astext(location) = $1
    AND NOT erased
    -- Don't touch old entry if new validity contains old: will be updated
    AND NOT validity <@ $2
  RETURNING 1
)
-- Decide whether a new version will be INSERTed
SELECT EXISTS(SELECT 1 FROM upd)
  OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1)
`

	// insertGaugeSQL inserts a new gauge version. $1-$5 are the parts of
	// the ISRS location, $7/$8 the coordinates of the point geometry
	// (SRID 4326).
	insertGaugeSQL = `
INSERT INTO waterway.gauges (
  location,
  objname,
  geom,
  applicability_from_km,
  applicability_to_km,
  validity,
  zero_point,
  geodref,
  date_info,
  source_organization,
  lastupdate
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  $16
)
`

	// fixValiditySQL removes the range $2 from the validity of all erased
	// versions of the gauge with ISRS code $1 that overlap with $2.
	fixValiditySQL = `
UPDATE waterway.gauges SET
   -- Set enddate of old entry to new startdate in case of overlap:
  validity = validity - $2
WHERE isrs_astext(location) = $1
  AND validity && $2
  AND erased
`

	// updateGaugeSQL updates the not erased version of the gauge at the
	// location given by $1-$5, but only if the new last update time $15
	// is later than the stored one. It returns a row only if a gauge was
	// updated.
	updateGaugeSQL = `
UPDATE waterway.gauges SET
  objname = $6,
  geom = ST_SetSRID(ST_MakePoint($7, $8), 4326),
  applicability_from_km = $9,
  applicability_to_km = $10,
  zero_point = $11,
  geodref = $12,
  date_info = $13,
  source_organization = $14,
  lastupdate = $15,
  validity = $16
WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
  AND NOT erased
  AND $15 > lastupdate
RETURNING 1
`

	// deleteReferenceWaterLevelsSQL deletes the reference water levels of
	// the gauge version identified by ISRS code $1 and validity $2 whose
	// depth reference is not contained in $3. It returns the depth
	// references of the deleted rows.
	deleteReferenceWaterLevelsSQL = `
DELETE FROM waterway.gauges_reference_water_levels
WHERE isrs_astext(location) = $1
  AND validity = $2
  AND depth_reference <> ALL($3)
RETURNING depth_reference
`

	// isNtSDepthRefSQL checks if $1 is listed in the depth_references
	// table.
	isNtSDepthRefSQL = `
SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)`

	// insertReferenceWaterLevelsSQL inserts a reference water level for
	// a gauge version or updates its value if the level already exists.
	insertReferenceWaterLevelsSQL = `
INSERT INTO waterway.gauges_reference_water_levels (
  location,
  validity,
  depth_reference,
  value
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  $7,
  $8
) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET
    value = EXCLUDED.value
`
)

// Do implements the actual import.
//
// It fetches the gauge data ("wtwgag") via getRisData using the configured
// URL and credentials. Each delivered gauge is then processed in a
// transaction of its own: the stored version is marked as erased and a new
// version is inserted, or the stored version is updated in place. After
// that overlapping validities of erased versions are trimmed and the
// reference water levels are upserted. Finally all gauges of the countries
// returned by getRisData which were not part of the response are marked
// as erased.
//
// Problems with a single gauge are reported via feedback and only that
// gauge is skipped. Errors which make it impossible to continue are
// returned. An UnchangedError is returned if no gauge was delivered or if
// nothing changed. On success both results are nil.
func (wg *WaterwayGauge) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	responseData, countries, err := getRisData(
		ctx,
		conn,
		feedback,
		wg.Username,
		wg.Password,
		wg.URL,
		wg.Insecure,
		"wtwgag")
	if err != nil {
		return nil, err
	}

	// Prepare all statements once. They are bound to the per-gauge
	// transactions below and closed when Do returns.
	var eraseGaugeStmt, insertStmt,
		fixValidityStmt, updateStmt,
		deleteReferenceWaterLevelsStmt,
		isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
	for _, x := range []struct {
		sql  string
		stmt **sql.Stmt
	}{
		{eraseGaugeSQL, &eraseGaugeStmt},
		{insertGaugeSQL, &insertStmt},
		{fixValiditySQL, &fixValidityStmt},
		{updateGaugeSQL, &updateStmt},
		{deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
		{isNtSDepthRefSQL, &isNtSDepthRefStmt},
		{insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
	} {
		var err error
		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
			return nil, err
		}
		defer (*x.stmt).Close()
	}

	var gauges []string
	var unchanged int

	for _, data := range responseData {
		for _, dr := range data.RisdataReturn {
			// Each gauge is handled inside a function literal so that the
			// deferred clean-ups run as soon as this gauge is done instead
			// of piling up until Do returns. A nil result means "go on with
			// the next gauge", a non-nil result aborts the import.
			gaugeErr := func() error {
				if dr.RisidxCode == nil {
					feedback.Warn("Missing ISRS code")
					return nil
				}
				isrs := string(*dr.RisidxCode)
				code, err := models.IsrsFromString(isrs)
				if err != nil {
					feedback.Warn("Invalid ISRS code '%s': %v", isrs, err)
					return nil
				}
				gauges = append(gauges, isrs)
				feedback.Info("Processing %s", code)

				// We need a valid, non-empty time range to identify gauge versions
				if dr.Enddate != nil && dr.Startdate != nil &&
					!time.Time(*dr.Enddate).After(time.Time(*dr.Startdate)) {
					feedback.Error("End date not after start date")
					unchanged++
					return nil
				}

				// The last update time is written on insert and decides
				// whether an update takes place, so it must be present.
				if dr.Lastupdate == nil {
					feedback.Error("Missing last update date")
					unchanged++
					return nil
				}
				lastUpdate := time.Time(*dr.Lastupdate)

				var from, to sql.NullInt64
				if dr.Applicabilityfromkm != nil {
					from = sql.NullInt64{
						Int64: int64(*dr.Applicabilityfromkm),
						Valid: true,
					}
				}
				if dr.Applicabilitytokm != nil {
					to = sql.NullInt64{
						Int64: int64(*dr.Applicabilitytokm),
						Valid: true,
					}
				}

				// Missing dates are stored as NULL; a NULL bound makes
				// the validity range unbounded on that side.
				tfrom := pgtype.Timestamptz{Status: pgtype.Null}
				if dr.Startdate != nil {
					tfrom = pgtype.Timestamptz{
						Time:   time.Time(*dr.Startdate),
						Status: pgtype.Present,
					}
				}
				tto := pgtype.Timestamptz{Status: pgtype.Null}
				if dr.Enddate != nil {
					tto = pgtype.Timestamptz{
						Time:   time.Time(*dr.Enddate),
						Status: pgtype.Present,
					}
				}
				validity := pgtype.Tstzrange{
					Lower:     tfrom,
					Upper:     tto,
					LowerType: pgtype.Inclusive,
					UpperType: pgtype.Exclusive,
					Status:    pgtype.Present,
				}

				dateInfo := pgtype.Timestamptz{Status: pgtype.Null}
				if dr.Infodate != nil {
					dateInfo = pgtype.Timestamptz{
						Time:   time.Time(*dr.Infodate),
						Status: pgtype.Present,
					}
				}

				var geodref, source sql.NullString
				if dr.Geodref != nil {
					geodref = sql.NullString{
						String: string(*dr.Geodref),
						Valid:  true,
					}
				}
				if dr.Source != nil {
					source = sql.NullString{
						String: string(*dr.Source),
						Valid:  true,
					}
				}

				tx, err := conn.BeginTx(ctx, nil)
				if err != nil {
					return err
				}
				// Safety net for the error returns below. After a Commit
				// or an explicit Rollback this call has no effect.
				defer tx.Rollback()

				// abandon rolls back the changes made for this gauge and
				// counts the gauge as unchanged. A failing rollback is
				// returned and aborts the import.
				abandon := func() error {
					if err := tx.Rollback(); err != nil {
						return err
					}
					unchanged++
					return nil
				}

				// Mark old entry of gauge as erased, if applicable
				var isNew bool
				err = tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx,
					code.String(),
					&validity,
				).Scan(&isNew)
				switch {
				case err != nil:
					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
					return abandon()

				case isNew:
					// insert gauge version entry
					if _, err = tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
						code.CountryCode,
						code.LoCode,
						code.FairwaySection,
						code.Orc,
						code.Hectometre,
						dr.Objname.Loc,
						dr.Lon, dr.Lat,
						from,
						to,
						&validity,
						dr.Zeropoint,
						geodref,
						&dateInfo,
						source,
						lastUpdate,
					); err != nil {
						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
						return abandon()
					}
					feedback.Info("insert new version")

				default:
					// try to update
					var dummy int
					err = tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
						code.CountryCode,
						code.LoCode,
						code.FairwaySection,
						code.Orc,
						code.Hectometre,
						dr.Objname.Loc,
						dr.Lon, dr.Lat,
						from,
						to,
						dr.Zeropoint,
						geodref,
						&dateInfo,
						source,
						lastUpdate,
						&validity,
					).Scan(&dummy)
					switch {
					case err == sql.ErrNoRows:
						// The statement returns no row if nothing
						// was updated.
						feedback.Info("unchanged")
						return abandon()
					case err != nil:
						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
						return abandon()
					}
					feedback.Info("update")

					// Remove obsolete reference water levels. Only the
					// codes that were actually delivered are kept. The
					// slice is deliberately non-nil so that an empty
					// array and not NULL is sent if no code was
					// delivered: with NULL the comparison in the DELETE
					// would never match.
					levels := make([]string, 0, 3)
					for _, lc := range []*erdms.RisreflevelcodeType{
						dr.Reflevel1code,
						dr.Reflevel2code,
						dr.Reflevel3code,
					} {
						if lc != nil {
							levels = append(levels, string(*lc))
						}
					}
					var currLevels pgtype.VarcharArray
					if err := currLevels.Set(levels); err != nil {
						return err
					}
					rwls, err := tx.StmtContext(ctx,
						deleteReferenceWaterLevelsStmt).QueryContext(ctx,
						code.String(),
						&validity,
						&currLevels,
					)
					if err != nil {
						return err
					}
					defer rwls.Close()
					for rwls.Next() {
						var delRef string
						if err := rwls.Scan(&delRef); err != nil {
							return err
						}
						feedback.Warn("Removed reference water level %s from %s",
							delRef, code)
					}
					if err := rwls.Err(); err != nil {
						return err
					}
				}

				// Set end of validity of old version to start of new version
				// in case of overlap
				if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(
					ctx,
					code.String(),
					&validity,
				); err != nil {
					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
					return abandon()
				}

				// "Upsert" reference water levels
				for _, wl := range []struct {
					level *erdms.RisreflevelcodeType
					value *erdms.RisreflevelvalueType
				}{
					{dr.Reflevel1code, dr.Reflevel1value},
					{dr.Reflevel2code, dr.Reflevel2value},
					{dr.Reflevel3code, dr.Reflevel3value},
				} {
					if wl.level == nil || wl.value == nil {
						continue
					}

					var isNtSDepthRef bool
					if err := tx.StmtContext(
						ctx, isNtSDepthRefStmt).QueryRowContext(ctx,
						string(*wl.level),
					).Scan(
						&isNtSDepthRef,
					); err != nil {
						return err
					}
					if !isNtSDepthRef {
						feedback.Warn(
							"Reference level code '%s' is not in line "+
								"with the NtS reference_code table",
							string(*wl.level))
					}

					if _, err := tx.StmtContext(
						ctx, insertWaterLevelStmt).ExecContext(ctx,
						code.CountryCode,
						code.LoCode,
						code.FairwaySection,
						code.Orc,
						code.Hectometre,
						&validity,
						string(*wl.level),
						int64(*wl.value),
					); err != nil {
						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
						// The transaction cannot be used any more after
						// the rollback, so give up on this gauge instead
						// of trying the remaining levels.
						return abandon()
					}
				}

				return tx.Commit()
			}()
			if gaugeErr != nil {
				return nil, gaugeErr
			}
		}
	}

	if len(gauges) == 0 {
		return nil, UnchangedError("No gauges returned from ERDMS")
	}

	// Erase the gauges of the delivered countries which are not part of
	// the response any more.
	var pgCountries, pgGauges pgtype.VarcharArray
	if err := pgCountries.Set(countries); err != nil {
		return nil, err
	}
	if err := pgGauges.Set(gauges); err != nil {
		return nil, err
	}
	obsGauges, err := conn.QueryContext(ctx,
		eraseObsoleteGaugesSQL,
		&pgCountries,
		&pgGauges)
	if err != nil {
		return nil, err
	}
	defer obsGauges.Close()
	for obsGauges.Next() {
		var isrs string
		if err := obsGauges.Scan(&isrs); err != nil {
			return nil, err
		}
		feedback.Info("Erased %s", isrs)
		// An erased gauge is a change: make sure the check below
		// does not report the import as unchanged.
		unchanged--
	}
	if err := obsGauges.Err(); err != nil {
		return nil, err
	}

	if unchanged == len(gauges) {
		return nil, UnchangedError("All gauges unchanged")
	}

	feedback.Info("Importing gauges took %s",
		time.Since(start))

	return nil, nil
}