view pkg/imports/wg.go @ 2549:9bf6b767a56a

client: refactored and improved splitscreen for diagrams. To make different diagrams possible, the splitscreen view needed to be decoupled from the cross profiles. The style has also changed to make it more consistent with the rest of the app: the standard box header is now used, and there are collapse and expand animations.
author Markus Kottlaender <markus@intevation.de>
date Fri, 08 Mar 2019 08:50:47 +0100
parents e199655809c1
children eb1d119f253f
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap"
	"gemma.intevation.de/gemma/pkg/soap/erdms"
)

// WaterwayGauge is the configuration of a waterway gauges import job.
// The gauges are fetched from a SOAP service; its JSON form uses the
// keys given in the field tags.
type WaterwayGauge struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Username is the username used to authenticate.
	// If it is empty no basic authentication is sent.
	Username string `json:"username"`
	// Password is the password to authenticate.
	Password string `json:"password"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// WGJobKind is the job kind under which the
// waterway gauges import is registered.
const WGJobKind JobKind = "wg"

// wgJobCreator is the stateless creator of waterway gauges
// import jobs. It is registered for WGJobKind in init.
type wgJobCreator struct{}

func init() {
	RegisterJobCreator(WGJobKind, wgJobCreator{})
}

// Description returns the short human readable
// label of this kind of import.
func (wgJobCreator) Description() string {
	const label = "waterway gauges"
	return label
}

// AutoAccept always reports true for waterway gauges imports.
func (wgJobCreator) AutoAccept() bool {
	const accept = true
	return accept
}

// Create returns a fresh, zero-valued WaterwayGauge
// to be used as the import job.
func (wgJobCreator) Create() Job {
	return &WaterwayGauge{}
}

// Depends lists the names this import declares a dependency on:
// the gauges and their reference water levels.
func (wgJobCreator) Depends() []string {
	deps := [...]string{
		"gauges",
		"gauges_reference_water_levels",
	}
	return deps[:]
}

// StageDone is a no-op because gauges are imported without a
// staging phase. All arguments are ignored and the result is
// always nil.
func (wgJobCreator) StageDone(
	_ context.Context,
	_ *sql.Tx,
	_ int64,
) error {
	return nil
}

// CleanUp is a no-op because a gauges import leaves
// nothing behind to clean up. It always returns nil.
func (*WaterwayGauge) CleanUp() error {
	var err error
	return err
}

// SQL statements used by the waterway gauges import.
// In all statements taking a gauge location the parameters $1 to $5
// are the components of the location in the order in which Do passes
// them: country code, LoCode, fairway section, Orc and hectometre.
const (
	// selectCurrentUserCountrySQL asks the database for the country
	// of the current user via users.current_user_country().
	selectCurrentUserCountrySQL = `SELECT users.current_user_country()`

	// hasGaugeSQL yields one row containing true if a gauge with the
	// given location is stored in waterway.gauges and no row otherwise.
	hasGaugeSQL = `
SELECT true
FROM waterway.gauges
WHERE location = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`

	// deleteReferenceWaterLevelsSQL removes all reference water
	// levels stored for the gauge with the given location.
	deleteReferenceWaterLevelsSQL = `
DELETE FROM waterway.gauges_reference_water_levels
WHERE gauge_id = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)`

	// insertGaugeSQL stores a gauge. If a gauge with the same location
	// already exists all its other columns are overwritten instead.
	// $6 is the object name, $7 and $8 are the coordinates handed to
	// ST_MakePoint (tagged with SRID 4326), $9 and $10 the applicability
	// range, $11 the validity, $12 the zero point, $13 the geodetic
	// reference, $14 the info date and $15 the source organization.
	insertGaugeSQL = `
INSERT INTO waterway.gauges (
  location,
  objname,
  geom,
  applicability_from_km,
  applicability_to_km,
  validity,
  zero_point,
  geodref,
  date_info,
  source_organization
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15
) ON CONFLICT (location) DO UPDATE SET
  objname = $6,
  geom = ST_SetSRID(ST_MakePoint($7, $8), 4326)::geography,
  applicability_from_km = $9,
  applicability_to_km = $10,
  validity = $11,
  zero_point = $12,
  geodref = $13,
  date_info = $14,
  source_organization = $15
`
	// isNtSDepthRefSQL yields whether the depth reference $1
	// is listed in the depth_references table.
	isNtSDepthRefSQL = `
SELECT EXISTS(SELECT 1 FROM depth_references WHERE depth_reference = $1)`

	// insertReferenceWaterLevelsSQL stores one reference water level
	// for the gauge with the given location. $6 is the depth reference
	// and $7 the value of the level.
	insertReferenceWaterLevelsSQL = `
INSERT INTO waterway.gauges_reference_water_levels (
  gauge_id,
  depth_reference,
  value
) VALUES (
  ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int),
  $6,
  $7
)
`
)

// Do executes the actual import of the waterway gauges.
//
// It determines the country of the current user, requests all gauges
// (function code "wtwgag") matching the sub code pattern country + "%"
// from the ERDMS SOAP service at wg.URL and stores them together with
// their reference water levels in the database. All database work
// happens in one transaction on conn which is rolled back if any step
// fails.
//
// Entries of the service lacking a valid ISRS code, an object name,
// a position or a zero point are skipped and counted as ignored.
// For gauges already stored the reference water levels are deleted
// and written anew; the gauges themselves are upserted.
//
// importID is not used by this import. The first return value is
// always nil. If the service delivered no usable gauge an
// UnchangedError is returned. Progress and problems are reported
// via feedback.
func (wg *WaterwayGauge) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// After a successful commit this rollback has no effect.
	defer tx.Rollback()

	var country string
	err = tx.QueryRowContext(ctx, selectCurrentUserCountrySQL).Scan(&country)
	switch {
	case err == sql.ErrNoRows:
		return nil, errors.New("Cannot figure out user country")
	case err != nil:
		return nil, err
	}

	country = strings.ToUpper(country)
	feedback.Info("Using country '%s'.", country)

	// Only authenticate if a user name is configured.
	var auth *soap.BasicAuth
	if wg.Username != "" {
		auth = &soap.BasicAuth{
			Login:    wg.Username,
			Password: wg.Password,
		}
	}

	client := erdms.NewRefService(wg.URL, wg.Insecure, auth)

	request := &erdms.GetRisDataXML{
		GetRisDataXMLType: &erdms.GetRisDataXMLType{
			Subcode: erdms.NoNS{Text: country + "%"},
			Funcode: erdms.NoNS{Text: "wtwgag"},
		},
	}

	data, err := client.GetRisDataXML(request)

	if err != nil {
		return nil, fmt.Errorf("Error requesting ERDMS service: %v", err)
	}

	hasGaugeStmt, err := tx.PrepareContext(ctx, hasGaugeSQL)
	if err != nil {
		return nil, err
	}
	defer hasGaugeStmt.Close()

	var ignored int

	// idxCode links an entry of the service response (by index)
	// with its parsed ISRS code.
	type idxCode struct {
		idx  int
		code *models.Isrs
	}

	var news, olds []idxCode

	// Sort the usable entries into gauges not yet stored (news)
	// and gauges already stored (olds).
	for i, dr := range data.RisdataReturn {
		if dr.RisidxCode == nil {
			ignored++
			continue
		}
		code, err := models.IsrsFromString(string(*dr.RisidxCode))
		if err != nil {
			feedback.Warn("invalid ISRS code %v", err)
			ignored++
			continue
		}

		if dr.Objname.Loc == nil {
			feedback.Warn("missing objname: %s", code)
			ignored++
			continue
		}

		if dr.Lat == nil || dr.Lon == nil {
			feedback.Warn("missing lat/lon: %s", code)
			ignored++
			continue
		}

		if dr.Zeropoint == nil {
			feedback.Warn("missing zeropoint: %s", code)
			ignored++
			continue
		}

		var dummy bool
		err = hasGaugeStmt.QueryRowContext(ctx,
			code.CountryCode,
			code.LoCode,
			code.FairwaySection,
			code.Orc,
			code.Hectometre,
		).Scan(&dummy)
		switch {
		case err == sql.ErrNoRows:
			news = append(news, idxCode{idx: i, code: code})
		case err != nil:
			return nil, err
		case !dummy:
			return nil, errors.New("Unexpected result")
		default:
			olds = append(olds, idxCode{idx: i, code: code})
		}
	}
	feedback.Info("ignored gauges: %d", ignored)
	feedback.Info("new gauges: %d", len(news))
	feedback.Info("update gauges: %d", len(olds))

	if len(news) == 0 && len(olds) == 0 {
		return nil, UnchangedError("nothing to do")
	}

	// Delete the reference water levels of the old gauges.
	if len(olds) > 0 {
		deleteReferenceWaterLevelsStmt, err := tx.PrepareContext(
			ctx, deleteReferenceWaterLevelsSQL)
		if err != nil {
			return nil, err
		}
		defer deleteReferenceWaterLevelsStmt.Close()
		for i := range olds {
			code := olds[i].code
			if _, err := deleteReferenceWaterLevelsStmt.ExecContext(ctx,
				code.CountryCode,
				code.LoCode,
				code.FairwaySection,
				code.Orc,
				code.Hectometre,
			); err != nil {
				return nil, err
			}
		}
		// From here on treat them as new: the insert statement
		// updates gauges which already exist.
		news = append(news, olds...)
	}

	insertStmt, err := tx.PrepareContext(ctx, insertGaugeSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	insertWaterLevelStmt, err := tx.PrepareContext(
		ctx, insertReferenceWaterLevelsSQL)
	if err != nil {
		return nil, err
	}
	defer insertWaterLevelStmt.Close()

	isNtSDepthRefStmt, err := tx.PrepareContext(ctx, isNtSDepthRefSQL)
	if err != nil {
		return nil, err
	}
	defer isNtSDepthRefStmt.Close()

	// insert/update the gauges
	for i := range news {
		ic := &news[i]
		dr := data.RisdataReturn[ic.idx]

		feedback.Info("insert/update %s", ic.code)

		// Optional applicability range: stays NULL if not delivered.
		var from, to sql.NullInt64

		if dr.Applicabilityfromkm != nil {
			from = sql.NullInt64{
				Int64: int64(*dr.Applicabilityfromkm),
				Valid: true,
			}
		}
		if dr.Applicabilitytokm != nil {
			to = sql.NullInt64{
				Int64: int64(*dr.Applicabilitytokm),
				Valid: true,
			}
		}

		var tfrom, tto, dateInfo pgtype.Timestamptz

		// A missing start or end date means the validity is open at
		// that side. A range with a NULL bound can only be encoded
		// if the type of that bound is Unbounded, so the bound types
		// have to follow the presence of the dates.
		lowerType, upperType := pgtype.Inclusive, pgtype.Inclusive

		if dr.Startdate != nil {
			tfrom = pgtype.Timestamptz{
				Time:   time.Time(*dr.Startdate),
				Status: pgtype.Present,
			}
		} else {
			tfrom = pgtype.Timestamptz{
				Status: pgtype.Null,
			}
			lowerType = pgtype.Unbounded
		}

		if dr.Enddate != nil {
			tto = pgtype.Timestamptz{
				Time:   time.Time(*dr.Enddate),
				Status: pgtype.Present,
			}
		} else {
			tto = pgtype.Timestamptz{
				Status: pgtype.Null,
			}
			upperType = pgtype.Unbounded
		}

		validity := pgtype.Tstzrange{
			Lower:     tfrom,
			Upper:     tto,
			LowerType: lowerType,
			UpperType: upperType,
			Status:    pgtype.Present,
		}

		if dr.Infodate != nil {
			dateInfo = pgtype.Timestamptz{
				Time:   time.Time(*dr.Infodate),
				Status: pgtype.Present,
			}
		} else {
			dateInfo = pgtype.Timestamptz{
				Status: pgtype.Null,
			}
		}

		var geodref sql.NullString
		if dr.Geodref != nil {
			geodref = sql.NullString{
				String: string(*dr.Geodref),
				Valid:  true,
			}
		}

		var source sql.NullString
		if dr.Source != nil {
			source = sql.NullString{
				String: string(*dr.Source),
				Valid:  true,
			}
		}

		// Objname.Loc, Lon, Lat and Zeropoint are known to be non-nil
		// here: entries lacking them were ignored above.
		if _, err := insertStmt.ExecContext(ctx,
			ic.code.CountryCode,
			ic.code.LoCode,
			ic.code.FairwaySection,
			ic.code.Orc,
			ic.code.Hectometre,
			string(*dr.Objname.Loc),
			float64(*dr.Lon), float64(*dr.Lat),
			from,
			to,
			&validity,
			float64(*dr.Zeropoint),
			geodref,
			&dateInfo,
			source,
		); err != nil {
			return nil, err
		}

		// Store up to three reference water levels. Only pairs
		// with both code and value delivered are stored.
		for _, wl := range []struct {
			level **erdms.RisreflevelcodeType
			value **erdms.RisreflevelvalueType
		}{
			{&dr.Reflevel1code, &dr.Reflevel1value},
			{&dr.Reflevel2code, &dr.Reflevel2value},
			{&dr.Reflevel3code, &dr.Reflevel3value},
		} {
			if *wl.level == nil || *wl.value == nil {
				continue
			}

			var isNtSDepthRef bool
			if err := isNtSDepthRefStmt.QueryRowContext(
				ctx,
				string(**wl.level),
			).Scan(
				&isNtSDepthRef,
			); err != nil {
				return nil, err
			}
			// An unknown code only leads to a warning;
			// the level is stored nevertheless.
			if !isNtSDepthRef {
				feedback.Warn(
					"Reference level code '%s' is not in line with the NtS reference_code table",
					string(**wl.level))
			}

			if _, err := insertWaterLevelStmt.ExecContext(
				ctx,
				ic.code.CountryCode,
				ic.code.LoCode,
				ic.code.FairwaySection,
				ic.code.Orc,
				ic.code.Hectometre,
				string(**wl.level),
				int64(**wl.value),
			); err != nil {
				return nil, err
			}
		}
	}

	if err = tx.Commit(); err == nil {
		feedback.Info("Refreshing gauges successfully took %s.",
			time.Since(start))
	} else {
		feedback.Error("Refreshing gauges failed after %s.",
			time.Since(start))
	}

	return nil, err
}