view pkg/imports/gm.go @ 5711:2dd155cc95ec revive-cleanup

Fix all revive issues (w/o machine generated stuff).
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 20 Feb 2024 22:22:57 +0100
parents 7724bb582e80
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/soap/nts"
)

// GaugeMeasurement is the import job which fetches gauge measurements
// and predictions from a NtS SOAP service and stores them.
type GaugeMeasurement struct {
	// URL is the address of the NtS SOAP service to query.
	URL string `json:"url"`

	// Insecure is handed to the NtS SOAP client. When set, HTTPS
	// certificates are presumably not validated (see
	// nts.NewINtSMessageService) — confirm before relying on it.
	Insecure bool `json:"insecure"`
}

// Description summarizes this import by the URL of the queried service.
// The argument is not used.
func (gm *GaugeMeasurement) Description(_ []string) (string, error) {
	url := gm.URL
	return url, nil
}

// GMJobKind is the import queue type identifier.
const GMJobKind JobKind = "gm"

const (
	// listGaugesSQL lists the ISRS components of all gauges located in
	// the country of the current database user. Members of the
	// sys_admin role get all gauges.
	listGaugesSQL = `
SELECT
  (location).country_code,
  (location).locode,
  (location).fairway_section,
  (location).orc,
  (location).hectometre
FROM waterway.gauges
WHERE (location).country_code = (
    SELECT country FROM users.list_users WHERE username = current_user)
  OR pg_has_role('sys_admin', 'MEMBER')
`

	// insertGMSQL inserts a single gauge measurement, already marked
	// as staging_done. $1-$5 form the ISRS location tuple.
	//
	// Note: we do not expect corrections of data through this service,
	// so any constraint conflicts are triggered by redundant data which
	// can be dropped. In that case no row is returned, which the caller
	// sees as sql.ErrNoRows.
	insertGMSQL = `
INSERT INTO waterway.gauge_measurements (
  location,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  date_info,
  source_organization,
  staging_done
) VALUES (
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  true
)
ON CONFLICT DO NOTHING
RETURNING 1
`

	// insertGPSQL inserts a single gauge prediction. The parameters are
	// laid out like those of insertGMSQL, with the confidence interval
	// inserted as $13. Conflicting rows are dropped as well, yielding
	// no returned row (sql.ErrNoRows for the caller).
	insertGPSQL = `
INSERT INTO waterway.gauge_predictions (
  location,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  conf_interval,
  date_info,
  source_organization
) VALUES(
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15
)
ON CONFLICT DO NOTHING
RETURNING 1
`
)

// gmJobCreator describes gauge measurement imports and creates
// the corresponding jobs.
type gmJobCreator struct{}

// init makes gauge measurement imports available under GMJobKind.
func init() { RegisterJobCreator(GMJobKind, gmJobCreator{}) }

// Description returns the human readable name of this import type.
func (gmJobCreator) Description() string {
	return "gauge measurements"
}

// Create returns a new, unconfigured gauge measurement job.
func (gmJobCreator) Create() Job {
	return &GaugeMeasurement{}
}

// Depends names gauge_measurements in the first and gauges in the
// second entry. Presumably these are the tables written to and the
// tables only read from — confirm against the JobCreator contract.
func (gmJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauge_measurements"}
	deps[1] = []string{"gauges"}
	return deps
}

// AutoAccept reports true: imports of this kind are auto-accepted.
func (gmJobCreator) AutoAccept() bool {
	return true
}

// StageDone is a NOP for gauge measurements imports. The measurements
// are already inserted with staging_done set (see insertGMSQL).
func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
	return nil
}

// CleanUp of a gauge measurement import is a NOP.
func (*GaugeMeasurement) CleanUp() error {
	return nil
}

// Do executes the actual import. It requests all WRM messages issued
// between yesterday and tomorrow (relative to now) from the configured
// NtS SOAP service and stores the contained gauge data via
// storeGaugeMeasurements.
func (gm *GaugeMeasurement) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	fetch := func() ([]*nts.RIS_Message_Type, error) {
		client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)

		mt := nts.Message_type_typeWRM
		now := time.Now()
		dis := []*nts.Date_pair{{
			Date_start: nts.Date{Time: now.AddDate(0, 0, -1)},
			Date_end:   nts.Date{Time: now.AddDate(0, 0, +1)},
		}}

		req := &nts.Get_messages_query{
			Message_type: &mt,
			Dates_issue:  dis,
		}

		// A request which timed out is retried up to maxRetries times,
		// so the service is asked at most maxRetries+1 times in total.
		const maxRetries = 3

		for retries := 0; ; retries++ {
			resp, err := client.Get_messages(req)
			if err != nil {
				t, ok := err.(interface{ Timeout() bool })
				if !ok || !t.Timeout() || retries >= maxRetries {
					return nil, fmt.Errorf(
						"Error requesting NtS service: %w", err)
				}
				// Don't bother the service again if the import
				// was canceled in the meantime.
				if cerr := ctx.Err(); cerr != nil {
					return nil, cerr
				}
				log.Warnln("NtS SOAP request timed out. Trying again.")
				continue
			}

			result := resp.Result_message
			if result == nil {
				// Without a result the service reports error codes.
				for _, e := range resp.Result_error {
					if e != nil {
						feedback.Error("Error code: %s", *e)
					} else {
						feedback.Error("Unknown error")
					}
				}
			}
			return result, nil
		}
	}

	return storeGaugeMeasurements(
		ctx,
		importID,
		fetch,
		conn,
		feedback,
	)
}

// loadGauges returns the ISRS codes of all gauges the current database
// user may import measurements for (see listGaugesSQL), sorted so that
// they can be binary searched. If there are none an UnchangedError is
// returned.
func loadGauges(ctx context.Context, conn *sql.Conn) ([]string, error) {
	rows, err := conn.QueryContext(ctx, listGaugesSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var codes []string
	for rows.Next() {
		var isrs models.Isrs
		err := rows.Scan(
			&isrs.CountryCode,
			&isrs.LoCode,
			&isrs.FairwaySection,
			&isrs.Orc,
			&isrs.Hectometre,
		)
		if err != nil {
			return nil, err
		}
		codes = append(codes, isrs.String())
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	if len(codes) == 0 {
		return nil, UnchangedError(
			"No gauges for which measurements can be imported in database")
	}

	sort.Strings(codes)
	return codes, nil
}

// storeGaugeMeasurements imports the measurements and predictions
// delivered by fetch for the gauges the current user is allowed to
// import data for. The second parameter (the import ID) is unused.
//
// An UnchangedError is returned if the database contains no such gauges
// or if the fetched messages contain no valid gauge ISRS code at all.
// On success the returned summary lists the ISRS codes of the gauges
// found in the fetched messages.
func storeGaugeMeasurements(
	ctx context.Context,
	_ int64,
	fetch func() ([]*nts.RIS_Message_Type, error),
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	// Get gauges from database, for which user is allowed to import data
	gauges, err := loadGauges(ctx, conn)
	if err != nil {
		return nil, err
	}

	gids, err := doForGM(ctx, gauges, fetch, conn, feedback)
	if err != nil {
		return nil, err
	}

	if len(gids) == 0 {
		return nil, UnchangedError("No new gauge measurements found")
	}

	feedback.Info(
		"Importing gauge measurements took %s", time.Since(start))

	// TODO: needs to be filled more useful.
	summary := struct {
		GaugeMeasurements []string `json:"gaugeMeasurements"`
	}{
		GaugeMeasurements: gids,
	}
	// err is known to be nil here; be explicit about it.
	return &summary, nil
}

// rescale returns a function which converts a value given in unit to
// centimetres in place. unit is matched case-insensitively against
// km, hm, m, dm, cm and mm; any other unit yields an error. The
// returned function leaves nil pointers alone.
func rescale(unit string) (func(*float32), error) {
	var factor float32

	switch strings.ToLower(unit) {
	case "km":
		factor = 1e5
	case "hm":
		factor = 1e4
	case "m":
		factor = 1e2
	case "dm":
		factor = 1e1
	case "cm":
		factor = 1
	case "mm":
		factor = 1e-1
	default:
		return nil, fmt.Errorf("unknown unit '%s'", unit)
	}

	return func(value *float32) {
		if value == nil {
			return
		}
		*value *= factor
	}, nil
}

// gmLog collects the findings of an import for a single gauge so that
// they can be reported in an aggregated way.
type gmLog struct {
	// gid is the ISRS code of the gauge.
	gid string

	// unknown is set if the gauge is not importable for the user;
	// assumedZPG if a message had no reference code.
	unknown, assumedZPG bool

	// Distinct ignored measure codes, distinct unit conversion error
	// messages and the (RFC 3339) dates of measures without a value.
	ignoredMeasureCodes, rescaleErrors, missingValues []string

	// Counters for measures without unit, -99999 values and the newly
	// inserted measurements and predictions.
	assumedCM, badValues, measurements, predictions int
}

// gmLogs is the list of per-gauge findings of an import.
type gmLogs []*gmLog

// extend appends needle to haystack unless it is already contained,
// treating haystack as an insertion-ordered set.
func extend(haystack []string, needle string) []string {
	found := false
	for i := range haystack {
		if haystack[i] == needle {
			found = true
			break
		}
	}
	if !found {
		haystack = append(haystack, needle)
	}
	return haystack
}

// addRescaleError records the message of err, once per distinct message.
func (gl *gmLog) addRescaleError(err error) {
	msg := err.Error()
	gl.rescaleErrors = extend(gl.rescaleErrors, msg)
}

// ignoreMeasureCode records mc as skipped, once per distinct code.
func (gl *gmLog) ignoreMeasureCode(mc nts.Measure_code_enum) {
	code := string(mc)
	gl.ignoredMeasureCodes = extend(gl.ignoredMeasureCodes, code)
}

// find returns the first log entry of the gauge gid, or nil if there
// is none.
func (gls gmLogs) find(gid string) *gmLog {
	for i := range gls {
		if entry := gls[i]; entry.gid == gid {
			return entry
		}
	}
	return nil
}

// logging reports the aggregated findings of all gauges via feedback:
// first the problems (as warnings, unit conversion failures as errors),
// then the counts of new predictions and measurements, and finally the
// gauges without any change.
func (gls gmLogs) logging(feedback Feedback) {
	warn, errorf, info := feedback.Warn, feedback.Error, feedback.Info

	// Problems found in the fetched data.
	gls.logBool((*gmLog).getUnknown, "Cannot find following gauges: ", warn)
	gls.logBool((*gmLog).getAssumedZPG, "'Reference_code' not specified. Assuming 'ZPG': ", warn)
	gls.logInt((*gmLog).getAssumedCM, "'Unit' not specified. Assuming 'cm': ", warn)
	gls.logInt((*gmLog).getBadValues, "Ignored measurements with value -99999: ", warn)
	gls.logString((*gmLog).getMissingValues, "Missing mandatory values: ", warn)
	gls.logString((*gmLog).getRescaleErrors, "Cannot convert units: ", errorf)
	gls.logString((*gmLog).getIgnoredMeasureCodes, "Ignored measure codes: ", warn)

	// Outcome of the import.
	gls.logInt((*gmLog).getPredictions, "New predictions: ", info)
	gls.logInt((*gmLog).getMeasurements, "New measurements: ", info)
	gls.logBool((*gmLog).nothingChanged, "No changes for: ", info)
}

// getAssumedZPG reports whether 'ZPG' had to be assumed as reference code.
func (gl *gmLog) getAssumedZPG() bool {
	return gl.assumedZPG
}

// getUnknown reports whether the gauge is not importable for the user.
func (gl *gmLog) getUnknown() bool {
	return gl.unknown
}

// getIgnoredMeasureCodes returns the distinct measure codes skipped.
func (gl *gmLog) getIgnoredMeasureCodes() []string {
	return gl.ignoredMeasureCodes
}

// getRescaleErrors returns the distinct unit conversion errors.
func (gl *gmLog) getRescaleErrors() []string {
	return gl.rescaleErrors
}

// getMissingValues returns the dates of measures without a value.
func (gl *gmLog) getMissingValues() []string {
	return gl.missingValues
}

// getAssumedCM returns how often 'cm' had to be assumed as unit.
func (gl *gmLog) getAssumedCM() int {
	return gl.assumedCM
}

// getBadValues returns how many -99999 values were dropped.
func (gl *gmLog) getBadValues() int {
	return gl.badValues
}

// getPredictions returns the number of newly inserted predictions.
func (gl *gmLog) getPredictions() int {
	return gl.predictions
}

// getMeasurements returns the number of newly inserted measurements.
func (gl *gmLog) getMeasurements() int {
	return gl.measurements
}

// nothingChanged reports whether neither measurements nor predictions
// were inserted for the gauge.
func (gl *gmLog) nothingChanged() bool {
	noMeasurements := gl.measurements == 0
	noPredictions := gl.predictions == 0
	return noMeasurements && noPredictions
}

// logBool logs header followed by the comma separated IDs of all gauges
// for which access reports true. Nothing is logged if there are none.
// header is expected to be non-empty.
func (gls gmLogs) logBool(
	access func(*gmLog) bool,
	header string,
	log func(string, ...interface{}),
) {
	var ids []string
	for _, entry := range gls {
		if access(entry) {
			ids = append(ids, entry.gid)
		}
	}
	if len(ids) == 0 {
		return
	}
	log(header + strings.Join(ids, ", "))
}

// logInt logs header followed by all gauges with a positive count
// (as returned by access), grouped by count in ascending order,
// e.g. "header 1 (a, b); 3 (c)". Nothing is logged if no gauge
// has a positive count.
func (gls gmLogs) logInt(
	access func(*gmLog) int,
	header string,
	log func(string, ...interface{}),
) {
	var hits gmLogs
	for _, entry := range gls {
		if access(entry) > 0 {
			hits = append(hits, entry)
		}
	}
	if len(hits) == 0 {
		return
	}

	// The stable sort keeps the original gauge order within a group.
	sort.SliceStable(hits, func(a, b int) bool {
		return access(hits[a]) < access(hits[b])
	})

	var sb strings.Builder
	sb.WriteString(header)
	for i, entry := range hits {
		count := access(entry)
		if i > 0 && count == access(hits[i-1]) {
			// Same count as the previous gauge: extend the group.
			sb.WriteString(", ")
		} else {
			// Start a new group, closing the previous one if any.
			if i > 0 {
				sb.WriteString("); ")
			}
			sb.WriteString(strconv.Itoa(count))
			sb.WriteString(" (")
		}
		sb.WriteString(entry.gid)
	}
	sb.WriteByte(')')

	log(sb.String())
}

// logString logs header followed by all gauges with a non-empty list
// (as returned by access), each as "gid (v1; v2)" and separated by
// commas. Nothing is logged if all lists are empty.
func (gls gmLogs) logString(
	access func(*gmLog) []string,
	header string,
	log func(string, ...interface{}),
) {
	var parts []string
	for _, entry := range gls {
		values := access(entry)
		if len(values) == 0 {
			continue
		}
		parts = append(parts, entry.gid+" ("+strings.Join(values, "; ")+")")
	}
	if len(parts) > 0 {
		log(header + strings.Join(parts, ", "))
	}
}

// logFinder returns a lookup function for the log entry of a gauge.
// The most recently returned entry is remembered to short-cut repeated
// lookups of the same gauge. If there is no entry for a gauge yet, a
// new one is created and appended to logs.
func logFinder(logs *gmLogs) func(string) *gmLog {
	var recent *gmLog
	return func(gid string) *gmLog {
		if recent == nil || recent.gid != gid {
			recent = logs.find(gid)
			if recent == nil {
				recent = &gmLog{gid: gid}
				*logs = append(*logs, recent)
			}
		}
		return recent
	}
}

// doForGM fetches NtS messages via fetch and stores the contained water
// level (WAL) measurements and predictions of the gauges listed in
// gauges. gauges has to be sorted because it is binary searched.
//
// It returns the ISRS codes of all gauges referenced by the fetched
// messages (one entry per water related message with a valid ISRS
// code, including gauges unknown to the database). Problems with single
// values are collected and reported in an aggregated way via feedback
// when the function returns; failing inserts are reported but do not
// abort the import.
func doForGM(
	ctx context.Context,
	gauges []string,
	fetch func() ([]*nts.RIS_Message_Type, error),
	conn *sql.Conn,
	feedback Feedback,
) ([]string, error) {

	insertGPStmt, err := conn.PrepareContext(ctx, insertGPSQL)
	if err != nil {
		return nil, err
	}
	defer insertGPStmt.Close()

	insertGMStmt, err := conn.PrepareContext(ctx, insertGMSQL)
	if err != nil {
		return nil, err
	}
	defer insertGMStmt.Close()

	// lookup to see if data can be imported for gauge
	isKnown := func(s string) bool {
		idx := sort.SearchStrings(gauges, s)
		return idx < len(gauges) && gauges[idx] == s
	}

	// inserted reports whether an insert statement created a new row.
	// Redundant data is dropped by the statements (ON CONFLICT DO
	// NOTHING), which shows up as sql.ErrNoRows. Other errors are
	// reported but do not abort the import.
	inserted := func(err error) bool {
		switch {
		case err == sql.ErrNoRows:
			// thats expected, nothing to do
			return false
		case err != nil:
			// Pass the message as an argument, not as the format,
			// so '%' characters in it are printed verbatim.
			feedback.Error("%s", pgxutils.ReadableError{Err: err}.Error())
			return false
		default:
			return true
		}
	}

	result, err := fetch()
	if err != nil {
		return nil, err
	}

	var gids []string

	// To prevent spamming the log actual logging
	// is deferred to be presented in an aggregated way.
	var logs gmLogs
	defer func() { logs.logging(feedback) }()

	findLog := logFinder(&logs)

	for _, msg := range result {
		for _, wrm := range msg.Wrm {
			curr := strings.TrimSpace(string(*wrm.Geo_object.Id))
			currIsrs, err := models.IsrsFromString(curr)
			if err != nil {
				feedback.Warn("Invalid ISRS code %v", err)
				continue
			}
			logger := findLog(curr)
			gids = append(gids, curr)

			if !isKnown(curr) {
				logger.unknown = true
				continue
			}

			var referenceCode string
			if wrm.Reference_code == nil {
				logger.assumedZPG = true
				referenceCode = "ZPG"
			} else {
				referenceCode = string(*wrm.Reference_code)
			}

			for _, measure := range wrm.Measure {
				// Only water levels (WAL) are imported. A measure
				// without any code cannot be identified as such.
				if mc := measure.Measure_code; mc == nil || *mc != nts.Measure_code_enumWAL {
					if mc != nil {
						logger.ignoreMeasureCode(*mc)
					}
					continue
				}

				var unit string
				if measure.Unit == nil {
					logger.assumedCM++
					unit = "cm"
				} else {
					unit = string(*measure.Unit)
				}

				if measure.Value == nil {
					logger.missingValues = append(
						logger.missingValues,
						measure.Measuredate.Time.Format(time.RFC3339))
					continue
				}

				convert, err := rescale(unit)
				if err != nil {
					logger.addRescaleError(err)
					continue
				}

				// -99999 is used by some gauges to signal an error.
				// It has to be checked on the raw value: after
				// rescaling it would only be recognized for 'cm'.
				if *measure.Value == -99999 {
					logger.badValues++
					continue
				}

				convert(measure.Value)
				convert(measure.Value_min)
				convert(measure.Value_max)

				var dummy int
				if measure.Predicted {
					confInterval := gmConfInterval(measure.Value_min, measure.Value_max)
					err = insertGPStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate.Time,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue.Time,
						referenceCode,
						measure.Value,
						&confInterval,
						msg.Identification.Date_issue.Time,
						msg.Identification.Originator,
					).Scan(&dummy)
					if inserted(err) {
						logger.predictions++
					}
				} else {
					err = insertGMStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate.Time,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue.Time,
						referenceCode,
						measure.Value,
						msg.Identification.Date_issue.Time,
						msg.Identification.Originator,
					).Scan(&dummy)
					if inserted(err) {
						logger.measurements++
					}
				}
			}
		}
	}
	return gids, nil
}

// gmConfInterval returns the confidence interval of a gauge prediction
// as an inclusive numeric range from lower to upper. The range is NULL
// if one of the bounds is missing or cannot be converted to a numeric,
// so that no range with undefined bounds is sent to the database.
func gmConfInterval(lower, upper *float32) pgtype.Numrange {
	if lower != nil && upper != nil {
		var lo, hi pgtype.Numeric
		if lo.Set(lower) == nil && hi.Set(upper) == nil {
			return pgtype.Numrange{
				Lower:     lo,
				Upper:     hi,
				LowerType: pgtype.Inclusive,
				UpperType: pgtype.Inclusive,
				Status:    pgtype.Present,
			}
		}
	}
	return pgtype.Numrange{
		Lower:     pgtype.Numeric{Status: pgtype.Null},
		Upper:     pgtype.Numeric{Status: pgtype.Null},
		LowerType: pgtype.Inclusive,
		UpperType: pgtype.Inclusive,
		Status:    pgtype.Null,
	}
}