view pkg/imports/gm.go @ 5591:0011f50cf216 surveysperbottleneckid

Removed the no-longer-used alternative API for the surveys/ endpoint. As bottlenecks in the summary for SR imports are now identified by their id and no longer by the (not guaranteed to be unique!) name, there is no longer a need to request survey data by the name+date tuple (which isn't reliable anyway). So the workaround has now been reverted.
author Sascha Wilde <wilde@sha-bang.de>
date Wed, 06 Apr 2022 13:30:29 +0200
parents f2204f91d286
children 7724bb582e80
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"sort"
	"strconv"
	"strings"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/pgxutils"
	"gemma.intevation.de/gemma/pkg/soap/nts"
)

// GaugeMeasurement is an import job to import
// gauges measurement data from a NtS SOAP service.
//
// The struct is the JSON-serializable configuration of the job; both
// fields are handed to the NtS SOAP client when the import runs.
type GaugeMeasurement struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	// NOTE(review): presumably true disables certificate validation;
	// the value is passed through to the SOAP client unchanged — confirm
	// against the nts package.
	Insecure bool `json:"insecure"`
}

// Description gives a short info about relevant facts of this import.
// For gauge measurement imports this is simply the configured service
// URL. The string slice argument is ignored and the returned error is
// always nil.
func (gm *GaugeMeasurement) Description([]string) (string, error) {
	return gm.URL, nil
}

// GMJobKind is the import queue type identifier.
// It is the key under which the gauge measurement job creator is
// registered in this file's init function.
const GMJobKind JobKind = "gm"

const (
	// listGaugesSQL selects the five components of the ISRS location
	// (country code, locode, fairway section, orc, hectometre) of every
	// gauge for which the current database user may import data: gauges
	// whose country code equals the user's country, or all gauges if the
	// current role is a member of 'sys_admin'.
	listGaugesSQL = `
SELECT
  (location).country_code,
  (location).locode,
  (location).fairway_section,
  (location).orc,
  (location).hectometre
FROM waterway.gauges
WHERE (location).country_code = (
    SELECT country FROM users.list_users WHERE username = current_user)
  OR pg_has_role('sys_admin', 'MEMBER')
`

	// insertGMSQL inserts one measured water level. Parameters $1-$5 form
	// the composite ISRS location, $6-$14 fill the remaining columns in
	// the order listed; staging_done is always set to true.
	//
	// Note: we do not expect corrections of data through this service.  So
	// any constraint conflicts are triggered by redundant data which
	// can be dropped.
	// Because of "ON CONFLICT DO NOTHING RETURNING 1" a row is only
	// returned when a new record was actually inserted; callers treat
	// sql.ErrNoRows as "already present".
	insertGMSQL = `
INSERT INTO waterway.gauge_measurements (
  location,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  date_info,
  source_organization,
  staging_done
) VALUES (
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  true
)
ON CONFLICT DO NOTHING
RETURNING 1
`

	// insertGPSQL inserts one predicted water level. It mirrors
	// insertGMSQL but targets waterway.gauge_predictions, has an
	// additional conf_interval column ($13) and no staging_done column.
	// As above, a row is only returned when a new record was inserted.
	insertGPSQL = `
INSERT INTO waterway.gauge_predictions (
  location,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  conf_interval,
  date_info,
  source_organization
) VALUES(
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15
)
ON CONFLICT DO NOTHING
RETURNING 1
`
)

// gmJobCreator is the stateless factory for gauge measurement import
// jobs. It is registered for GMJobKind in init below.
type gmJobCreator struct{}

// init registers the gauge measurement job creator under GMJobKind.
func init() {
	RegisterJobCreator(GMJobKind, gmJobCreator{})
}

// Description returns the human readable name of this kind of import.
func (gmJobCreator) Description() string { return "gauge measurements" }

// Create returns a new, zero-valued GaugeMeasurement job.
func (gmJobCreator) Create() Job { return new(GaugeMeasurement) }

// Depends returns two lists of names: "gauge_measurements" in the first
// and "gauges" in the second.
// NOTE(review): presumably the first list names what the import
// modifies and the second what it only reads — confirm against the
// JobCreator interface, which is not visible in this file.
func (gmJobCreator) Depends() [2][]string {
	return [2][]string{
		{"gauge_measurements"},
		{"gauges"},
	}
}

// AutoAccept always reports true for gauge measurement imports.
// NOTE(review): presumably this means the import needs no manual review
// step — confirm against the JobCreator interface.
func (gmJobCreator) AutoAccept() bool { return true }

// StageDone is a NOP for gauge measurements imports.
// All arguments are ignored and nil is returned; the measurement insert
// statement in this file already stores rows with staging_done = true.
func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
	return nil
}

// CleanUp of a gauge measurement import is a NOP.
// It always returns nil.
func (*GaugeMeasurement) CleanUp() error { return nil }

// Do executes the actual import.
//
// It builds a fetch function which asks the NtS SOAP service at gm.URL
// for all water related messages (WRM) issued between yesterday and
// tomorrow (relative to the current time) and hands it, together with
// the database connection, to storeGaugeMeasurements.
//
// The returned value is the import summary produced by
// storeGaugeMeasurements; the error is non-nil if the service could not
// be queried or the data could not be stored.
func (gm *GaugeMeasurement) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	fetch := func() ([]*nts.RIS_Message_Type, error) {
		client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)

		mt := nts.Message_type_typeWRM
		now := time.Now()
		dis := []*nts.Date_pair{{
			Date_start: nts.Date{Time: now.AddDate(0, 0, -1)},
			Date_end:   nts.Date{Time: now.AddDate(0, 0, +1)},
		}}

		req := &nts.Get_messages_query{
			Message_type: &mt,
			Dates_issue:  dis,
		}

		// A request that timed out is repeated up to maxRetries times,
		// i.e. the service is contacted at most maxRetries+1 times.
		const maxRetries = 3

		for retries := 0; ; retries++ {
			resp, err := client.Get_messages(req)
			if err != nil {
				// Only timeouts are worth a retry; any other error
				// (or an exhausted retry budget) aborts the import.
				if t, ok := err.(interface{ Timeout() bool }); ok && t.Timeout() && retries < maxRetries {
					log.Warnln("NtS SOAP request timed out. Trying again.")
					continue
				}
				// Wrap with %w so callers can still inspect the cause.
				return nil, fmt.Errorf(
					"Error requesting NtS service: %w", err)
			}

			result := resp.Result_message
			if result == nil {
				// No messages: report the error codes the service
				// sent instead. This is not treated as a failure.
				for _, e := range resp.Result_error {
					if e != nil {
						feedback.Error("Error code: %s", *e)
					} else {
						feedback.Error("Unknown error")
					}
				}
			}
			return result, nil
		}
	}

	return storeGaugeMeasurements(
		ctx,
		importID,
		fetch,
		conn,
		feedback,
	)
}

// loadGauges returns the ISRS codes (in string form, sorted
// lexicographically) of all gauges selected by listGaugesSQL, i.e. the
// gauges the current database user may import data for.
//
// Query, scan and iteration errors are returned unchanged. If the query
// yields no gauge at all an UnchangedError is returned.
func loadGauges(ctx context.Context, conn *sql.Conn) ([]string, error) {

	rows, queryErr := conn.QueryContext(ctx, listGaugesSQL)
	if queryErr != nil {
		return nil, queryErr
	}
	defer rows.Close()

	var codes []string

	for rows.Next() {
		var isrs models.Isrs
		scanErr := rows.Scan(
			&isrs.CountryCode,
			&isrs.LoCode,
			&isrs.FairwaySection,
			&isrs.Orc,
			&isrs.Hectometre,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		codes = append(codes, isrs.String())
	}

	if iterErr := rows.Err(); iterErr != nil {
		return nil, iterErr
	}

	if len(codes) == 0 {
		return nil, UnchangedError(
			"No gauges for which measurements can be imported in database")
	}

	// Sorted so that callers can use binary search for lookups.
	sort.Strings(codes)

	return codes, nil
}

// storeGaugeMeasurements runs a complete gauge measurement import.
//
// It loads the gauges the current user may import data for, lets
// doForGM fetch the messages via fetch and store them through conn, and
// finally reports the elapsed time to feedback.
//
// On success a pointer to a summary struct is returned which serializes
// to JSON as {"gaugeMeasurements": [...]} with the gauge ids returned by
// doForGM. If doForGM returns no gauge ids an UnchangedError is
// returned. importID is currently unused.
func storeGaugeMeasurements(
	ctx context.Context,
	importID int64,
	fetch func() ([]*nts.RIS_Message_Type, error),
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	// Get gauges from database, for which user is allowed to import data
	gauges, err := loadGauges(ctx, conn)
	if err != nil {
		return nil, err
	}

	gids, err := doForGM(ctx, gauges, fetch, conn, feedback)
	if err != nil {
		return nil, err
	}

	if len(gids) == 0 {
		return nil, UnchangedError("No new gauge measurements found")
	}

	feedback.Info(
		"Importing gauge measurements took %s", time.Since(start))

	// TODO: needs to be filled more useful.
	summary := struct {
		GaugeMeasurements []string `json:"gaugeMeasurements"`
	}{
		GaugeMeasurements: gids,
	}
	// All error paths returned above, so success is reported explicitly.
	return &summary, nil
}

// rescale looks up the length unit (case-insensitively) and returns a
// function which converts a value given in that unit to centimetres.
//
// The returned function multiplies the pointed-to value in place and
// silently ignores nil pointers. Supported units are mm, cm, dm, m, hm
// and km; any other unit yields an error naming the unit.
func rescale(unit string) (func(*float32), error) {

	// by builds the in-place multiplier for a given factor.
	by := func(factor float32) (func(*float32), error) {
		return func(value *float32) {
			if value == nil {
				return
			}
			*value *= factor
		}, nil
	}

	switch strings.ToLower(unit) {
	case "mm":
		return by(0.1)
	case "cm":
		return by(1.0)
	case "dm":
		return by(10.0)
	case "m":
		return by(100.0)
	case "hm":
		return by(10000.0)
	case "km":
		return by(100000.0)
	}

	return nil, fmt.Errorf("unknown unit '%s'", unit)
}

// gmLog collects everything noteworthy that happened while importing
// data for a single gauge, so that it can be reported in an aggregated
// way after the import instead of once per measurement.
type gmLog struct {
	// gid is the ISRS code of the gauge this log belongs to.
	gid                 string
	// unknown is set if the gauge is not among the importable gauges.
	unknown             bool
	// assumedZPG is set if a message had no reference code and "ZPG"
	// was used instead.
	assumedZPG          bool
	// ignoredMeasureCodes lists the distinct measure codes which were
	// skipped.
	ignoredMeasureCodes []string
	// rescaleErrors lists the distinct unit conversion error messages.
	rescaleErrors       []string
	// missingValues lists the measure dates (RFC 3339) of measures
	// without a value.
	missingValues       []string
	// assumedCM counts measures without a unit for which "cm" was used.
	assumedCM           int
	// badValues counts measures dropped because of the error value.
	badValues           int
	// measurements counts newly inserted measurements.
	measurements        int
	// predictions counts newly inserted predictions.
	predictions         int
}

// gmLogs is the list of per-gauge logs of one import run.
type gmLogs []*gmLog

// extend appends needle to haystack unless haystack already contains
// it, in which case haystack is returned unchanged. It is used to keep
// small string lists free of duplicates.
func extend(haystack []string, needle string) []string {
	present := false
	for i := 0; i < len(haystack) && !present; i++ {
		present = haystack[i] == needle
	}
	if present {
		return haystack
	}
	return append(haystack, needle)
}

// addRescaleError records the message of a unit conversion error.
// Identical messages are stored only once.
func (gl *gmLog) addRescaleError(err error) {
	gl.rescaleErrors = extend(gl.rescaleErrors, err.Error())
}

// ignoreMeasureCode records a measure code which was skipped during
// import. Each distinct code is stored only once.
func (gl *gmLog) ignoreMeasureCode(mc nts.Measure_code_enum) {
	gl.ignoredMeasureCodes = extend(gl.ignoredMeasureCodes, string(mc))
}

// find does a linear search for the log of the gauge with the given id.
// It returns nil if no such log exists.
func (gls gmLogs) find(gid string) *gmLog {
	for i := range gls {
		if candidate := gls[i]; candidate.gid == gid {
			return candidate
		}
	}
	return nil
}

// logging writes the aggregated findings of all per-gauge logs to
// feedback. Each category is reported as at most one message: problems
// as warnings (unit conversion failures as errors), the numbers of new
// predictions and measurements and the gauges without changes as infos.
// Categories without any entries produce no output.
func (gls gmLogs) logging(feedback Feedback) {

	gls.logBool(
		(*gmLog).getUnknown,
		"Cannot find following gauges: ",
		feedback.Warn)

	gls.logBool(
		(*gmLog).getAssumedZPG,
		"'Reference_code' not specified. Assuming 'ZPG': ",
		feedback.Warn)

	gls.logInt(
		(*gmLog).getAssumedCM,
		"'Unit' not specified. Assuming 'cm': ",
		feedback.Warn)

	gls.logInt(
		(*gmLog).getBadValues,
		"Ignored measurements with value -99999: ",
		feedback.Warn)

	gls.logString(
		(*gmLog).getMissingValues,
		"Missing mandatory values: ",
		feedback.Warn)

	gls.logString(
		(*gmLog).getRescaleErrors,
		"Cannot convert units: ",
		feedback.Error)

	// This section has to report the ignored measure codes, not the
	// rescale errors, which are already covered by the section before.
	gls.logString(
		(*gmLog).getIgnoredMeasureCodes,
		"Ignored measure codes: ",
		feedback.Warn)

	gls.logInt(
		(*gmLog).getPredictions,
		"New predictions: ",
		feedback.Info)

	gls.logInt(
		(*gmLog).getMeasurements,
		"New measurements: ",
		feedback.Info)

	gls.logBool(
		(*gmLog).nothingChanged,
		"No changes for: ",
		feedback.Info)
}

// The following accessors expose the fields of a gmLog as method
// expressions (e.g. (*gmLog).getUnknown), so that they can be passed to
// the generic logBool, logInt and logString reporting helpers.
// nothingChanged is derived: it reports whether neither a measurement
// nor a prediction was inserted for the gauge.
func (gl *gmLog) getAssumedZPG() bool              { return gl.assumedZPG }
func (gl *gmLog) getUnknown() bool                 { return gl.unknown }
func (gl *gmLog) getIgnoredMeasureCodes() []string { return gl.ignoredMeasureCodes }
func (gl *gmLog) getRescaleErrors() []string       { return gl.rescaleErrors }
func (gl *gmLog) getMissingValues() []string       { return gl.missingValues }
func (gl *gmLog) getAssumedCM() int                { return gl.assumedCM }
func (gl *gmLog) getBadValues() int                { return gl.badValues }
func (gl *gmLog) getPredictions() int              { return gl.predictions }
func (gl *gmLog) getMeasurements() int             { return gl.measurements }
func (gl *gmLog) nothingChanged() bool             { return gl.measurements == 0 && gl.predictions == 0 }

// logBool reports all gauges for which access returns true as a single
// message of the form "<header><gid>, <gid>, ...". Nothing is logged if
// access is false for every gauge.
//
// log is a printf-style function. The assembled message contains gauge
// ids which stem from the remote service, so it is passed as an
// argument and never as the format string itself.
func (gls gmLogs) logBool(
	access func(*gmLog) bool,
	header string,
	log func(string, ...interface{}),
) {
	var sb strings.Builder
	for _, gl := range gls {
		if !access(gl) {
			continue
		}
		if sb.Len() == 0 {
			sb.WriteString(header)
		} else {
			sb.WriteString(", ")
		}
		sb.WriteString(gl.gid)
	}
	if sb.Len() > 0 {
		log("%s", sb.String())
	}
}

// logInt reports the counters returned by access as a single message.
// Gauges with a counter of zero or less are left out; the remaining
// gauges are grouped by counter value in ascending order, resulting in
// "<header><n> (<gid>, <gid>); <m> (<gid>)". Nothing is logged if no
// gauge has a positive counter.
//
// log is a printf-style function. The assembled message contains gauge
// ids which stem from the remote service, so it is passed as an
// argument and never as the format string itself.
func (gls gmLogs) logInt(
	access func(*gmLog) int,
	header string,
	log func(string, ...interface{}),
) {
	gs := make(gmLogs, 0, len(gls))
	for _, g := range gls {
		if access(g) > 0 {
			gs = append(gs, g)
		}
	}

	if len(gs) == 0 {
		return
	}

	// Stable, so gauges with equal counters keep their original order.
	sort.SliceStable(gs, func(i, j int) bool {
		return access(gs[i]) < access(gs[j])
	})

	var sb strings.Builder
	// last is the counter of the currently open group. Starting at zero
	// is safe because only counters > 0 are left in gs.
	var last int

	for _, g := range gs {
		if c := access(g); c != last {
			if sb.Len() == 0 {
				sb.WriteString(header)
			} else {
				sb.WriteString("); ")
			}
			sb.WriteString(strconv.Itoa(c))
			sb.WriteString(" (")
			last = c
		} else {
			sb.WriteString(", ")
		}
		sb.WriteString(g.gid)
	}

	sb.WriteByte(')')
	log("%s", sb.String())
}

// logString reports the string lists returned by access as a single
// message of the form "<header><gid> (<s>; <s>), <gid> (<s>)". Gauges
// with an empty list are left out and nothing is logged if all lists
// are empty.
//
// log is a printf-style function. The assembled message contains text
// which stems from the remote service (gauge ids, unit names, measure
// codes), so it is passed as an argument and never as the format string
// itself.
func (gls gmLogs) logString(
	access func(*gmLog) []string,
	header string,
	log func(string, ...interface{}),
) {
	var sb strings.Builder
	for _, gl := range gls {
		values := access(gl)
		if len(values) == 0 {
			continue
		}
		if sb.Len() == 0 {
			sb.WriteString(header)
		} else {
			sb.WriteString(", ")
		}
		sb.WriteString(gl.gid)
		sb.WriteString(" (")
		sb.WriteString(strings.Join(values, "; "))
		sb.WriteByte(')')
	}
	if sb.Len() > 0 {
		log("%s", sb.String())
	}
}

// logFinder returns a lookup function which maps a gauge id to its log
// in logs. The most recently returned log is remembered, so repeated
// lookups of the same gauge do not have to search the list again. If no
// log exists for a gauge yet, a new one is created and appended to
// logs.
func logFinder(logs *gmLogs) func(string) *gmLog {
	var cached *gmLog
	return func(gid string) *gmLog {
		// Fast path: same gauge as in the previous call.
		if cached != nil && cached.gid == gid {
			return cached
		}
		found := logs.find(gid)
		if found == nil {
			// First time this gauge is seen: register a fresh log.
			found = &gmLog{gid: gid}
			*logs = append(*logs, found)
		}
		cached = found
		return found
	}
}

// doForGM fetches the NtS messages via fetch and stores the contained
// water level measurements and predictions through conn.
//
// gauges has to be the sorted list of ISRS codes for which data may be
// imported; data of other gauges is skipped. Only measures with measure
// code WAL are stored. A missing reference code is replaced by "ZPG", a
// missing unit by "cm", and all values are converted to centimetres.
// Measures whose raw value is -99999 are dropped. Rows which already
// exist in the database are silently skipped.
//
// Problems with individual gauges or measures do not abort the import;
// they are collected and reported to feedback in aggregated form when
// the function returns. An error is only returned if preparing the
// insert statements or fetching the messages fails.
//
// The result contains the ISRS code of every water related message
// section whose gauge id could be parsed, including unknown gauges and
// duplicates.
// NOTE(review): callers use an empty result as "nothing new", although
// the list does not depend on rows actually being inserted — confirm
// that this is intended.
func doForGM(
	ctx context.Context,
	gauges []string,
	fetch func() ([]*nts.RIS_Message_Type, error),
	conn *sql.Conn,
	feedback Feedback,
) ([]string, error) {

	// -99999 is used by some gauges to signal an error
	const badValue = -99999

	insertGPStmt, err := conn.PrepareContext(ctx, insertGPSQL)
	if err != nil {
		return nil, err
	}
	defer insertGPStmt.Close()

	insertGMStmt, err := conn.PrepareContext(ctx, insertGMSQL)
	if err != nil {
		return nil, err
	}
	defer insertGMStmt.Close()

	// lookup to see if data can be imported for gauge
	isKnown := func(s string) bool {
		idx := sort.SearchStrings(gauges, s)
		return idx < len(gauges) && gauges[idx] == s
	}

	// inserted evaluates the outcome of an insert statement and reports
	// whether a new row was stored. Database errors are reported to
	// feedback but do not abort the import.
	inserted := func(err error) bool {
		switch {
		case err == sql.ErrNoRows:
			// thats expected, nothing to do: the row already exists.
			return false
		case err != nil:
			// The error text is data, not a format string.
			feedback.Error("%s", pgxutils.ReadableError{Err: err}.Error())
			return false
		}
		return true
	}

	result, err := fetch()
	if err != nil {
		return nil, err
	}

	var gids []string

	// To prevent spamming the log actual logging
	// is deferred to be presented in an aggregated way.
	var logs gmLogs
	defer func() { logs.logging(feedback) }()

	findLog := logFinder(&logs)

	for _, msg := range result {
		for _, wrm := range msg.Wrm {
			curr := string(*wrm.Geo_object.Id)
			curr = strings.TrimSpace(curr)
			currIsrs, err := models.IsrsFromString(curr)
			if err != nil {
				feedback.Warn("Invalid ISRS code %v", err)
				continue
			}
			logger := findLog(curr)
			gids = append(gids, curr)

			if !isKnown(curr) {
				logger.unknown = true
				continue
			}

			var referenceCode string
			if wrm.Reference_code == nil {
				logger.assumedZPG = true
				referenceCode = "ZPG"
			} else {
				referenceCode = string(*wrm.Reference_code)
			}

			for _, measure := range wrm.Measure {
				var unit string
				if *measure.Measure_code != nts.Measure_code_enumWAL {
					logger.ignoreMeasureCode(*measure.Measure_code)
					continue
				}
				if measure.Unit == nil {
					logger.assumedCM++
					unit = "cm"
				} else {
					unit = string(*measure.Unit)
				}

				if measure.Value == nil {
					logger.missingValues = append(
						logger.missingValues,
						measure.Measuredate.Time.Format(time.RFC3339))
					continue
				}

				convert, err := rescale(unit)
				if err != nil {
					logger.addRescaleError(err)
					continue
				}

				// The error marker has to be detected on the raw value.
				// After conversion to cm it would only be recognized
				// for units with a scale factor of one.
				if *measure.Value == badValue {
					logger.badValues++
					continue
				}

				convert(measure.Value)
				convert(measure.Value_min)
				convert(measure.Value_max)

				var dummy int
				if measure.Predicted {
					// Without both bounds the confidence interval is
					// stored as NULL.
					confInterval := pgtype.Numrange{
						Lower:     pgtype.Numeric{Status: pgtype.Null},
						Upper:     pgtype.Numeric{Status: pgtype.Null},
						LowerType: pgtype.Inclusive,
						UpperType: pgtype.Inclusive,
						Status:    pgtype.Null,
					}
					if measure.Value_min != nil && measure.Value_max != nil {
						valueMin := pgtype.Numeric{Status: pgtype.Null}
						valueMax := pgtype.Numeric{Status: pgtype.Null}
						valueMin.Set(measure.Value_min)
						valueMax.Set(measure.Value_max)
						confInterval = pgtype.Numrange{
							Lower:     valueMin,
							Upper:     valueMax,
							LowerType: pgtype.Inclusive,
							UpperType: pgtype.Inclusive,
							Status:    pgtype.Present,
						}
					}
					if inserted(insertGPStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate.Time,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue.Time,
						referenceCode,
						measure.Value,
						&confInterval,
						msg.Identification.Date_issue.Time,
						msg.Identification.Originator,
					).Scan(&dummy)) {
						logger.predictions++
					}
				} else {
					if inserted(insertGMStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate.Time,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue.Time,
						referenceCode,
						measure.Value,
						msg.Identification.Date_issue.Time,
						msg.Identification.Originator,
					).Scan(&dummy)) {
						logger.measurements++
					}
				}
			}
		}
	}
	return gids, nil
}