view pkg/imports/gm.go @ 5490:5f47eeea988d logging

Use own logging package.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 20 Sep 2021 17:45:39 +0200
parents 56c589f7435d
children 133dc5b3076a
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/soap/nts"
)

// GaugeMeasurement is an import job to import
// gauges measurement data from a NtS SOAP service.
type GaugeMeasurement struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// Description gives a short info about relevant facts of this import.
func (gm *GaugeMeasurement) Description() (string, error) {
	return gm.URL, nil
}

// GMJobKind is the import queue type identifier.
const GMJobKind JobKind = "gm"

const (
	listGaugesSQL = `
SELECT
  (location).country_code,
  (location).locode,
  (location).fairway_section,
  (location).orc,
  (location).hectometre
FROM waterway.gauges
WHERE (location).country_code = (
    SELECT country FROM users.list_users WHERE username = current_user)
  OR pg_has_role('sys_admin', 'MEMBER')
`

	// Note: we do not expect corrections of data through this service.  So
	// any constraint conflicts are triggered by redundant data which
	// can be dropped.
	insertGMSQL = `
INSERT INTO waterway.gauge_measurements (
  location,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  date_info,
  source_organization,
  staging_done
) VALUES (
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  true
)
ON CONFLICT DO NOTHING
RETURNING 1
`

	insertGPSQL = `
INSERT INTO waterway.gauge_predictions (
  location,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  conf_interval,
  date_info,
  source_organization
) VALUES(
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15
)
ON CONFLICT DO NOTHING
RETURNING 1
`
)

type gmJobCreator struct{}

func init() {
	RegisterJobCreator(GMJobKind, gmJobCreator{})
}

func (gmJobCreator) Description() string { return "gauge measurements" }

func (gmJobCreator) Create() Job { return new(GaugeMeasurement) }

func (gmJobCreator) Depends() [2][]string {
	return [2][]string{
		{"gauge_measurements"},
		{"gauges"},
	}
}

func (gmJobCreator) AutoAccept() bool { return true }

// StageDone is a NOP for gauge measurements imports.
func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
	return nil
}

// CleanUp of a gauge measurement import is a NOP.
func (*GaugeMeasurement) CleanUp() error { return nil }

// Do executes the actual import.
func (gm *GaugeMeasurement) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	fetch := func() ([]*nts.RIS_Message_Type, error) {
		client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)

		mt := nts.Message_type_typeWRM
		now := time.Now()
		dis := []*nts.Date_pair{{
			Date_start: nts.Date{Time: now.AddDate(0, 0, -1)},
			Date_end:   nts.Date{Time: now.AddDate(0, 0, +1)},
		}}

		req := &nts.Get_messages_query{
			Message_type: &mt,
			Dates_issue:  dis,
		}

		const maxTries = 3

		tries := 0

	again:
		resp, err := client.Get_messages(req)

		if err != nil {
			if t, ok := err.(interface{ Timeout() bool }); ok && t.Timeout() && tries < maxTries {
				log.Warnln("NtS SOAP request timed out. Trying again.")
				tries++
				goto again
			}
			return nil, fmt.Errorf(
				"Error requesting NtS service: %v", err)
		}

		result := resp.Result_message
		if result == nil {
			for _, e := range resp.Result_error {
				if e != nil {
					feedback.Error("Error code: %s", *e)
				} else {
					feedback.Error("Unknown error")
				}
			}
		}
		return result, nil
	}

	return storeGaugeMeasurements(
		ctx,
		importID,
		fetch,
		conn,
		feedback,
	)
}

func loadGauges(ctx context.Context, conn *sql.Conn) ([]string, error) {

	rows, err := conn.QueryContext(ctx, listGaugesSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var gauges []string

	for rows.Next() {
		var g models.Isrs
		if err = rows.Scan(
			&g.CountryCode,
			&g.LoCode,
			&g.FairwaySection,
			&g.Orc,
			&g.Hectometre,
		); err != nil {
			return nil, err
		}
		gauges = append(gauges, g.String())
	}

	if err = rows.Err(); err != nil {
		return nil, err
	}

	if len(gauges) == 0 {
		return nil, UnchangedError(
			"No gauges for which measurements can be imported in database")
	}

	sort.Strings(gauges)

	return gauges, nil
}

func storeGaugeMeasurements(
	ctx context.Context,
	importID int64,
	fetch func() ([]*nts.RIS_Message_Type, error),
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	// Get gauges from database, for which user is allowed to import data
	gauges, err := loadGauges(ctx, conn)
	if err != nil {
		return nil, err
	}

	gids, err := doForGM(ctx, gauges, fetch, conn, feedback)
	if err != nil {
		return nil, err
	}

	if len(gids) == 0 {
		return nil, UnchangedError("No new gauge measurements found")
	}

	feedback.Info(
		"Importing gauge measurements took %s", time.Since(start))

	// TODO: needs to be filled more useful.
	summary := struct {
		GaugeMeasuremets []string `json:"gaugeMeasurements"`
	}{
		GaugeMeasuremets: gids,
	}
	return &summary, err
}

// rescale returns a scaling function to bring the unit all to cm.
func rescale(unit string) (func(*float32), error) {

	var scale float32

	switch strings.ToLower(unit) {
	case "mm":
		scale = 0.1
	case "cm":
		scale = 1.0
	case "dm":
		scale = 10.0
	case "m":
		scale = 100.0
	case "hm":
		scale = 10000.0
	case "km":
		scale = 100000.0
	default:
		return nil, fmt.Errorf("unknown unit '%s'", unit)
	}

	fn := func(x *float32) {
		if x != nil {
			*x *= scale
		}
	}
	return fn, nil
}

func doForGM(
	ctx context.Context,
	gauges []string,
	fetch func() ([]*nts.RIS_Message_Type, error),
	conn *sql.Conn,
	feedback Feedback,
) ([]string, error) {

	insertGPStmt, err := conn.PrepareContext(ctx, insertGPSQL)
	if err != nil {
		return nil, err
	}
	defer insertGPStmt.Close()

	insertGMStmt, err := conn.PrepareContext(ctx, insertGMSQL)
	if err != nil {
		return nil, err
	}
	defer insertGMStmt.Close()

	// lookup to see if data can be imported for gauge
	isKnown := func(s string) bool {
		idx := sort.SearchStrings(gauges, s)
		return idx < len(gauges) && gauges[idx] == s
	}

	result, err := fetch()
	if err != nil {
		return nil, err
	}

	var gids []string
	for _, msg := range result {
		for _, wrm := range msg.Wrm {
			curr := string(*wrm.Geo_object.Id)
			curr = strings.TrimSpace(curr)
			currIsrs, err := models.IsrsFromString(curr)
			if err != nil {
				feedback.Warn("Invalid ISRS code %v", err)
				continue
			}
			feedback.Info("Found measurements/predictions for %s", curr)
			if !isKnown(curr) {
				feedback.Warn("Cannot find gauge %q for import", curr)
				continue
			}

			var referenceCode string
			if wrm.Reference_code == nil {
				feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
				referenceCode = "ZPG"
			} else {
				referenceCode = string(*wrm.Reference_code)
			}

			badValue := 0
			newM, newP := 0, 0
			for _, measure := range wrm.Measure {
				var unit string
				if *measure.Measure_code != nts.Measure_code_enumWAL {
					feedback.Warn("Ignored message with measure_code %s",
						*measure.Measure_code)
					continue
				}
				if measure.Unit == nil {
					feedback.Info("'Unit' not specified. Assuming 'cm'")
					unit = "cm"
				} else {
					unit = string(*measure.Unit)
				}

				if measure.Value == nil {
					feedback.Warn("Missing mandatory value at %s. Ignored (bad service)",
						measure.Measuredate.Format(time.RFC3339))
					continue
				}

				convert, err := rescale(unit)
				if err != nil {
					feedback.Error(err.Error())
					continue
				}
				convert(measure.Value)
				convert(measure.Value_min)
				convert(measure.Value_max)

				// -99999 is used by some gauges to signal an error
				if *measure.Value == -99999 {
					badValue++
					continue
				}

				var dummy int
				if measure.Predicted {
					confInterval := pgtype.Numrange{
						Lower:     pgtype.Numeric{Status: pgtype.Null},
						Upper:     pgtype.Numeric{Status: pgtype.Null},
						LowerType: pgtype.Inclusive,
						UpperType: pgtype.Inclusive,
						Status:    pgtype.Null,
					}
					if measure.Value_min != nil && measure.Value_max != nil {
						valueMin := pgtype.Numeric{Status: pgtype.Null}
						valueMax := pgtype.Numeric{Status: pgtype.Null}
						valueMin.Set(measure.Value_min)
						valueMax.Set(measure.Value_max)
						confInterval = pgtype.Numrange{
							Lower:     valueMin,
							Upper:     valueMax,
							LowerType: pgtype.Inclusive,
							UpperType: pgtype.Inclusive,
							Status:    pgtype.Present,
						}
					}
					err = insertGPStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate.Time,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue.Time,
						referenceCode,
						measure.Value,
						&confInterval,
						msg.Identification.Date_issue.Time,
						msg.Identification.Originator,
					).Scan(&dummy)
					switch {
					case err == sql.ErrNoRows:
						// thats expected, nothing to do
					case err != nil:
						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
					default:
						newP++
					}
				} else {
					err = insertGMStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate.Time,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue.Time,
						referenceCode,
						measure.Value,
						msg.Identification.Date_issue.Time,
						msg.Identification.Originator,
					).Scan(&dummy)
					switch {
					case err == sql.ErrNoRows:
						// thats expected, nothing to do
					case err != nil:
						feedback.Error(pgxutils.ReadableError{Err: err}.Error())
					default:
						newM++
					}
				}
			}
			if badValue > 0 {
				feedback.Warn("Ignored %d measurements with value -99999",
					badValue)
			}
			feedback.Info("Inserted %d measurements for %s",
				newM, curr)
			feedback.Info("Inserted %d predictions for %s",
				newP, curr)
			gids = append(gids, curr)
		}
	}
	return gids, nil
}