view pkg/imports/wg.go @ 3163:d9903cb34842

Handle failing INSERTs gracefully during gauges import. Using the special table EXCLUDED in INSERT statements makes no functional difference, but makes editing of the statements easier. Since reference water levels are no longer deleted all at once before (re-)importing, take the chance to report those that were deleted.
author Tom Gottfried <tom@intevation.de>
date Mon, 06 May 2019 13:25:49 +0200
parents eb1d119f253f
children 4acbee65275d
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/erdms"
)

// WaterwayGauge is the configuration of a waterway gauges import job.
// It carries the address of the SOAP service to query and the
// credentials used to access it.
type WaterwayGauge struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Username is the username used to authenticate.
	Username string `json:"username"`
	// Password is the password to authenticate.
	Password string `json:"password"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// WGJobKind is the job kind under which the waterway gauges
// import is registered.
const WGJobKind JobKind = "wg"

// wgJobCreator is the stateless job creator for waterway gauges imports.
type wgJobCreator struct{}

func init() {
	RegisterJobCreator(WGJobKind, wgJobCreator{})
}

// Description gives a short, human readable name for this kind of import.
func (wgJobCreator) Description() string {
	const description = "waterway gauges"
	return description
}

// AutoAccept always reports true for waterway gauges imports.
// NOTE(review): presumably this means the import needs no manual
// review step; confirm against the job creator interface.
func (wgJobCreator) AutoAccept() bool {
	return true
}

// Create returns a fresh, zero-valued WaterwayGauge job.
func (wgJobCreator) Create() Job {
	return &WaterwayGauge{}
}

// Depends lists the names this import depends on. They correspond to
// the tables written by the SQL statements of this import.
func (wgJobCreator) Depends() []string {
	deps := []string{"gauges", "gauges_reference_water_levels"}
	return deps
}

// StageDone is a no-op because gauges are imported without staging.
// It ignores all of its arguments and never fails.
func (wgJobCreator) StageDone(
	context.Context,
	*sql.Tx,
	int64,
) error {
	return nil
}

// CleanUp is a no-op: a gauges import leaves nothing behind
// that would have to be cleaned up. It never fails.
func (*WaterwayGauge) CleanUp() error {
	return nil
}

// SQL statements used by the waterway gauges import. In all of them a
// gauge is identified by a composite value built from five parameters
// (cast to char(2), char(3), char(5), char(5) and int).
const (
	// deleteReferenceWaterLevelsSQL deletes those reference water levels
	// of the gauge given by $1-$5 whose depth_reference is not contained
	// in the array $6. It returns the depth_reference of every deleted row.
	deleteReferenceWaterLevelsSQL = `
DELETE FROM waterway.gauges_reference_water_levels
WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
  AND depth_reference <> ALL($6)
RETURNING depth_reference
`

	// insertGaugeSQL inserts a gauge. If a gauge with the same location
	// already exists, all its other columns are overwritten with the
	// newly supplied values (available as EXCLUDED in the statement).
	// The geometry is built from $7/$8 with SRID 4326.
	insertGaugeSQL = `
INSERT INTO waterway.gauges (
  location,
  objname,
  geom,
  applicability_from_km,
  applicability_to_km,
  validity,
  zero_point,
  geodref,
  date_info,
  source_organization
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15
) ON CONFLICT (location) DO UPDATE SET
    objname = EXCLUDED.objname,
    geom = EXCLUDED.geom,
    applicability_from_km = EXCLUDED.applicability_from_km,
    applicability_to_km = EXCLUDED.applicability_to_km,
    validity = EXCLUDED.validity,
    zero_point = EXCLUDED.zero_point,
    geodref = EXCLUDED.geodref,
    date_info = EXCLUDED.date_info,
    source_organization = EXCLUDED.source_organization
`

	// isNtSDepthRefSQL reports whether the depth reference $1 is listed
	// in the depth_references table.
	isNtSDepthRefSQL = `
SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)`

	// insertReferenceWaterLevelsSQL inserts the value $7 of the reference
	// water level $6 for the gauge given by $1-$5. If that gauge already
	// has this reference water level, only its value is updated.
	insertReferenceWaterLevelsSQL = `
INSERT INTO waterway.gauges_reference_water_levels (
  gauge_id,
  depth_reference,
  value
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  $7
) ON CONFLICT (gauge_id, depth_reference) DO UPDATE SET
    value = EXCLUDED.value
`
)

// Do executes the actual import of waterway gauges.
//
// It fetches the gauge data with getRisData (query "wtwgag") and ignores
// every record that lacks a valid ISRS code, an object name, a position
// or a zero point. Each remaining gauge is inserted or updated together
// with its reference water levels in a transaction of its own. If an
// INSERT for a gauge fails, a warning is issued, the transaction of that
// gauge is rolled back and the import continues with the next gauge.
// Reference water levels that are stored for a gauge but were not
// delivered any more are deleted and reported.
//
// The first return value is always nil. An UnchangedError is returned if
// no usable gauge was delivered; any other database error aborts the
// import and is returned as is.
func (wg *WaterwayGauge) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	responseData, err := getRisData(
		ctx,
		conn,
		feedback,
		wg.Username,
		wg.Password,
		wg.URL,
		wg.Insecure,
		"wtwgag")
	if err != nil {
		return nil, err
	}

	var ignored int

	// idxCode remembers where a usable gauge record is found in
	// responseData together with its parsed ISRS code.
	type idxCode struct {
		jdx  int
		idx  int
		code *models.Isrs
	}

	var gauges []idxCode

	for j, data := range responseData {
		for i, dr := range data.RisdataReturn {
			if dr.RisidxCode == nil {
				ignored++
				continue
			}
			code, err := models.IsrsFromString(string(*dr.RisidxCode))
			if err != nil {
				feedback.Warn("invalid ISRS code %v", err)
				ignored++
				continue
			}

			if dr.Objname.Loc == nil {
				feedback.Warn("missing objname: %s", code)
				ignored++
				continue
			}

			if dr.Lat == nil || dr.Lon == nil {
				feedback.Warn("missing lat/lon: %s", code)
				ignored++
				continue
			}

			if dr.Zeropoint == nil {
				feedback.Warn("missing zeropoint: %s", code)
				ignored++
				continue
			}

			gauges = append(gauges, idxCode{jdx: j, idx: i, code: code})
		}
	}
	feedback.Info("ignored gauges: %d", ignored)
	feedback.Info("insert/update gauges: %d", len(gauges))

	if len(gauges) == 0 {
		return nil, UnchangedError("nothing to do")
	}

	// insert/update the gauges
	var insertStmt, deleteReferenceWaterLevelsStmt,
		isNtSDepthRefStmt, insertWaterLevelStmt *sql.Stmt
	for _, x := range []struct {
		sql  string
		stmt **sql.Stmt
	}{
		{insertGaugeSQL, &insertStmt},
		{deleteReferenceWaterLevelsSQL, &deleteReferenceWaterLevelsStmt},
		{isNtSDepthRefSQL, &isNtSDepthRefStmt},
		{insertReferenceWaterLevelsSQL, &insertWaterLevelStmt},
	} {
		var err error
		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
			return nil, err
		}
		defer (*x.stmt).Close()
	}

	// upsertGauge stores one gauge and its reference water levels in a
	// transaction of its own. A failing INSERT is reported as a warning
	// and makes the gauge being skipped (nil is returned); any other
	// error is returned and aborts the import.
	upsertGauge := func(ic *idxCode) error {
		dr := responseData[ic.jdx].RisdataReturn[ic.idx]

		feedback.Info("insert/update %s", ic.code)

		var from, to sql.NullInt64

		if dr.Applicabilityfromkm != nil {
			from = sql.NullInt64{
				Int64: int64(*dr.Applicabilityfromkm),
				Valid: true,
			}
		}
		if dr.Applicabilitytokm != nil {
			to = sql.NullInt64{
				Int64: int64(*dr.Applicabilitytokm),
				Valid: true,
			}
		}

		// Timestamps not delivered are stored as NULL.
		tfrom := pgtype.Timestamptz{Status: pgtype.Null}
		if dr.Startdate != nil {
			tfrom = pgtype.Timestamptz{
				Time:   time.Time(*dr.Startdate),
				Status: pgtype.Present,
			}
		}

		tto := pgtype.Timestamptz{Status: pgtype.Null}
		if dr.Enddate != nil {
			tto = pgtype.Timestamptz{
				Time:   time.Time(*dr.Enddate),
				Status: pgtype.Present,
			}
		}

		validity := pgtype.Tstzrange{
			Lower:     tfrom,
			Upper:     tto,
			LowerType: pgtype.Inclusive,
			UpperType: pgtype.Inclusive,
			Status:    pgtype.Present,
		}

		dateInfo := pgtype.Timestamptz{Status: pgtype.Null}
		if dr.Infodate != nil {
			dateInfo = pgtype.Timestamptz{
				Time:   time.Time(*dr.Infodate),
				Status: pgtype.Present,
			}
		}

		var geodref sql.NullString
		if dr.Geodref != nil {
			geodref = sql.NullString{
				String: string(*dr.Geodref),
				Valid:  true,
			}
		}

		var source sql.NullString
		if dr.Source != nil {
			source = sql.NullString{
				String: string(*dr.Source),
				Valid:  true,
			}
		}

		tx, err := conn.BeginTx(ctx, nil)
		if err != nil {
			return err
		}
		// Runs when this closure returns, i.e. once per gauge and not
		// only at the end of Do. After a successful Commit the
		// rollback has no effect.
		defer tx.Rollback()

		if _, err := tx.StmtContext(ctx, insertStmt).ExecContext(ctx,
			ic.code.CountryCode,
			ic.code.LoCode,
			ic.code.FairwaySection,
			ic.code.Orc,
			ic.code.Hectometre,
			string(*dr.Objname.Loc),
			float64(*dr.Lon), float64(*dr.Lat),
			from,
			to,
			&validity,
			float64(*dr.Zeropoint),
			geodref,
			&dateInfo,
			source,
		); err != nil {
			// The message is passed as an argument so that a '%' in
			// it is not interpreted as a formatting verb.
			feedback.Warn("%s", handleError(err).Error())
			return nil
		}

		// The reference water levels are optional, each of the
		// pointers may be nil.
		levels := []struct {
			level *erdms.RisreflevelcodeType
			value *erdms.RisreflevelvalueType
		}{
			{dr.Reflevel1code, dr.Reflevel1value},
			{dr.Reflevel2code, dr.Reflevel2value},
			{dr.Reflevel3code, dr.Reflevel3value},
		}

		// Remove obsolete reference water levels, i.e. all stored
		// ones whose code was not delivered. The slice must be
		// non-nil even if it stays empty: pgtype turns a nil slice
		// into SQL NULL, which would make the statement delete nothing.
		codes := make([]string, 0, len(levels))
		for _, wl := range levels {
			if wl.level != nil {
				codes = append(codes, string(*wl.level))
			}
		}
		var currLevels pgtype.VarcharArray
		if err := currLevels.Set(codes); err != nil {
			return err
		}

		rows, err := tx.StmtContext(
			ctx, deleteReferenceWaterLevelsStmt).QueryContext(ctx,
			ic.code.CountryCode,
			ic.code.LoCode,
			ic.code.FairwaySection,
			ic.code.Orc,
			ic.code.Hectometre,
			&currLevels,
		)
		if err != nil {
			return err
		}
		// More than one level may have been deleted. Collect them
		// first and report after the result set is closed.
		var removed []string
		for rows.Next() {
			var delRef string
			if err := rows.Scan(&delRef); err != nil {
				rows.Close()
				return err
			}
			removed = append(removed, delRef)
		}
		err = rows.Err()
		rows.Close()
		if err != nil {
			return err
		}
		for _, delRef := range removed {
			feedback.Info("Removed reference water level %s from %s",
				delRef, ic.code)
		}

		// Insert/update reference water levels
		for _, wl := range levels {
			if wl.level == nil || wl.value == nil {
				continue
			}
			refCode := string(*wl.level)

			var isNtSDepthRef bool
			if err := tx.StmtContext(ctx, isNtSDepthRefStmt).QueryRowContext(
				ctx,
				refCode,
			).Scan(
				&isNtSDepthRef,
			); err != nil {
				return err
			}
			if !isNtSDepthRef {
				feedback.Warn(
					"Reference level code '%s' is not in line "+
						"with the NtS reference_code table",
					refCode)
			}

			if _, err := tx.StmtContext(ctx, insertWaterLevelStmt).ExecContext(
				ctx,
				ic.code.CountryCode,
				ic.code.LoCode,
				ic.code.FairwaySection,
				ic.code.Orc,
				ic.code.Hectometre,
				refCode,
				int64(*wl.value),
			); err != nil {
				// The transaction cannot be used any more after a
				// failed statement: give up on this gauge (the
				// deferred rollback discards its changes) instead
				// of trying the remaining levels.
				feedback.Warn("%s", handleError(err).Error())
				return nil
			}
		}

		return tx.Commit()
	}

	for i := range gauges {
		if err := upsertGauge(&gauges[i]); err != nil {
			return nil, err
		}
	}

	feedback.Info("Refreshing gauges took %s.",
		time.Since(start))

	return nil, nil
}