view pkg/imports/wg.go @ 3678:8f58851927c0

client: make layer factory only return new layer config for individual maps instead of each time it is invoked. The purpose of the factory was to support multiple maps with individual layers. But returning a new config each time it is invoked leads to bugs that rely on the layer's state. Now this factory reuses the same objects it created before, per map.
author Markus Kottlaender <markus@intevation.de>
date Mon, 17 Jun 2019 17:31:35 +0200
parents 29ef6d41e4af
children 5fed2f5bc104 6c5c15b2fb64
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/erdms"
)

// WaterwayGauge is the configuration of an import job which
// fetches waterway gauges from a SOAP service.
type WaterwayGauge struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Username is the username used to authenticate.
	Username string `json:"username"`
	// Password is the password to authenticate.
	Password string `json:"password"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// WGJobKind is the job kind under which the
// waterway gauges import is registered.
const WGJobKind JobKind = "wg"

// wgJobCreator is the stateless job creator that is
// registered for WGJobKind.
type wgJobCreator struct{}

func init() {
	RegisterJobCreator(WGJobKind, wgJobCreator{})
}

// Description returns a short human readable label of this job kind.
func (wgJobCreator) Description() string {
	const label = "waterway gauges"
	return label
}

// AutoAccept always reports true for waterway gauges imports.
func (wgJobCreator) AutoAccept() bool {
	return true
}

// Create returns a freshly allocated, zero valued WaterwayGauge job.
func (wgJobCreator) Create() Job {
	return &WaterwayGauge{}
}

// Depends returns the two lists of relation names this import depends on.
// NOTE(review): presumably the first list names the relations the import
// modifies and the second the ones it only reads -- confirm against the
// consumer of Depends.
func (wgJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauges_reference_water_levels", "gauges"}
	deps[1] = []string{"depth_references"}
	return deps
}

// StageDone is a no-op because gauges are not staged.
// It ignores all of its arguments and always returns nil.
func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp is a no-op because a gauges import leaves
// nothing behind that needs to be cleaned up.
// It always returns nil.
func (*WaterwayGauge) CleanUp() error {
	return nil
}

// SQL statements used by the waterway gauges import.
const (
	// eraseObsoleteGaugesSQL marks all not yet erased gauges as erased
	// whose country code is contained in $1 and whose ISRS code is not
	// contained in $2. It returns the ISRS codes of the erased gauges.
	eraseObsoleteGaugesSQL = `
UPDATE waterway.gauges SET erased = true
WHERE NOT erased
  AND (location).country_code = ANY($1)
  AND isrs_astext(location) <> ALL($2)
RETURNING isrs_astext(location)
`

	// eraseGaugeSQL marks the not erased entry of the gauge with
	// ISRS code $1 as erased unless its validity is contained in
	// the range $2. The result is a single boolean which is true
	// if an entry was marked as erased or if there is no entry
	// for the gauge at all, i.e. if a new version has to be inserted.
	eraseGaugeSQL = `
WITH upd AS (
  UPDATE waterway.gauges SET
    erased = true
  WHERE isrs_astext(location) = $1
    AND NOT erased
    -- Don't touch old entry if new validity contains old: will be updated
    AND NOT validity <@ $2
  RETURNING 1
)
-- Decide whether a new version will be INSERTed
SELECT EXISTS(SELECT 1 FROM upd)
  OR NOT EXISTS(SELECT 1 FROM waterway.gauges WHERE isrs_astext(location) = $1)
`

	// insertGaugeSQL inserts a new gauge entry. $1 to $5 are the
	// components of the location, $7 and $8 the two coordinates
	// of the point geometry given in SRID 4326.
	insertGaugeSQL = `
INSERT INTO waterway.gauges (
  location,
  objname,
  geom,
  applicability_from_km,
  applicability_to_km,
  validity,
  zero_point,
  geodref,
  date_info,
  source_organization,
  lastupdate
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  $16
)
`

	// fixValiditySQL subtracts the range $2 from the validity of
	// all erased entries of the gauge with ISRS code $1 which
	// overlap with $2.
	fixValiditySQL = `
UPDATE waterway.gauges SET
   -- Set enddate of old entry to new startdate in case of overlap:
  validity = validity - $2
WHERE isrs_astext(location) = $1
  AND validity && $2
  AND erased
`

	// updateGaugeSQL updates the not erased entry of the gauge at
	// the location given by $1 to $5, but only if the new
	// lastupdate $15 is more recent than the stored one.
	// It returns a row only if an entry was updated.
	updateGaugeSQL = `
UPDATE waterway.gauges SET
  objname = $6,
  geom = ST_SetSRID(ST_MakePoint($7, $8), 4326),
  applicability_from_km = $9,
  applicability_to_km = $10,
  zero_point = $11,
  geodref = $12,
  date_info = $13,
  source_organization = $14,
  lastupdate = $15,
  validity = $16
WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
  AND NOT erased
  AND $15 > lastupdate
RETURNING 1
`

	// deleteReferenceWaterLevelsSQL deletes the reference water levels
	// of the gauge with ISRS code $1 and validity $2 whose depth
	// reference is not contained in $3. It returns the depth
	// references of the deleted entries.
	deleteReferenceWaterLevelsSQL = `
DELETE FROM waterway.gauges_reference_water_levels
WHERE isrs_astext(location) = $1
  AND validity = $2
  AND depth_reference <> ALL($3)
RETURNING depth_reference
`

	// isNtSDepthRefSQL checks if $1 is listed in the
	// depth_references table.
	isNtSDepthRefSQL = `
SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)`

	// insertReferenceWaterLevelsSQL inserts a reference water level
	// for a gauge version. If there is already an entry for the
	// same location, validity and depth reference only its value
	// is updated.
	insertReferenceWaterLevelsSQL = `
INSERT INTO waterway.gauges_reference_water_levels (
  location,
  validity,
  depth_reference,
  value
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  $7,
  $8
) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET
    value = EXCLUDED.value
`
)

// Do executes the actual import of the waterway gauges.
//
// The gauges are fetched with getRisData using the credentials and
// URL stored in wg. Each returned gauge with a valid ISRS code is
// handled in a transaction of its own: Depending on the result of
// eraseGaugeSQL either a new version of the gauge is inserted or the
// existing one is updated, overlapping validities of erased versions
// are fixed and the reference water levels are upserted. Database
// errors caused by the data of a single gauge are reported as
// warnings via feedback; the transaction of this gauge is rolled
// back and the import goes on with the next gauge. Finally all
// gauges of the requested countries which were not part of the
// response are marked as erased.
//
// On success the returned values are both nil. An UnchangedError is
// returned if the service delivered no usable gauges or if nothing
// was changed in the database. All other errors abort the import.
func (wg *WaterwayGauge) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	responseData, countries, err := getRisData(
		ctx,
		conn,
		feedback,
		wg.Username,
		wg.Password,
		wg.URL,
		wg.Insecure,
		"wtwgag")
	if err != nil {
		return nil, err
	}

	var eraseGaugeStmt, insertStmt,
		fixValidityStmt, updateStmt,
		deleteReferenceWaterLevelsStmt,
		isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
	for _, x := range []struct {
		sql  string
		stmt **sql.Stmt
	}{
		{eraseGaugeSQL, &eraseGaugeStmt},
		{insertGaugeSQL, &insertStmt},
		{fixValiditySQL, &fixValidityStmt},
		{updateGaugeSQL, &updateStmt},
		{deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
		{isNtSDepthRefSQL, &isNtSDepthRefStmt},
		{insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
	} {
		var err error
		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
			return nil, err
		}
		// The prepared statements are needed till the end of Do.
		defer (*x.stmt).Close()
	}

	var gauges []string
	var unchanged int

	for _, data := range responseData {
		for _, dr := range data.RisdataReturn {

			// NOTE(review): RisidxCode, Objname and Lastupdate are
			// dereferenced without nil checks -- confirm that the
			// service always delivers them.
			isrs := string(*dr.RisidxCode)
			code, err := models.IsrsFromString(isrs)
			if err != nil {
				feedback.Warn("Invalid ISRS code '%s': %v", isrs, err)
				continue
			}
			gauges = append(gauges, isrs)
			feedback.Info("Processing %s", code)

			// We need a valid, non-empty time range to identify gauge versions
			if dr.Enddate != nil && dr.Startdate != nil &&
				!time.Time(*dr.Enddate).After(time.Time(*dr.Startdate)) {
				feedback.Warn("End date not after start date")
				unchanged++
				continue
			}

			var from, to sql.NullInt64

			if dr.Applicabilityfromkm != nil {
				from = sql.NullInt64{
					Int64: int64(*dr.Applicabilityfromkm),
					Valid: true,
				}
			}
			if dr.Applicabilitytokm != nil {
				to = sql.NullInt64{
					Int64: int64(*dr.Applicabilitytokm),
					Valid: true,
				}
			}

			// Missing dates have to be flagged as NULL explicitly
			// as the zero value of the status is not pgtype.Null.
			tfrom := pgtype.Timestamptz{Status: pgtype.Null}
			if dr.Startdate != nil {
				tfrom = pgtype.Timestamptz{
					Time:   time.Time(*dr.Startdate),
					Status: pgtype.Present,
				}
			}

			tto := pgtype.Timestamptz{Status: pgtype.Null}
			if dr.Enddate != nil {
				tto = pgtype.Timestamptz{
					Time:   time.Time(*dr.Enddate),
					Status: pgtype.Present,
				}
			}

			validity := pgtype.Tstzrange{
				Lower:     tfrom,
				Upper:     tto,
				LowerType: pgtype.Inclusive,
				UpperType: pgtype.Exclusive,
				Status:    pgtype.Present,
			}

			dateInfo := pgtype.Timestamptz{Status: pgtype.Null}
			if dr.Infodate != nil {
				dateInfo = pgtype.Timestamptz{
					Time:   time.Time(*dr.Infodate),
					Status: pgtype.Present,
				}
			}

			var geodref sql.NullString
			if dr.Geodref != nil {
				geodref = sql.NullString{
					String: string(*dr.Geodref),
					Valid:  true,
				}
			}

			var source sql.NullString
			if dr.Source != nil {
				source = sql.NullString{
					String: string(*dr.Source),
					Valid:  true,
				}
			}

			// Store the gauge in a function scope of its own to have
			// the deferred clean-ups executed after each gauge and
			// not piled up till the end of the whole import.
			// The first result tells if the gauge was changed in the
			// database. A non-nil error aborts the import.
			changed, err := func() (bool, error) {
				tx, err := conn.BeginTx(ctx, nil)
				if err != nil {
					return false, err
				}
				// Safety net for the fatal error paths. After a
				// Commit or an explicit Rollback this only
				// returns sql.ErrTxDone which is safe to ignore.
				defer tx.Rollback()

				// skip reports cause as a warning and gives up on
				// this gauge. Only a failing rollback is fatal.
				skip := func(cause error) (bool, error) {
					// Don't use the error text as the format string:
					// it may contain '%'.
					feedback.Warn("%v", handleError(cause))
					return false, tx.Rollback()
				}

				// Mark old entry of gauge as erased, if applicable
				var isNew bool
				if err := tx.StmtContext(ctx, eraseGaugeStmt).QueryRowContext(ctx,
					code.String(),
					validity,
				).Scan(&isNew); err != nil {
					return skip(err)
				}

				if isNew {
					// insert gauge version entry
					if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
						code.CountryCode,
						code.LoCode,
						code.FairwaySection,
						code.Orc,
						code.Hectometre,
						dr.Objname.Loc,
						dr.Lon, dr.Lat,
						from,
						to,
						&validity,
						dr.Zeropoint,
						geodref,
						&dateInfo,
						source,
						time.Time(*dr.Lastupdate),
					); err != nil {
						return skip(err)
					}
					feedback.Info("insert new version")
				} else {
					// try to update
					var dummy int
					err := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
						code.CountryCode,
						code.LoCode,
						code.FairwaySection,
						code.Orc,
						code.Hectometre,
						dr.Objname.Loc,
						dr.Lon, dr.Lat,
						from,
						to,
						dr.Zeropoint,
						geodref,
						&dateInfo,
						source,
						time.Time(*dr.Lastupdate),
						&validity,
					).Scan(&dummy)
					switch {
					case err == sql.ErrNoRows:
						// Nothing updated as stored entry is not older.
						feedback.Info("unchanged")
						return false, tx.Rollback()
					case err != nil:
						return skip(err)
					}
					feedback.Info("update")

					// Remove obsolete reference water levels.
					// Only the codes delivered by the service are
					// kept as each of them may be missing. The slice
					// is deliberately non-nil to be sent as an empty
					// array instead of NULL if all codes are missing;
					// in this case all stored levels of this gauge
					// version are obsolete.
					levels := make([]string, 0, 3)
					for _, lc := range []*erdms.RisreflevelcodeType{
						dr.Reflevel1code,
						dr.Reflevel2code,
						dr.Reflevel3code,
					} {
						if lc != nil {
							levels = append(levels, string(*lc))
						}
					}
					var currLevels pgtype.VarcharArray
					if err := currLevels.Set(levels); err != nil {
						return false, err
					}
					rwls, err := tx.StmtContext(ctx,
						deleteReferenceWaterLevelsStmt).QueryContext(ctx,
						code.String(),
						&validity,
						&currLevels,
					)
					if err != nil {
						return false, err
					}
					defer rwls.Close()
					for rwls.Next() {
						var delRef string
						if err := rwls.Scan(&delRef); err != nil {
							return false, err
						}
						feedback.Warn("Removed reference water level %s from %s",
							delRef, code)
					}
					if err := rwls.Err(); err != nil {
						return false, err
					}
				}

				// Set end of validity of old version to start of new version
				// in case of overlap
				if _, err := tx.StmtContext(ctx, fixValidityStmt).ExecContext(
					ctx,
					code.String(),
					&validity,
				); err != nil {
					return skip(err)
				}

				// "Upsert" reference water levels
				for _, wl := range []struct {
					level *erdms.RisreflevelcodeType
					value *erdms.RisreflevelvalueType
				}{
					{dr.Reflevel1code, dr.Reflevel1value},
					{dr.Reflevel2code, dr.Reflevel2value},
					{dr.Reflevel3code, dr.Reflevel3value},
				} {
					if wl.level == nil || wl.value == nil {
						continue
					}
					level := string(*wl.level)

					var isNtSDepthRef bool
					if err := tx.StmtContext(
						ctx, isNtSDepthRefStmt).QueryRowContext(ctx,
						level,
					).Scan(
						&isNtSDepthRef,
					); err != nil {
						return false, err
					}
					if !isNtSDepthRef {
						feedback.Warn(
							"Reference level code '%s' is not in line "+
								"with the NtS reference_code table",
							level)
					}

					if _, err := tx.StmtContext(
						ctx, insertWaterLevelStmt).ExecContext(ctx,
						code.CountryCode,
						code.LoCode,
						code.FairwaySection,
						code.Orc,
						code.Hectometre,
						&validity,
						level,
						int64(*wl.value),
					); err != nil {
						// The transaction is unusable after the
						// rollback, so give up on the whole gauge
						// instead of going on with the next level.
						return skip(err)
					}
				}

				if err := tx.Commit(); err != nil {
					return false, err
				}
				return true, nil
			}()
			if err != nil {
				return nil, err
			}
			if !changed {
				unchanged++
			}
		}
	}

	if len(gauges) == 0 {
		return nil, UnchangedError("No gauges returned from ERDMS")
	}

	var pgCountries, pgGauges pgtype.VarcharArray
	if err := pgCountries.Set(countries); err != nil {
		return nil, err
	}
	if err := pgGauges.Set(gauges); err != nil {
		return nil, err
	}
	obsGauges, err := conn.QueryContext(ctx,
		eraseObsoleteGaugesSQL,
		&pgCountries,
		&pgGauges)
	if err != nil {
		return nil, err
	}
	defer obsGauges.Close()

	// Erasing an obsolete gauge is a change of the database, too.
	var erased int
	for obsGauges.Next() {
		var isrs string
		if err := obsGauges.Scan(&isrs); err != nil {
			return nil, err
		}
		feedback.Info("Erased %s", isrs)
		erased++
	}
	if err := obsGauges.Err(); err != nil {
		return nil, err
	}

	if erased == 0 && unchanged == len(gauges) {
		return nil, UnchangedError("All gauges unchanged")
	}

	feedback.Info("Importing gauges took %s",
		time.Since(start))

	return nil, nil
}