view pkg/imports/wg.go @ 2130:f3aabc05f9b2

Fix constraints on waterway profiles staging_done in the UNIQUE constraint had no effect, because the exclusion constraint prevented two rows with equal location and validity anyhow. Adding staging_done to the exclusion constraint makes the UNIQUE constraint checking only a corner case of what the exclusion constraint checks. Thus, remove the UNIQUE constraint. Casting staging_done to int is needed because there is no appropriate operator class for booleans. Casting to smallint or even bit would have been better (i.e. should result in smaller index size), but that would have required creating such a CAST, in addition.
author Tom Gottfried <tom@intevation.de>
date Wed, 06 Feb 2019 15:42:32 +0100
parents d966f03ea819
children b868cb653c4d
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap"
	"gemma.intevation.de/gemma/pkg/soap/erdms"
)

type WaterwayGauge struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Username is the username used to authenticate.
	Username string `json:"username"`
	// Passwort is the password to authenticate.
	Password string `json:"password"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

const WGJobKind JobKind = "wg"

type wgJobCreator struct{}

func init() {
	RegisterJobCreator(WGJobKind, wgJobCreator{})
}

func (wgJobCreator) Description() string { return "waterway gauges" }

func (wgJobCreator) AutoAccept() bool { return true }

func (wgJobCreator) Create(_ JobKind, data string) (Job, error) {
	wg := new(WaterwayGauge)
	if err := common.FromJSONString(data, wg); err != nil {
		return nil, err
	}
	return wg, nil
}

func (wgJobCreator) Depends() []string {
	return []string{
		"gauges",
		"gauges_reference_water_levels",
	}
}

// StageDone does nothing as there is no staging for gauges.
func (wgJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil }

// CleanUp does nothing as there is nothing to cleanup with gauges.
func (*WaterwayGauge) CleanUp() error { return nil }

const (
	selectCurrentUserCountrySQL = `SELECT users.current_user_country()`

	hasGaugeSQL = `
SELECT true
FROM waterway.gauges
WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`

	deleteReferenceWaterLevelsSQL = `
DELETE FROM waterway.gauges_reference_water_levels
WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`

	insertGaugeSQL = `
INSERT INTO waterway.gauges (
  location,
  objname,
  geom,
  applicability_from_km,
  applicability_to_km,
  validity,
  zero_point,
  geodref,
  date_info,
  source_organization
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15
) ON CONFLICT (location) DO UPDATE SET
  objname = $6,
  geom = ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
  applicability_from_km = $9,
  applicability_to_km = $10,
  validity = $11,
  zero_point = $12,
  geodref = $13,
  date_info = $14,
  source_organization = $15
`
	insertReferenceWaterLevelsSQL = `
INSERT INTO waterway.gauges_reference_water_levels (
  gauge_id,
  reference_water_level,
  value
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  $7
)
`
)

func (wg *WaterwayGauge) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	var country string
	err = tx.QueryRowContext(ctx, selectCurrentUserCountrySQL).Scan(&country)
	switch {
	case err == sql.ErrNoRows:
		return nil, errors.New("Cannot figure out user country")
	case err != nil:
		return nil, err
	}

	country = strings.ToUpper(country)
	feedback.Info("Using country '%s'.", country)

	var auth *soap.BasicAuth
	if wg.Username != "" {
		auth = &soap.BasicAuth{
			Login:    wg.Username,
			Password: wg.Password,
		}
	}

	client := erdms.NewRefService(wg.URL, wg.Insecure, auth)

	request := &erdms.GetRisDataXML{
		GetRisDataXMLType: &erdms.GetRisDataXMLType{
			Subcode: erdms.NoNS{Text: country + "%"},
			Funcode: erdms.NoNS{Text: "wtwgag"},
		},
	}

	data, err := client.GetRisDataXML(request)

	if err != nil {
		return nil, fmt.Errorf("Error requesting ERDMS service: %v", err)
	}

	hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL)
	if err != nil {
		return nil, err
	}
	defer hasGaugeStmt.Close()

	var ignored int

	type idxCode struct {
		idx  int
		code *models.Isrs
	}

	var news, olds []idxCode

	for i, dr := range data.RisdataReturn {
		if dr.RisidxCode == nil {
			ignored++
			continue
		}
		code, err := models.IsrsFromString(string(*dr.RisidxCode))
		if err != nil {
			feedback.Warn("invalid ISRS code %v", err)
			ignored++
			continue
		}

		if dr.Objname.Loc == nil {
			feedback.Warn("missing objname: %s", code)
			ignored++
			continue
		}

		if dr.Lat == nil || dr.Lon == nil {
			feedback.Warn("missing lat/lon: %s", code)
			ignored++
			continue
		}

		if dr.Zeropoint == nil {
			feedback.Warn("missing zeropoint: %s", code)
			ignored++
			continue
		}

		var dummy bool
		err = hasGaugeStmt.QueryRowContext(ctx,
			code.CountryCode,
			code.LoCode,
			code.FairwaySection,
			code.Orc,
			code.Hectometre,
		).Scan(&dummy)
		switch {
		case err == sql.ErrNoRows:
			news = append(news, idxCode{idx: i, code: code})
		case err != nil:
			return nil, err
		case !dummy:
			return nil, errors.New("Unexpected result")
		default:
			olds = append(olds, idxCode{idx: i, code: code})
		}
	}
	feedback.Info("ignored gauges: %d", ignored)
	feedback.Info("new gauges: %d", len(news))
	feedback.Info("update gauges: %d", len(olds))

	if len(news) == 0 && len(olds) == 0 {
		return nil, UnchangedError("nothing to do")
	}

	// delete reference water leves of the old.
	if len(olds) > 0 {
		deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(
			ctx, deleteReferenceWaterLevelsSQL)
		if err != nil {
			return nil, err
		}
		defer deleteReferenceWaterLevelsStmt.Close()
		for i := range olds {
			code := olds[i].code
			if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx,
				code.CountryCode,
				code.LoCode,
				code.FairwaySection,
				code.Orc,
				code.Hectometre,
			); err != nil {
				return nil, err
			}
		}
		// treat them as new
		news = append(news, olds...)
	}

	insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	insertWaterLevelStmt, err := tx.PrepareContext(
		ctx, insertReferenceWaterLevelsSQL)
	if err != nil {
		return nil, err
	}
	defer insertWaterLevelStmt.Close()

	// insert/update the gauges
	for i := range news {
		ic := &news[i]
		dr := data.RisdataReturn[ic.idx]

		feedback.Info("insert/update %s", ic.code)

		var from, to sql.NullInt64

		if dr.Applicabilityfromkm != nil {
			from = sql.NullInt64{
				Int64: int64(*dr.Applicabilityfromkm),
				Valid: true,
			}
		}
		if dr.Applicabilitytokm != nil {
			to = sql.NullInt64{
				Int64: int64(*dr.Applicabilitytokm),
				Valid: true,
			}
		}

		var tfrom, tto, dateInfo pgtype.Timestamptz

		if dr.Startdate != nil {
			tfrom = pgtype.Timestamptz{
				Time:   time.Time(*dr.Startdate),
				Status: pgtype.Present,
			}
		} else {
			tfrom = pgtype.Timestamptz{
				Status: pgtype.Null,
			}
		}

		if dr.Enddate != nil {
			tto = pgtype.Timestamptz{
				Time:   time.Time(*dr.Enddate),
				Status: pgtype.Present,
			}
		} else {
			tto = pgtype.Timestamptz{
				Status: pgtype.Null,
			}
		}

		validity := pgtype.Tstzrange{
			Lower:     tfrom,
			Upper:     tto,
			LowerType: pgtype.Inclusive,
			UpperType: pgtype.Inclusive,
			Status:    pgtype.Present,
		}

		if dr.Infodate != nil {
			dateInfo = pgtype.Timestamptz{
				Time:   time.Time(*dr.Infodate),
				Status: pgtype.Present,
			}
		} else {
			dateInfo = pgtype.Timestamptz{
				Status: pgtype.Null,
			}
		}

		var geodref sql.NullString
		if dr.Geodref != nil {
			geodref = sql.NullString{
				String: string(*dr.Geodref),
				Valid:  true,
			}
		}

		var source sql.NullString
		if dr.Source != nil {
			source = sql.NullString{
				String: string(*dr.Source),
				Valid:  true,
			}
		}

		if _, err := insertStmt.ExecContext(ctx,
			ic.code.CountryCode,
			ic.code.LoCode,
			ic.code.FairwaySection,
			ic.code.Orc,
			ic.code.Hectometre,
			string(*dr.Objname.Loc),
			int64(*dr.Lon), int64(*dr.Lat),
			from,
			to,
			&validity,
			float64(*dr.Zeropoint),
			geodref,
			&dateInfo,
			source,
		); err != nil {
			return nil, err
		}

		for _, wl := range []struct {
			level **erdms.RisreflevelcodeType
			value **erdms.RisreflevelvalueType
		}{
			{&dr.Reflevel1code, &dr.Reflevel1value},
			{&dr.Reflevel2code, &dr.Reflevel2value},
			{&dr.Reflevel3code, &dr.Reflevel3value},
		} {
			if *wl.level == nil || *wl.value == nil {
				continue
			}
			if _, err := insertWaterLevelStmt.ExecContext(
				ctx,
				ic.code.CountryCode,
				ic.code.LoCode,
				ic.code.FairwaySection,
				ic.code.Orc,
				ic.code.Hectometre,
				string(**wl.level),
				int64(**wl.value),
			); err != nil {
				return nil, err
			}
		}
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Refreshing gauges successfully took %s.",
			time.Since(start))
	} else {
		feedback.Error("Refreshing gauges failed after %s.",
			time.Since(start))
	}

	return nil, err
}