view pkg/imports/wg.go @ 5560:f2204f91d286

Join the log lines of imports to the log exports to recover data from them. Used in the SR export to extract information that was previously in the meta JSON but is now only found in the log.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 09 Feb 2022 18:34:40 +0100
parents 59a99655f34d
children ade07a3f2cfd
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/soap/erdms"
)

// WaterwayGauge is a Job that loads gauge data from
// a specified NTS service and stores it in the database.
// Its fields are the JSON-serialized configuration of the import.
type WaterwayGauge struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Username is the username used to authenticate.
	Username string `json:"username"`
	// Password is the password used to authenticate.
	Password string `json:"password"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	// NOTE(review): presumably true disables certificate validation;
	// the flag is handed to getRisData unchanged -- confirm there.
	Insecure bool `json:"insecure"`
}

// Description summarizes this import by the URL of the
// service the gauge data is fetched from. It never fails.
func (wg *WaterwayGauge) Description() (string, error) {
	summary := wg.URL
	return summary, nil
}

// WGJobKind is the unique name of this import job type.
// It is the kind under which the waterway gauge job creator
// is registered via RegisterJobCreator.
const WGJobKind JobKind = "wg"

// wgJobCreator is the stateless creator registered for WGJobKind.
type wgJobCreator struct{}

// init registers the waterway gauge job creator under WGJobKind.
func init() { RegisterJobCreator(WGJobKind, wgJobCreator{}) }

// Description gives a short, human-readable name of this kind of import.
func (wgJobCreator) Description() string { return "waterway gauges" }

// AutoAccept always reports true for waterway gauge imports.
// NOTE(review): presumably this means such imports need no manual
// review step -- confirm against the JobCreator interface.
func (wgJobCreator) AutoAccept() bool { return true }

// Create builds a new, empty WaterwayGauge job.
func (wgJobCreator) Create() Job { return new(WaterwayGauge) }

// Depends names the database relations this kind of import is tied to,
// in two groups. The first group holds the relations the import's SQL
// statements modify, the second the one they only read.
// NOTE(review): presumably the groups mean "written" and "read" in the
// JobCreator contract -- confirm against the interface definition.
func (wgJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauges_reference_water_levels", "gauges"}
	deps[1] = []string{"depth_references"}
	return deps
}

// StageDone does nothing as there is no staging for gauges.
// All arguments are ignored and the result is always nil.
func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { return nil }

// CleanUp does nothing as there is nothing to clean up with gauges.
// It always returns nil.
func (*WaterwayGauge) CleanUp() error { return nil }

// SQL statements used by the waterway gauge import.
const (
	// eraseObsoleteGaugesSQL marks all not yet erased gauges of the
	// countries in $1 whose ISRS code is not contained in $2 as erased
	// and removes the range starting at "now" from their validity.
	// It returns the ISRS codes of the gauges it erased.
	eraseObsoleteGaugesSQL = `
UPDATE waterway.gauges SET erased = true, validity = validity - '[now,)'
WHERE NOT erased
  AND (location).country_code = ANY($1)
  AND isrs_astext(location) <> ALL($2)
RETURNING isrs_astext(location)
`

	// eraseGaugeSQL marks the not yet erased entry of the gauge with
	// ISRS code $1 as erased, unless its validity is contained in the
	// new validity $2. The resulting boolean tells whether a new version
	// has to be inserted: either an entry was erased by this statement
	// or no entry exists for the location at all.
	eraseGaugeSQL = `
WITH upd AS (
  UPDATE waterway.gauges SET
    erased = true
  WHERE isrs_astext(location) = $1
    AND NOT erased
    -- Don't touch old entry if new validity contains old: will be updated
    AND NOT validity <@ $2
  RETURNING 1
)
-- Decide whether a new version will be INSERTed
SELECT EXISTS(SELECT 1 FROM upd)
  OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1)
`

	// insertGaugeSQL inserts a new gauge version.
	// $1-$5 are the components of the ISRS location, $6 the object name,
	// $7/$8 the point coordinates (SRID 4326), $9/$10 the applicability
	// range in km, $11 the validity, $12 the zero point, $13 the geodetic
	// reference, $14 the info date, $15 the source organization and
	// $16 the time of the last update.
	insertGaugeSQL = `
INSERT INTO waterway.gauges (
  location,
  objname,
  geom,
  applicability_from_km,
  applicability_to_km,
  validity,
  zero_point,
  geodref,
  date_info,
  source_organization,
  lastupdate
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  $16
)
`

	// fixValiditySQL subtracts the new validity $2 from the validity of
	// all erased entries of the gauge with ISRS code $1 that overlap $2.
	fixValiditySQL = `
UPDATE waterway.gauges SET
   -- Set enddate of old entry to new startdate in case of overlap:
  validity = validity - $2
WHERE isrs_astext(location) = $1
  AND validity && $2
  AND erased
`

	// updateGaugeSQL updates the not yet erased entry of the gauge at the
	// location given by $1-$5, but only if the new last update time $15
	// is later than the stored one. Note that the parameter order differs
	// from insertGaugeSQL: $11 is the zero point, $12 the geodetic
	// reference, $13 the info date, $14 the source organization and
	// $16 the validity. A row is returned only if an entry was updated.
	updateGaugeSQL = `
UPDATE waterway.gauges SET
  objname = $6,
  geom = ST_SetSRID(ST_MakePoint($7, $8), 4326),
  applicability_from_km = $9,
  applicability_to_km = $10,
  zero_point = $11,
  geodref = $12,
  date_info = $13,
  source_organization = $14,
  lastupdate = $15,
  validity = $16
WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
  AND NOT erased
  AND $15 > lastupdate
RETURNING 1
`

	// deleteReferenceWaterLevelsSQL deletes the reference water levels of
	// the gauge with ISRS code $1 and validity $2 whose depth reference
	// is not contained in $3. It returns the deleted depth references.
	deleteReferenceWaterLevelsSQL = `
DELETE FROM waterway.gauges_reference_water_levels
WHERE isrs_astext(location) = $1
  AND validity = $2
  AND depth_reference <> ALL($3)
RETURNING depth_reference
`

	// isNtSDepthRefSQL checks whether $1 is listed in depth_references.
	isNtSDepthRefSQL = `
SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)`

	// insertReferenceWaterLevelsSQL stores the value $8 of the depth
	// reference $7 for the gauge version given by location ($1-$5) and
	// validity ($6). An already existing value is overwritten.
	insertReferenceWaterLevelsSQL = `
INSERT INTO waterway.gauges_reference_water_levels (
  location,
  validity,
  depth_reference,
  value
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  $7,
  $8
) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET
    value = EXCLUDED.value
`
)

// Do implements the actual import.
//
// It fetches the gauge data ("wtwgag") with getRisData and stores every
// returned gauge in a transaction of its own: the old version is marked
// as erased and a new one is inserted, or the existing version is updated
// if the delivered data is newer. Afterwards the reference water levels
// of the gauge are synchronized. Gauges that cannot be stored are
// reported via feedback, rolled back and counted as unchanged; they do
// not abort the import. Finally, gauges of the imported countries which
// were not delivered any more are erased.
//
// importID is not used. The returned summary is always nil. The error is
// an UnchangedError if no gauges were delivered or nothing was changed,
// or the error of a failed database or service operation.
func (wg *WaterwayGauge) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	responseData, countries, err := getRisData(
		ctx,
		conn,
		feedback,
		wg.Username,
		wg.Password,
		wg.URL,
		wg.Insecure,
		"wtwgag")
	if err != nil {
		return nil, err
	}

	var eraseGaugeStmt, insertStmt,
		fixValidityStmt, updateStmt,
		deleteReferenceWaterLevelsStmt,
		isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
	for _, x := range []struct {
		sql  string
		stmt **sql.Stmt
	}{
		{eraseGaugeSQL, &eraseGaugeStmt},
		{insertGaugeSQL, &insertStmt},
		{fixValiditySQL, &fixValidityStmt},
		{updateGaugeSQL, &updateStmt},
		{deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
		{isNtSDepthRefSQL, &isNtSDepthRefStmt},
		{insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
	} {
		var err error
		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
			return nil, err
		}
		// Deferred inside the loop on purpose: the prepared statements
		// have to live until Do returns.
		defer (*x.stmt).Close()
	}

	var gauges []string
	var unchanged int

	for _, data := range responseData {
		for _, dr := range data.RisdataReturn {

			if dr.RisidxCode == nil {
				feedback.Warn("Missing ISRS code")
				continue
			}
			isrs := string(*dr.RisidxCode)
			code, err := models.IsrsFromString(isrs)
			if err != nil {
				feedback.Warn("Invalid ISRS code '%s': %v", isrs, err)
				continue
			}
			gauges = append(gauges, isrs)
			feedback.Info("Processing %s", code)

			// We need a valid, non-empty time range to identify gauge versions
			if dr.Enddate != nil && dr.Startdate != nil &&
				!time.Time(*dr.Enddate).After(time.Time(*dr.Startdate)) {
				feedback.Error("End date not after start date")
				unchanged++
				continue
			}

			// The time of the last update decides whether an existing
			// version is overwritten, so it is mandatory.
			if dr.Lastupdate == nil {
				feedback.Error("Missing time of last update")
				unchanged++
				continue
			}
			lastUpdate := time.Time(*dr.Lastupdate)

			var from, to sql.NullInt64
			if dr.Applicabilityfromkm != nil {
				from = sql.NullInt64{
					Int64: int64(*dr.Applicabilityfromkm),
					Valid: true,
				}
			}
			if dr.Applicabilitytokm != nil {
				to = sql.NullInt64{
					Int64: int64(*dr.Applicabilitytokm),
					Valid: true,
				}
			}

			// Missing timestamps have to be flagged as NULL explicitly.
			tfrom := pgtype.Timestamptz{Status: pgtype.Null}
			if dr.Startdate != nil {
				tfrom = pgtype.Timestamptz{
					Time:   time.Time(*dr.Startdate),
					Status: pgtype.Present,
				}
			}

			tto := pgtype.Timestamptz{Status: pgtype.Null}
			if dr.Enddate != nil {
				tto = pgtype.Timestamptz{
					Time:   time.Time(*dr.Enddate),
					Status: pgtype.Present,
				}
			}

			validity := pgtype.Tstzrange{
				Lower:     tfrom,
				Upper:     tto,
				LowerType: pgtype.Inclusive,
				UpperType: pgtype.Exclusive,
				Status:    pgtype.Present,
			}

			dateInfo := pgtype.Timestamptz{Status: pgtype.Null}
			if dr.Infodate != nil {
				dateInfo = pgtype.Timestamptz{
					Time:   time.Time(*dr.Infodate),
					Status: pgtype.Present,
				}
			}

			var geodref sql.NullString
			if dr.Geodref != nil {
				geodref = sql.NullString{
					String: string(*dr.Geodref),
					Valid:  true,
				}
			}

			var source sql.NullString
			if dr.Source != nil {
				source = sql.NullString{
					String: string(*dr.Source),
					Valid:  true,
				}
			}

			// The up to three reference water levels delivered with the
			// gauge. Code and value are both optional.
			refLevels := []struct {
				code  *erdms.RisreflevelcodeType
				value *erdms.RisreflevelvalueType
			}{
				{dr.Reflevel1code, dr.Reflevel1value},
				{dr.Reflevel2code, dr.Reflevel2value},
				{dr.Reflevel3code, dr.Reflevel3value},
			}

			// storeGauge writes the gauge in a transaction of its own.
			// It reports changed == true only after a successful commit.
			// Returning (false, nil) means the gauge was skipped and its
			// transaction rolled back; a non-nil error aborts the import.
			// Running this as a function makes the deferred clean-up
			// happen per gauge and not only when Do returns.
			storeGauge := func() (changed bool, err error) {
				tx, err := conn.BeginTx(ctx, nil)
				if err != nil {
					return false, err
				}
				defer func() {
					if changed {
						return // Committed, nothing to roll back.
					}
					// A failing rollback of a skipped gauge aborts the
					// import; an earlier error takes precedence.
					if rbErr := tx.Rollback(); rbErr != nil && err == nil {
						err = rbErr
					}
				}()

				// Mark old entry of gauge as erased, if applicable
				var isNew bool
				if err := tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx,
					code.String(),
					validity,
				).Scan(&isNew); err != nil {
					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
					return false, nil
				}

				if isNew {
					// insert gauge version entry
					if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
						code.CountryCode,
						code.LoCode,
						code.FairwaySection,
						code.Orc,
						code.Hectometre,
						dr.Objname.Loc,
						dr.Lon, dr.Lat,
						from,
						to,
						&validity,
						dr.Zeropoint,
						geodref,
						&dateInfo,
						source,
						lastUpdate,
					); err != nil {
						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
						return false, nil
					}
					feedback.Info("insert new version")
				} else {
					// try to update
					var dummy int
					err := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
						code.CountryCode,
						code.LoCode,
						code.FairwaySection,
						code.Orc,
						code.Hectometre,
						dr.Objname.Loc,
						dr.Lon, dr.Lat,
						from,
						to,
						dr.Zeropoint,
						geodref,
						&dateInfo,
						source,
						lastUpdate,
						&validity,
					).Scan(&dummy)
					switch {
					case err == sql.ErrNoRows:
						// No row returned: stored data is not older.
						feedback.Info("unchanged")
						return false, nil
					case err != nil:
						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
						return false, nil
					}
					feedback.Info("update")

					// Remove obsolete reference water levels.
					// Only delivered codes are kept. The slice must not
					// be nil: an empty array deletes all levels, while a
					// NULL array would delete none.
					keep := make([]string, 0, len(refLevels))
					for _, rl := range refLevels {
						if rl.code != nil {
							keep = append(keep, string(*rl.code))
						}
					}
					var currLevels pgtype.VarcharArray
					if err := currLevels.Set(keep); err != nil {
						return false, err
					}
					rwls, err := tx.StmtContext(ctx,
						deleteReferenceWaterLevelsStmt).QueryContext(ctx,
						code.String(),
						&validity,
						&currLevels,
					)
					if err != nil {
						return false, err
					}
					defer rwls.Close()
					for rwls.Next() {
						var delRef string
						if err := rwls.Scan(&delRef); err != nil {
							return false, err
						}
						feedback.Warn("Removed reference water level %s from %s",
							delRef, code)
					}
					if err := rwls.Err(); err != nil {
						return false, err
					}
				}

				// Set end of validity of old version to start of new version
				// in case of overlap
				if _, err := tx.StmtContext(ctx, fixValidityStmt).ExecContext(
					ctx,
					code.String(),
					&validity,
				); err != nil {
					feedback.Error(pgxutils.ReadableError{Err: err}.Error())
					return false, nil
				}

				// "Upsert" reference water levels
				for _, rl := range refLevels {
					if rl.code == nil || rl.value == nil {
						continue
					}
					level := string(*rl.code)

					var isNtSDepthRef bool
					if err := tx.StmtContext(
						ctx, isNtSDepthRefStmt).QueryRowContext(ctx,
						level,
					).Scan(
						&isNtSDepthRef,
					); err != nil {
						return false, err
					}
					if !isNtSDepthRef {
						feedback.Warn(
							"Reference level code '%s' is not in line "+
								"with the NtS reference_code table",
							level)
					}

					if _, err := tx.StmtContext(
						ctx, insertWaterLevelStmt).ExecContext(ctx,
						code.CountryCode,
						code.LoCode,
						code.FairwaySection,
						code.Orc,
						code.Hectometre,
						&validity,
						level,
						int64(*rl.value),
					); err != nil {
						// The transaction is unusable after a failed
						// statement, so give up on the whole gauge
						// instead of trying the remaining levels.
						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
						return false, nil
					}
				}

				if err := tx.Commit(); err != nil {
					return false, err
				}
				return true, nil
			}

			changed, err := storeGauge()
			if err != nil {
				return nil, err
			}
			if !changed {
				unchanged++
			}
		}
	}

	if len(gauges) == 0 {
		return nil, UnchangedError("No gauges returned from ERDMS")
	}

	var pgCountries, pgGauges pgtype.VarcharArray
	if err := pgCountries.Set(countries); err != nil {
		return nil, err
	}
	if err := pgGauges.Set(gauges); err != nil {
		return nil, err
	}
	obsGauges, err := conn.QueryContext(ctx,
		eraseObsoleteGaugesSQL,
		&pgCountries,
		&pgGauges)
	if err != nil {
		return nil, err
	}
	defer obsGauges.Close()
	for obsGauges.Next() {
		var isrs string
		if err := obsGauges.Scan(&isrs); err != nil {
			return nil, err
		}
		feedback.Info("Erased %s", isrs)
		// Erased gauges are not part of gauges. Decrementing makes sure
		// the "all unchanged" check below fails if anything was erased.
		unchanged--
	}
	if err := obsGauges.Err(); err != nil {
		return nil, err
	}

	if unchanged == len(gauges) {
		return nil, UnchangedError("All gauges unchanged")
	}

	feedback.Info("Importing gauges took %s",
		time.Since(start))

	return nil, nil
}