view pkg/imports/wg.go @ 3305:5932f9574493

Follow-up for 3350:e640f51b5a4e (Fix reporting of removed reference water levels). Be more precise with error handling: added forgotten .Err() after looping over rows.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Fri, 17 May 2019 10:03:07 +0200
parents ec6163c6687d
children e0dabe7b2fcf
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/erdms"
)

// WaterwayGauge is the configuration of an import job
// fetching waterway gauges from a SOAP service.
type WaterwayGauge struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Username is the username used to authenticate.
	Username string `json:"username"`
	// Password is the password to authenticate.
	Password string `json:"password"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// WGJobKind is the job kind under which the
// waterway gauge import is registered.
const WGJobKind JobKind = "wg"

// wgJobCreator is the stateless creator of waterway gauge import jobs.
type wgJobCreator struct{}

func init() {
	RegisterJobCreator(WGJobKind, wgJobCreator{})
}

// Description gives a short human readable description of the job kind.
func (wgJobCreator) Description() string {
	return "waterway gauges"
}

// AutoAccept reports true: waterway gauge imports
// are accepted automatically.
func (wgJobCreator) AutoAccept() bool {
	return true
}

// Create builds a fresh, unconfigured waterway gauge import job.
func (wgJobCreator) Create() Job {
	return &WaterwayGauge{}
}

// Depends lists the names of the database relations the waterway
// gauge import depends on, split into two groups.
// NOTE(review): presumably the first group is modified by the import
// and the second one is only read -- confirm against the JobCreator
// interface.
func (wgJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauges_reference_water_levels", "gauges"}
	deps[1] = []string{"depth_references"}
	return deps
}

// StageDone is a no-op because gauges are not staged.
// It always returns nil.
func (wgJobCreator) StageDone(_ context.Context, _ *sql.Tx, _ int64) error {
	return nil
}

// CleanUp is a no-op because a gauge import leaves nothing behind
// that needs to be removed. It always returns nil.
func (*WaterwayGauge) CleanUp() error {
	return nil
}

// SQL statements used by the waterway gauge import.
const (
	// eraseGaugeSQL marks the not yet erased entries of the gauge with
	// the ISRS code $1 as erased and subtracts the new validity $2 from
	// their validity. Entries having exactly the validity $2 are left
	// alone.
	eraseGaugeSQL = `
UPDATE waterway.gauges SET
  erased = true,
   -- Set enddate of old entry to new startdate in case of overlap:
  validity = validity - $2
WHERE isrs_astext(location) = $1
  AND NOT erased
  -- Don't touch old entry if validity did not change: will be updated
  AND validity <> $2
`

	// insertGaugeSQL inserts a new gauge entry. A conflict is silently
	// ignored, so a row is only returned if the insert really happened.
	insertGaugeSQL = `
INSERT INTO waterway.gauges (
  location,
  objname,
  geom,
  applicability_from_km,
  applicability_to_km,
  validity,
  zero_point,
  geodref,
  date_info,
  source_organization,
  lastupdate
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  $16
-- Exclusion constraints are not supported as arbiters.
-- Thus we need to DO NOTHING here and use an extra UPDATE statement
) ON CONFLICT DO NOTHING
RETURNING 1
`
	// updateGaugeSQL updates the not erased entry of the gauge at the
	// given location if the incoming last update ($15) is newer than the
	// stored one. A row is only returned if an entry was updated.
	updateGaugeSQL = `
UPDATE waterway.gauges SET
  objname = $6,
  geom = ST_SetSRID(ST_MakePoint($7, $8), 4326),
  applicability_from_km = $9,
  applicability_to_km = $10,
  zero_point = $11,
  geodref = $12,
  date_info = $13,
  source_organization = $14,
  lastupdate = $15
WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
  AND NOT erased
  AND $15 > lastupdate
RETURNING 1
`

	// deleteReferenceWaterLevelsSQL removes the reference water levels
	// of the gauge with ISRS code $1 and validity $2 whose depth
	// reference is not contained in the array $3. The depth references
	// of the removed entries are returned.
	deleteReferenceWaterLevelsSQL = `
DELETE FROM waterway.gauges_reference_water_levels
WHERE isrs_astext(location) = $1
  AND validity = $2
  AND depth_reference <> ALL($3)
RETURNING depth_reference
`

	// isNtSDepthRefSQL checks if $1 is listed in the
	// depth_references table.
	isNtSDepthRefSQL = `
SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)`

	// insertReferenceWaterLevelsSQL stores a reference water level of a
	// gauge. If there is already an entry for the same location,
	// validity and depth reference its value is overwritten.
	insertReferenceWaterLevelsSQL = `
INSERT INTO waterway.gauges_reference_water_levels (
  location,
  validity,
  depth_reference,
  value
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  $7,
  $8
) ON CONFLICT (location, validity, depth_reference) DO UPDATE SET
    value = EXCLUDED.value
`
)

// Do executes the import of the waterway gauges.
//
// The gauges are fetched with getRisData from the configured service.
// Entries lacking mandatory data (ISRS code, object name, position,
// zero point or last update) are ignored. Each remaining gauge is
// stored in a transaction of its own, so a gauge which cannot be
// stored is reported as a warning and does not prevent the import
// of the other gauges.
//
// The returned summary is always nil. An UnchangedError is returned
// if there is nothing to import or if no gauge led to a change in the
// database. Any other error aborts the import.
func (wg *WaterwayGauge) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	responseData, err := getRisData(
		ctx,
		conn,
		feedback,
		wg.Username,
		wg.Password,
		wg.URL,
		wg.Insecure,
		"wtwgag")
	if err != nil {
		return nil, err
	}

	var ignored int

	// idxCode remembers the position of a usable gauge in the
	// response data together with its parsed ISRS code.
	type idxCode struct {
		jdx  int
		idx  int
		code *models.Isrs
	}

	var gauges []idxCode

	for j, data := range responseData {
		for i, dr := range data.RisdataReturn {
			if dr.RisidxCode == nil {
				ignored++
				continue
			}
			code, err := models.IsrsFromString(string(*dr.RisidxCode))
			if err != nil {
				feedback.Warn("invalid ISRS code %v", err)
				ignored++
				continue
			}

			if dr.Objname.Loc == nil {
				feedback.Warn("missing objname: %s", code)
				ignored++
				continue
			}

			if dr.Lat == nil || dr.Lon == nil {
				feedback.Warn("missing lat/lon: %s", code)
				ignored++
				continue
			}

			if dr.Zeropoint == nil {
				feedback.Warn("missing zeropoint: %s", code)
				ignored++
				continue
			}

			// The last update is dereferenced unconditionally when
			// storing the gauge, so it is mandatory, too.
			if dr.Lastupdate == nil {
				feedback.Warn("missing lastupdate: %s", code)
				ignored++
				continue
			}

			gauges = append(gauges, idxCode{jdx: j, idx: i, code: code})
		}
	}
	feedback.Info("Ignored gauges: %d", ignored)
	feedback.Info("Further process %d gauges", len(gauges))

	if len(gauges) == 0 {
		return nil, UnchangedError("Nothing to do")
	}

	// insert/update the gauges
	var eraseGaugeStmt, insertStmt, updateStmt,
		deleteReferenceWaterLevelsStmt,
		isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
	for _, x := range []struct {
		sql  string
		stmt **sql.Stmt
	}{
		{eraseGaugeSQL, &eraseGaugeStmt},
		{insertGaugeSQL, &insertStmt},
		{updateGaugeSQL, &updateStmt},
		{deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
		{isNtSDepthRefSQL, &isNtSDepthRefStmt},
		{insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
	} {
		var err error
		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
			return nil, err
		}
		defer (*x.stmt).Close()
	}

	// importGauge stores a single gauge in a transaction of its own and
	// reports if this led to a change in the database. Running it as a
	// function lets the deferred clean-ups happen per gauge instead of
	// piling up until Do returns.
	importGauge := func(ic *idxCode) (bool, error) {
		dr := responseData[ic.jdx].RisdataReturn[ic.idx]

		feedback.Info("Processing %s", ic.code)

		var from, to sql.NullInt64

		if dr.Applicabilityfromkm != nil {
			from = sql.NullInt64{
				Int64: int64(*dr.Applicabilityfromkm),
				Valid: true,
			}
		}
		if dr.Applicabilitytokm != nil {
			to = sql.NullInt64{
				Int64: int64(*dr.Applicabilitytokm),
				Valid: true,
			}
		}

		// A missing start or end date means the validity is not
		// bounded on that side. The bound type has to say so: a null
		// bound is only valid together with an unbounded bound type.
		validity := pgtype.Tstzrange{
			Lower:     pgtype.Timestamptz{Status: pgtype.Null},
			Upper:     pgtype.Timestamptz{Status: pgtype.Null},
			LowerType: pgtype.Unbounded,
			UpperType: pgtype.Unbounded,
			Status:    pgtype.Present,
		}
		if dr.Startdate != nil {
			validity.Lower = pgtype.Timestamptz{
				Time:   time.Time(*dr.Startdate),
				Status: pgtype.Present,
			}
			validity.LowerType = pgtype.Inclusive
		}
		if dr.Enddate != nil {
			validity.Upper = pgtype.Timestamptz{
				Time:   time.Time(*dr.Enddate),
				Status: pgtype.Present,
			}
			validity.UpperType = pgtype.Exclusive
		}

		dateInfo := pgtype.Timestamptz{Status: pgtype.Null}
		if dr.Infodate != nil {
			dateInfo = pgtype.Timestamptz{
				Time:   time.Time(*dr.Infodate),
				Status: pgtype.Present,
			}
		}

		var geodref sql.NullString
		if dr.Geodref != nil {
			geodref = sql.NullString{
				String: string(*dr.Geodref),
				Valid:  true,
			}
		}

		var source sql.NullString
		if dr.Source != nil {
			source = sql.NullString{
				String: string(*dr.Source),
				Valid:  true,
			}
		}

		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return false, err
		}
		// Safety net for the error returns below. After a commit or
		// an explicit rollback this only yields sql.ErrTxDone.
		defer tx.Rollback()

		// skip gives up on this gauge without aborting the import:
		// The transaction is rolled back and the gauge is reported
		// as unchanged.
		skip := func() (bool, error) {
			return false, tx.Rollback()
		}

		// Mark old entries of gauge as erased, if applicable
		if _, err := tx.StmtContext(ctx, eraseGaugeStmt).ExecContext(ctx,
			ic.code.String(),
			&validity,
		); err != nil {
			feedback.Warn("%s", handleError(err).Error())
			return skip()
		}

		// Try to insert gauge entry
		var dummy int
		err = tx.StmtContext(ctx, insertStmt).QueryRowContext(ctx,
			ic.code.CountryCode,
			ic.code.LoCode,
			ic.code.FairwaySection,
			ic.code.Orc,
			ic.code.Hectometre,
			string(*dr.Objname.Loc),
			float64(*dr.Lon), float64(*dr.Lat),
			from,
			to,
			&validity,
			float64(*dr.Zeropoint),
			geodref,
			&dateInfo,
			source,
			time.Time(*dr.Lastupdate),
		).Scan(&dummy)
		switch {
		case err == sql.ErrNoRows:
			// Assume constraint conflict, try to update
			err2 := tx.StmtContext(ctx, updateStmt).QueryRowContext(ctx,
				ic.code.CountryCode,
				ic.code.LoCode,
				ic.code.FairwaySection,
				ic.code.Orc,
				ic.code.Hectometre,
				string(*dr.Objname.Loc),
				float64(*dr.Lon), float64(*dr.Lat),
				from,
				to,
				float64(*dr.Zeropoint),
				geodref,
				&dateInfo,
				source,
				time.Time(*dr.Lastupdate),
			).Scan(&dummy)
			switch {
			case err2 == sql.ErrNoRows:
				feedback.Info("unchanged")
				return skip()
			case err2 != nil:
				feedback.Warn("%s", handleError(err2).Error())
				return skip()
			default:
				feedback.Info("update")
			}

			// Remove obsolete reference water levels.
			// The codes are optional, so only the delivered ones are
			// kept. The slice must not be nil: a nil slice would be
			// sent as NULL and then nothing would be removed.
			codes := make([]string, 0, 3)
			for _, c := range []*erdms.RisreflevelcodeType{
				dr.Reflevel1code,
				dr.Reflevel2code,
				dr.Reflevel3code,
			} {
				if c != nil {
					codes = append(codes, string(*c))
				}
			}
			var currLevels pgtype.VarcharArray
			if err := currLevels.Set(codes); err != nil {
				return false, err
			}
			rwls, err := tx.StmtContext(ctx,
				deleteReferenceWaterLevelsStmt).QueryContext(ctx,
				ic.code.String(),
				&validity,
				&currLevels,
			)
			if err != nil {
				return false, err
			}
			defer rwls.Close()
			for rwls.Next() {
				var delRef string
				if err := rwls.Scan(&delRef); err != nil {
					return false, err
				}
				feedback.Warn("Removed reference water level %s from %s",
					delRef, ic.code)
			}
			if err := rwls.Err(); err != nil {
				return false, err
			}
		case err != nil:
			feedback.Warn("%s", handleError(err).Error())
			return skip()
		default:
			feedback.Info("insert new version")
		}

		// "Upsert" reference water levels
		for _, wl := range []struct {
			level *erdms.RisreflevelcodeType
			value *erdms.RisreflevelvalueType
		}{
			{dr.Reflevel1code, dr.Reflevel1value},
			{dr.Reflevel2code, dr.Reflevel2value},
			{dr.Reflevel3code, dr.Reflevel3value},
		} {
			if wl.level == nil || wl.value == nil {
				continue
			}
			level := string(*wl.level)

			var isNtSDepthRef bool
			if err := tx.StmtContext(ctx, isNtSDepthRefStmt).QueryRowContext(
				ctx,
				level,
			).Scan(
				&isNtSDepthRef,
			); err != nil {
				return false, err
			}
			if !isNtSDepthRef {
				feedback.Warn(
					"Reference level code '%s' is not in line "+
						"with the NtS reference_code table",
					level)
			}

			if _, err := tx.StmtContext(ctx, insertWaterLevelStmt).ExecContext(
				ctx,
				ic.code.CountryCode,
				ic.code.LoCode,
				ic.code.FairwaySection,
				ic.code.Orc,
				ic.code.Hectometre,
				&validity,
				level,
				int64(*wl.value),
			); err != nil {
				// The transaction cannot be used any more after the
				// rollback, so give up on the whole gauge instead of
				// trying the remaining levels.
				feedback.Warn("%s", handleError(err).Error())
				return skip()
			}
		}

		if err := tx.Commit(); err != nil {
			return false, err
		}
		return true, nil
	}

	var unchanged int

	for i := range gauges {
		changed, err := importGauge(&gauges[i])
		if err != nil {
			return nil, err
		}
		if !changed {
			unchanged++
		}
	}

	feedback.Info("Importing gauges took %s",
		time.Since(start))

	if unchanged == len(gauges) {
		return nil, UnchangedError("All gauges unchanged")
	}

	return nil, nil
}