view pkg/imports/wg.go @ 5670:b75d0b303328

Various fixes and improvements to the gauges import: - Allow updating erased data (and thereby set erased to false) - Fix source_organization to work with ERDMS2 - Report ISRS codes of new and updated gauges in the summary - Fix dereference of null pointers if reflevels are missing - Fix dereference of null pointer on update errors - Add ISRS to reference_code warning
author Sascha Wilde <wilde@sha-bang.de>
date Fri, 08 Dec 2023 17:29:56 +0100
parents cf1e8ffe1ed5
children 31973f6f5cca
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/pgxutils"
	erdms "gemma.intevation.de/gemma/pkg/soap/erdms2"
)

// WaterwayGauge is a Job to load gauge data from the ERDMS SOAP
// service and store it in the database.
type WaterwayGauge struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Username is the username used to authenticate.
	Username string `json:"username"`
	// Password is the password used to authenticate.
	Password string `json:"password"`
	// Insecure indicates whether the validation of HTTPS certificates
	// should be skipped. The flag is handed unchanged to getRisData.
	Insecure bool `json:"insecure"`
}

// Description returns a short summary of this import, which is the
// URL of the queried service. The argument is ignored.
func (wg *WaterwayGauge) Description([]string) (string, error) {
	return wg.URL, nil
}

// WGJobKind is the unique name of this import job type.
const WGJobKind JobKind = "wg"

// wgJobCreator provides the meta data of the waterway gauge import
// and acts as factory for its jobs.
type wgJobCreator struct{}

// init makes the waterway gauge import known under WGJobKind.
func init() {
	RegisterJobCreator(WGJobKind, wgJobCreator{})
}

// Description returns the human readable name of this import type.
func (wgJobCreator) Description() string {
	return "waterway gauges"
}

// AutoAccept always returns true for waterway gauge imports.
func (wgJobCreator) AutoAccept() bool {
	return true
}

// Create returns a fresh, zero-valued WaterwayGauge job.
func (wgJobCreator) Create() Job {
	return &WaterwayGauge{}
}

// Depends lists the tables touched by this import. Judging from the SQL
// used by Do, the first group is modified and the second one is only
// read. NOTE(review): confirm the meaning of the two groups against the
// consumer of Depends.
func (wgJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauges_reference_water_levels", "gauges"}
	deps[1] = []string{"depth_references"}
	return deps
}

// StageDone does nothing as there is no staging for gauges.
func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
	return nil
}

// CleanUp does nothing as there is nothing to cleanup with gauges.
func (*WaterwayGauge) CleanUp() error {
	return nil
}

// SQL statements used by WaterwayGauge.Do.
const (
	// eraseObsoleteGaugesSQL marks every gauge that is not yet erased,
	// belongs to one of the country codes in $1 and whose ISRS code is
	// not listed in $2 as erased, cutting its validity off at the
	// current time. It returns the ISRS codes of the gauges erased.
	eraseObsoleteGaugesSQL = `
UPDATE waterway.gauges SET erased = true, validity = validity - '[now,)'
WHERE NOT erased
  AND (location).country_code = ANY($1)
  AND isrs_astext(location) <> ALL($2)
RETURNING isrs_astext(location)
`

	// eraseGaugeSQL marks the not yet erased version(s) of the gauge with
	// ISRS code $1 as erased, unless a version's validity is contained in
	// the new validity $2 (such a version gets updated instead).
	// It yields a single boolean telling whether a new version has to be
	// inserted: true if a version was erased here or if no version of
	// the gauge exists at all.
	eraseGaugeSQL = `
WITH upd AS (
  UPDATE waterway.gauges SET
    erased = true
  WHERE isrs_astext(location) = $1
    AND NOT erased
    -- Don't touch old entry if new validity contains old: will be updated
    AND NOT validity <@ $2
  RETURNING 1
)
-- Decide whether a new version will be INSERTed
SELECT EXISTS(SELECT 1 FROM upd)
  OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1)
`

	// insertGaugeSQL inserts a new gauge version.
	// $1-$5 are the ISRS components composed into the location (as passed
	// from models.Isrs in Do: CountryCode, LoCode, FairwaySection, Orc,
	// Hectometre), $6 the object name, $7/$8 longitude/latitude (stored as
	// geography with SRID 4326), $9/$10 applicability from/to km,
	// $11 the validity range, $12 zero point, $13 geodetic reference,
	// $14 date of information, $15 source organization and
	// $16 the last update time.
	insertGaugeSQL = `
INSERT INTO waterway.gauges (
  location,
  objname,
  geom,
  applicability_from_km,
  applicability_to_km,
  validity,
  zero_point,
  geodref,
  date_info,
  source_organization,
  lastupdate
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  $16
)
`

	// fixValiditySQL subtracts the new validity $2 from the validity of
	// all erased versions of the gauge with ISRS code $1 that overlap it.
	// NOTE(review): PostgreSQL raises an error if a range difference would
	// not be contiguous, i.e. if an old validity extends beyond $2 on both
	// sides.
	fixValiditySQL = `
UPDATE waterway.gauges SET
   -- Set enddate of old entry to new startdate in case of overlap:
  validity = validity - $2
WHERE isrs_astext(location) = $1
  AND validity && $2
  AND erased
`

	// updateGaugeSQL overwrites the version of the gauge with ISRS code $1
	// whose validity is contained in the new validity $12 with the
	// delivered data and clears its erased flag. $2 is the object name,
	// $3/$4 longitude/latitude, $5/$6 applicability from/to km,
	// $7 zero point, $8 geodetic reference, $9 date of information,
	// $10 source organization and $11 the last update time.
	// The update only happens if $11 is newer than the stored lastupdate;
	// as the comparison yields NULL if either value is NULL, nothing is
	// updated in that case. A row is returned only if something was
	// updated.
	updateGaugeSQL = `
UPDATE waterway.gauges SET
  objname = $2,
  geom = ST_SetSRID(ST_MakePoint($3, $4), 4326),
  applicability_from_km = $5,
  applicability_to_km = $6,
  zero_point = $7,
  geodref = $8,
  date_info = $9,
  source_organization = $10,
  lastupdate = $11,
  validity = $12,
  erased = false
WHERE isrs_astext(location) = $1
  AND validity <@ $12
  AND $11 > lastupdate
RETURNING 1
`

	// deleteReferenceWaterLevelsSQL removes the reference water levels of
	// the gauge version identified by ISRS code $1 and validity $2 whose
	// depth reference is not listed in $3, and returns the removed depth
	// references. Note that `<> ALL` is true for every row if $3 is an
	// empty array, and NULL (nothing deleted) if $3 is NULL.
	deleteReferenceWaterLevelsSQL = `
DELETE FROM waterway.gauges_reference_water_levels
WHERE isrs_astext(location) = $1
  AND validity = $2
  AND depth_reference <> ALL($3)
RETURNING depth_reference
`

	// isNtSDepthRefSQL reports whether $1 is listed in the
	// depth_references table.
	isNtSDepthRefSQL = `
SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)`

	// insertReferenceWaterLevelsSQL stores the value $8 for depth
	// reference $7 of the gauge version identified by the ISRS components
	// $1-$5 and validity $6. An existing entry for the same location,
	// validity and depth reference gets its value overwritten.
	insertReferenceWaterLevelsSQL = `
INSERT INTO waterway.gauges_reference_water_levels (
  location,
  validity,
  depth_reference,
  value
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  $7,
  $8
) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET
    value = EXCLUDED.value
`
)

// continueErr is returned by the per-gauge transaction closure in Do to
// signal that the current gauge has been skipped (its transaction is
// rolled back) and that processing should go on with the next gauge.
var continueErr = errors.New("continue")

// Do implements the actual import.
//
// It fetches the gauge records ("wtwgag") via getRisData and processes
// every record with a parseable ISRS code in its own transaction.
// Depending on eraseGaugeSQL, either a new gauge version is inserted, or
// the matching existing version is updated if the delivered lastupdate is
// newer. Reference water levels are synchronized alongside. Afterwards,
// all not yet erased gauges of the queried countries that were not
// delivered are marked as erased.
//
// Problems with individual records (incomplete data, database errors) are
// collected and reported via feedback. The affected gauge is skipped and
// its transaction rolled back. Any other error aborts the import and is
// returned. If no gauges were delivered or nothing changed, an
// UnchangedError is returned.
func (wg *WaterwayGauge) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	responseData, countries, err := getRisData(
		ctx,
		conn,
		feedback,
		wg.Username,
		wg.Password,
		wg.URL,
		wg.Insecure,
		"wtwgag")
	if err != nil {
		return nil, err
	}

	var eraseGaugeStmt, insertStmt,
		fixValidityStmt, updateStmt,
		deleteReferenceWaterLevelsStmt,
		isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
	for _, x := range []struct {
		sql  string
		stmt **sql.Stmt
	}{
		{eraseGaugeSQL, &eraseGaugeStmt},
		{insertGaugeSQL, &insertStmt},
		{fixValiditySQL, &fixValidityStmt},
		{updateGaugeSQL, &updateStmt},
		{deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
		{isNtSDepthRefSQL, &isNtSDepthRefStmt},
		{insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
	} {
		var err error
		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
			return nil, err
		}
		// The statements are reused for every gauge, so they are
		// intentionally kept open until Do returns.
		defer (*x.stmt).Close()
	}

	var gauges []string
	var unchanged int

	var invalidISRS, startEndOrder, missingObjname, missingZeropoint, updatedGauges, newGauges []string

	// Database errors are grouped by their readable message. The order
	// slice keeps the final report deterministic, because map iteration
	// order is randomized.
	databaseErrors := map[string][]string{}
	var databaseErrorOrder []string
	recordDBError := func(err error, isrs string) {
		key := pgxutils.ReadableError{Err: err}.Error()
		if _, seen := databaseErrors[key]; !seen {
			databaseErrorOrder = append(databaseErrorOrder, key)
		}
		databaseErrors[key] = append(databaseErrors[key], isrs)
	}

	for _, data := range responseData {
		for _, dr := range data.RisdataReturn {

			// A record without ISRS code cannot be identified at all;
			// checking also avoids a nil pointer dereference.
			if dr.RisidxCode == nil {
				invalidISRS = append(invalidISRS, "")
				continue
			}
			isrs := string(*dr.RisidxCode)
			code, err := models.IsrsFromString(isrs)
			if err != nil {
				invalidISRS = append(invalidISRS, isrs)
				continue
			}
			gauges = append(gauges, isrs)

			// We need a valid, non-empty time range to identify gauge versions
			if dr.Enddate != nil && dr.Startdate != nil &&
				!dr.Enddate.ToGoTime().After(dr.Startdate.ToGoTime()) {
				startEndOrder = append(startEndOrder, isrs)
				unchanged++
				continue
			}

			if dr.Zeropoint == nil {
				missingZeropoint = append(missingZeropoint, isrs)
				unchanged++
				continue
			}

			if dr.Objname.Loc == nil {
				missingObjname = append(missingObjname, isrs)
				unchanged++
				continue
			}

			var from, to sql.NullInt64
			if dr.Applicabilityfromkm != nil {
				from = sql.NullInt64{Int64: int64(*dr.Applicabilityfromkm), Valid: true}
			}
			if dr.Applicabilitytokm != nil {
				to = sql.NullInt64{Int64: int64(*dr.Applicabilitytokm), Valid: true}
			}

			// A missing start or end date leaves the validity range open
			// on that side. The bound is marked as Unbounded explicitly,
			// which is what pgtype's range encoder expects for a bound
			// without a value.
			validity := pgtype.Tstzrange{
				Lower:     pgtype.Timestamptz{Status: pgtype.Null},
				Upper:     pgtype.Timestamptz{Status: pgtype.Null},
				LowerType: pgtype.Unbounded,
				UpperType: pgtype.Unbounded,
				Status:    pgtype.Present,
			}
			if dr.Startdate != nil {
				validity.Lower = pgtype.Timestamptz{
					Time:   dr.Startdate.ToGoTime(),
					Status: pgtype.Present,
				}
				validity.LowerType = pgtype.Inclusive
			}
			if dr.Enddate != nil {
				validity.Upper = pgtype.Timestamptz{
					Time:   dr.Enddate.ToGoTime(),
					Status: pgtype.Present,
				}
				validity.UpperType = pgtype.Exclusive
			}

			dateInfo := pgtype.Timestamptz{Status: pgtype.Null}
			if dr.Infodate != nil {
				dateInfo = pgtype.Timestamptz{
					Time:   dr.Infodate.ToGoTime(),
					Status: pgtype.Present,
				}
			}

			var geodref sql.NullString
			if dr.Geodref != nil {
				geodref = sql.NullString{String: string(*dr.Geodref), Valid: true}
			}

			var source sql.NullString
			if dr.RisidxSource != nil {
				source = sql.NullString{String: string(*dr.RisidxSource), Valid: true}
			}

			var lu *time.Time
			if dr.Lastupdate != nil {
				t := dr.Lastupdate.ToGoTime()
				lu = &t
			}

			// Process the gauge in its own transaction. On success,
			// inserted tells whether a new version was inserted (true) or
			// an existing one updated (false). continueErr means the gauge
			// was skipped and its transaction rolled back.
			inserted, err := func() (bool, error) {
				tx, err := conn.BeginTx(ctx, nil)
				if err != nil {
					return false, err
				}
				// Harmless after a successful Commit.
				defer tx.Rollback()

				// Mark old entry of gauge as erased, if applicable
				var isNew bool
				if err := tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx,
					code.String(),
					validity,
				).Scan(&isNew); err != nil {
					recordDBError(err, isrs)
					return false, continueErr
				}

				if isNew {
					// insert gauge version entry
					if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
						code.CountryCode,
						code.LoCode,
						code.FairwaySection,
						code.Orc,
						code.Hectometre,
						dr.Objname.Loc,
						dr.Lon, dr.Lat,
						from,
						to,
						&validity,
						dr.Zeropoint,
						geodref,
						&dateInfo,
						source,
						lu,
					); err != nil {
						recordDBError(err, isrs)
						return false, continueErr
					}
				} else {
					// Try to update. No row means that no version matched
					// or the stored version is not older than the delivered one.
					var dummy int
					err := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
						code.String(),
						dr.Objname.Loc,
						dr.Lon, dr.Lat,
						from,
						to,
						dr.Zeropoint,
						geodref,
						&dateInfo,
						source,
						lu,
						&validity,
					).Scan(&dummy)
					switch {
					case errors.Is(err, sql.ErrNoRows):
						return false, continueErr
					case err != nil:
						recordDBError(err, isrs)
						return false, continueErr
					}

					// Remove obsolete reference water levels.
					// reflevels stays nil if no level is delivered.
					var reflevels []string
					for _, rl := range []*erdms.RisreflevelcodeType{
						dr.Reflevel1code, dr.Reflevel2code, dr.Reflevel3code,
					} {
						if rl != nil {
							reflevels = append(reflevels, string(*rl))
						}
					}
					var currLevels pgtype.VarcharArray
					if err := currLevels.Set(reflevels); err != nil {
						return false, err
					}
					rwls, err := tx.StmtContext(ctx,
						deleteReferenceWaterLevelsStmt).QueryContext(ctx,
						code.String(),
						&validity,
						&currLevels,
					)
					if err != nil {
						return false, err
					}
					defer rwls.Close()
					for rwls.Next() {
						var delRef string
						if err := rwls.Scan(&delRef); err != nil {
							return false, err
						}
						feedback.Warn("Removed reference water level %s from %s",
							delRef, code)
					}
					if err := rwls.Err(); err != nil {
						return false, err
					}
				}

				// Set end of validity of old version to start of new version
				// in case of overlap
				if _, err := tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx,
					code.String(),
					&validity,
				); err != nil {
					recordDBError(err, isrs)
					return false, continueErr
				}

				// "Upsert" reference water levels
				for _, wl := range []struct {
					level *erdms.RisreflevelcodeType
					value *erdms.RisreflevelvalueType
				}{
					{dr.Reflevel1code, dr.Reflevel1value},
					{dr.Reflevel2code, dr.Reflevel2value},
					{dr.Reflevel3code, dr.Reflevel3value},
				} {
					if wl.level == nil || wl.value == nil {
						continue
					}
					level := string(*wl.level)

					var isNtSDepthRef bool
					if err := tx.StmtContext(
						ctx, isNtSDepthRefStmt).QueryRowContext(ctx,
						level,
					).Scan(&isNtSDepthRef); err != nil {
						return false, err
					}
					if !isNtSDepthRef {
						feedback.Warn(
							"Reference level code '%s' of %s is not in line "+
								"with the NtS reference_code table",
							level, isrs)
					}

					if _, err := tx.StmtContext(
						ctx, insertWaterLevelStmt).ExecContext(ctx,
						code.CountryCode,
						code.LoCode,
						code.FairwaySection,
						code.Orc,
						code.Hectometre,
						&validity,
						level,
						int64(*wl.value),
					); err != nil {
						// A failed statement aborts the whole PostgreSQL
						// transaction, so later statements and the commit
						// would fail as well. Skip the entire gauge instead
						// of only this level.
						recordDBError(err, isrs)
						return false, continueErr
					}
				}

				if err := tx.Commit(); err != nil {
					return false, err
				}
				return isNew, nil
			}()

			// Only committed gauges are reported as new or updated.
			switch {
			case err == nil:
				if inserted {
					newGauges = append(newGauges, isrs)
				} else {
					updatedGauges = append(updatedGauges, isrs)
				}
			case errors.Is(err, continueErr):
				unchanged++
			default:
				return nil, err
			}
		}
	}

	if len(invalidISRS) > 0 {
		feedback.Error("Invalid ISRS code: '%s'", strings.Join(invalidISRS, "', '"))
	}

	if len(startEndOrder) > 0 {
		feedback.Error("start date not before end date: %s",
			strings.Join(startEndOrder, ", "))
	}

	for _, key := range databaseErrorOrder {
		feedback.Error("%s: %s", key, strings.Join(databaseErrors[key], ", "))
	}

	if len(missingZeropoint) > 0 {
		feedback.Error("Missing zeropoint: %s", strings.Join(missingZeropoint, ", "))
	}

	if len(missingObjname) > 0 {
		feedback.Error("Missing objname: %s", strings.Join(missingObjname, ", "))
	}

	if len(updatedGauges) > 0 {
		feedback.Info("Updated gauges: %s", strings.Join(updatedGauges, ", "))
	}

	if len(newGauges) > 0 {
		feedback.Info("New gauges: %s", strings.Join(newGauges, ", "))
	}

	if len(gauges) == 0 {
		return nil, UnchangedError("No gauges returned from ERDMS")
	}

	// Erase all gauges of the queried countries that were not delivered.
	var pgCountries, pgGauges pgtype.VarcharArray
	if err := pgCountries.Set(countries); err != nil {
		return nil, err
	}
	if err := pgGauges.Set(gauges); err != nil {
		return nil, err
	}
	obsGauges, err := conn.QueryContext(ctx,
		eraseObsoleteGaugesSQL,
		&pgCountries,
		&pgGauges)
	if err != nil {
		return nil, err
	}
	defer obsGauges.Close()
	var erased int
	for obsGauges.Next() {
		var isrs string
		if err := obsGauges.Scan(&isrs); err != nil {
			return nil, err
		}
		feedback.Info("Erased %s", isrs)
		erased++
	}
	if err := obsGauges.Err(); err != nil {
		return nil, err
	}

	if unchanged == len(gauges) && erased == 0 {
		return nil, UnchangedError("All gauges unchanged")
	}
	feedback.Info("Unchanged gauges: %d", unchanged)

	feedback.Info("Importing gauges took %s",
		time.Since(start))

	return nil, nil
}