view pkg/imports/gm.go @ 3733:ec86a7155377 concave-hull

Estimate overly large triangles as those which have an edge that is at least 3.5 times as long as the standard deviation of the longest edge per inner triangle.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 24 Jun 2019 11:39:09 +0200
parents 9759355d7b90
children d7b9d5c0ebad
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"log"
	"sort"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/nts"
	"github.com/jackc/pgx/pgtype"
)

// GaugeMeasurement is an import job which imports gauge measurement
// (and prediction) data from a NtS SOAP service.
type GaugeMeasurement struct {
	// URL is the URL of the NtS SOAP service to query.
	URL string `json:"url"`
	// Insecure controls whether certificates of HTTPS traffic are
	// validated. It is handed over unchanged to the NtS client.
	// NOTE(review): presumably true disables the validation -- confirm
	// against nts.NewINtSMessageService.
	Insecure bool `json:"insecure"`
}

// GMJobKind is the import queue type identifier under which
// gauge measurement imports are registered.
const GMJobKind JobKind = "gm"

const (
	// listGaugesSQL lists the ISRS location components of all gauges
	// located in the country of the current user. Members of the
	// 'sys_admin' role get the gauges of all countries.
	listGaugesSQL = `
SELECT
  (location).country_code,
  (location).locode,
  (location).fairway_section,
  (location).orc,
  (location).hectometre
FROM waterway.gauges
WHERE (location).country_code = users.current_user_country()
  OR pg_has_role('sys_admin', 'MEMBER')
`

	// insertGMSQL inserts a single gauge measurement.
	//
	// $1-$5 are the components of the ISRS location, $6 is the measure
	// date and $7-$14 fill the remaining columns in the listed order.
	// The validity is taken from the gauge with the same location whose
	// validity contains the measure date; if there is none an unbounded
	// range is used. Rows are stored with staging_done set to true.
	//
	// Note: we do not expect corrections of data through this service. So
	// any constraint conflicts are triggered by redundant data which
	// can be dropped. Due to RETURNING 1 the statement yields a row only
	// if the measurement was really inserted.
	insertGMSQL = `
INSERT INTO waterway.gauge_measurements (
  location,
  validity,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  date_info,
  source_organization,
  staging_done
) VALUES (
  ($1, $2, $3, $4, $5),
  COALESCE(
    (SELECT validity FROM waterway.gauges
       WHERE location
            = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
         AND validity @> CAST($6 AS timestamp with time zone)),
    tstzrange(NULL, NULL)),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  true
)
ON CONFLICT DO NOTHING
RETURNING 1
`

	// insertGPSQL inserts a single gauge prediction.
	//
	// It works like insertGMSQL but writes to waterway.gauge_predictions,
	// has no staging_done column and takes the confidence interval of the
	// prediction as an additional parameter ($13). Conflicting rows are
	// dropped, too, so a row is only returned for a real insert.
	insertGPSQL = `
INSERT INTO waterway.gauge_predictions (
  location,
  validity,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  conf_interval,
  date_info,
  source_organization
) VALUES(
  ($1, $2, $3, $4, $5),
  COALESCE(
    (SELECT validity FROM waterway.gauges
       WHERE location
            = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
         AND validity @> CAST($6 AS timestamp with time zone)),
    tstzrange(NULL, NULL)),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15
)
ON CONFLICT DO NOTHING
RETURNING 1
`
)

// gmJobCreator is the stateless job creator for gauge measurement
// imports. It is registered under GMJobKind in init.
type gmJobCreator struct{}

// init registers the gauge measurement job creator under GMJobKind.
func init() {
	RegisterJobCreator(GMJobKind, gmJobCreator{})
}

// Description gives a short, human readable name for the kind of data
// this import handles.
func (gmJobCreator) Description() string {
	const description = "gauge measurements"
	return description
}

// Create builds a fresh, zero valued gauge measurement import job.
func (gmJobCreator) Create() Job {
	return &GaugeMeasurement{}
}

// Depends names the database tables a gauge measurement import
// depends on: "gauge_measurements" in the first list and "gauges"
// in the second one.
// NOTE(review): presumably the first list holds the tables written to
// and the second one the tables only read -- confirm against the
// JobCreator interface.
func (gmJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauge_measurements"}
	deps[1] = []string{"gauges"}
	return deps
}

// AutoAccept always reports true for gauge measurement imports.
// NOTE(review): presumably this tells the import queue that no manual
// review is needed -- confirm against the JobCreator interface.
func (gmJobCreator) AutoAccept() bool {
	return true
}

// StageDone is the hook to move imported gauge measurements out of the
// staging area. There is nothing to do here as insertGMSQL stores the
// measurements with staging_done already set, so it always returns nil.
func (gmJobCreator) StageDone(_ context.Context, _ *sql.Tx, _ int64) error {
	return nil
}

// CleanUp has nothing to tidy up after a gauge measurement import
// and therefore always returns nil.
func (*GaugeMeasurement) CleanUp() error {
	return nil
}

// Do executes the actual gauge measurement import.
//
// It asks the NtS SOAP service at gm.URL for all water related messages
// (WRM) issued within the last 24 hours and hands them over to
// storeGaugeMeasurements, whose result and error are returned unchanged.
func (gm *GaugeMeasurement) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// fetch does the SOAP round trip. Errors reported inside of a
	// successful response are only passed to feedback; in this case
	// no messages and no error are returned.
	fetch := func() ([]*nts.RIS_Message_Type, error) {
		// A request which timed out is repeated up to maxTries times,
		// so at most maxTries+1 requests are sent.
		const maxTries = 3

		client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)

		msgType := nts.Message_type_typeWRM
		query := &nts.Get_messages_query{
			Message_type: &msgType,
			Dates_issue: []*nts.Date_pair{{
				Date_start: nts.Date{time.Now().Add(-24 * time.Hour)},
				Date_end:   nts.Date{time.Now()},
			}},
		}

		for tries := 0; ; tries++ {
			resp, err := client.Get_messages(query)
			if err == nil {
				messages := resp.Result_message
				if messages == nil {
					for _, code := range resp.Result_error {
						if code == nil {
							feedback.Error("Unknown error")
							continue
						}
						feedback.Error("Error code: %s", *code)
					}
				}
				return messages, nil
			}

			// Only time outs are worth another try.
			t, ok := err.(interface{ Timeout() bool })
			if !ok || !t.Timeout() || tries >= maxTries {
				return nil, fmt.Errorf(
					"Error requesting NtS service: %v", err)
			}
			log.Println("warn: NtS SOAP request timed out. Trying again.")
		}
	}

	return storeGaugeMeasurements(ctx, importID, fetch, conn, feedback)
}

// loadGauges fetches the ISRS codes of all gauges selected by
// listGaugesSQL and returns them as strings in ascending order.
// If there is no such gauge an UnchangedError is returned.
func loadGauges(ctx context.Context, conn *sql.Conn) ([]string, error) {

	rows, err := conn.QueryContext(ctx, listGaugesSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var codes []string

	for rows.Next() {
		var isrs models.Isrs
		scanErr := rows.Scan(
			&isrs.CountryCode,
			&isrs.LoCode,
			&isrs.FairwaySection,
			&isrs.Orc,
			&isrs.Hectometre,
		)
		if scanErr != nil {
			return nil, scanErr
		}
		codes = append(codes, isrs.String())
	}

	switch iterErr := rows.Err(); {
	case iterErr != nil:
		return nil, iterErr
	case len(codes) == 0:
		return nil, UnchangedError(
			"No gauges for which measurements can be imported in database")
	}

	// Sorted to allow lookups by binary search.
	sort.Strings(codes)

	return codes, nil
}

// storeGaugeMeasurements loads the gauges the import is allowed to
// write data for, lets doForGM store the messages delivered by fetch
// and returns a summary listing the ISRS codes of the handled gauges.
// If no gauge was handled an UnchangedError is returned.
func storeGaugeMeasurements(
	ctx context.Context,
	importID int64,
	fetch func() ([]*nts.RIS_Message_Type, error),
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	begin := time.Now()

	// Get gauges from database, for which user is allowed to import data
	known, err := loadGauges(ctx, conn)
	if err != nil {
		return nil, err
	}

	handled, err := doForGM(ctx, known, fetch, conn, feedback)
	switch {
	case err != nil:
		return nil, err
	case len(handled) == 0:
		return nil, UnchangedError("No new gauge measurements found")
	}

	feedback.Info(
		"Importing gauge measurements took %s", time.Since(begin))

	// TODO: needs to be filled more useful.
	return &struct {
		GaugeMeasuremets []string `json:"gaugeMeasurements"`
	}{
		GaugeMeasuremets: handled,
	}, nil
}

// rescale returns a function which converts a value given in unit
// to centimetres in place. The unit is matched case-insensitively;
// supported are mm, cm, dm, m, hm and km. For any other unit an error
// is returned. The returned function ignores nil pointers.
func rescale(unit string) (func(*float32), error) {

	// scaleBy builds the in-place converter for a given factor.
	scaleBy := func(factor float32) (func(*float32), error) {
		return func(v *float32) {
			if v == nil {
				return
			}
			*v *= factor
		}, nil
	}

	switch strings.ToLower(unit) {
	case "mm":
		return scaleBy(0.1)
	case "cm":
		return scaleBy(1.0)
	case "dm":
		return scaleBy(10.0)
	case "m":
		return scaleBy(100.0)
	case "hm":
		return scaleBy(10000.0)
	case "km":
		return scaleBy(100000.0)
	}

	return nil, fmt.Errorf("unknown unit '%s'", unit)
}

// doForGM fetches NtS messages via fetch and stores the contained water
// level measurements and predictions of the given gauges in the database.
//
// gauges holds the ISRS codes of the gauges data may be imported for. It
// has to be sorted in ascending order as it is queried by binary search.
//
// Messages with a missing or invalid ISRS code, messages for unknown
// gauges and measures which are not water levels are skipped with a
// warning. Values are converted to cm before they are stored. Rows which
// already exist are dropped silently by the INSERT statements. A failing
// insert is reported as a warning and does not stop the import, whereas
// an unknown unit aborts it with an error.
//
// Returned are the ISRS codes of the gauges for which messages were
// processed, regardless of whether new rows were actually inserted.
func doForGM(
	ctx context.Context,
	gauges []string,
	fetch func() ([]*nts.RIS_Message_Type, error),
	conn *sql.Conn,
	feedback Feedback,
) ([]string, error) {

	insertGPStmt, err := conn.PrepareContext(ctx, insertGPSQL)
	if err != nil {
		return nil, err
	}
	defer insertGPStmt.Close()

	insertGMStmt, err := conn.PrepareContext(ctx, insertGMSQL)
	if err != nil {
		return nil, err
	}
	defer insertGMStmt.Close()

	// lookup to see if data can be imported for gauge
	isKnown := func(s string) bool {
		idx := sort.SearchStrings(gauges, s)
		return idx < len(gauges) && gauges[idx] == s
	}

	result, err := fetch()
	if err != nil {
		return nil, err
	}

	var gids []string
	for _, msg := range result {
		// The messages are delivered by an external service, so do not
		// trust optional parts to be present.
		if msg == nil {
			feedback.Warn("Ignored empty message")
			continue
		}
		for _, wrm := range msg.Wrm {
			// NOTE(review): wrm.Geo_object itself is still accessed
			// unchecked. If it is a pointer type it should be guarded
			// here as well -- confirm against the nts package.
			if wrm.Geo_object.Id == nil {
				feedback.Warn("Ignored message without ISRS code")
				continue
			}
			curr := string(*wrm.Geo_object.Id)
			curr = strings.TrimSpace(curr)
			currIsrs, err := models.IsrsFromString(curr)
			if err != nil {
				feedback.Warn("Invalid ISRS code %v", err)
				continue
			}
			feedback.Info("Found measurements/predictions for %s", curr)
			if !isKnown(curr) {
				feedback.Warn("Cannot find gauge %q for import", curr)
				continue
			}

			var referenceCode string
			if wrm.Reference_code == nil {
				feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
				referenceCode = "ZPG"
			} else {
				referenceCode = string(*wrm.Reference_code)
			}

			newM, newP := 0, 0
			for _, measure := range wrm.Measure {
				var unit string
				if measure.Measure_code == nil {
					feedback.Warn("Ignored measure without measure_code")
					continue
				}
				if *measure.Measure_code != nts.Measure_code_enumWAL {
					feedback.Warn("Ignored message with measure_code %s",
						*measure.Measure_code)
					continue
				}
				if measure.Unit == nil {
					feedback.Info("'Unit' not specified. Assuming 'cm'")
					unit = "cm"
				} else {
					unit = string(*measure.Unit)
				}
				convert, err := rescale(unit)
				if err != nil {
					return nil, err
				}
				// Bring all values to cm. This modifies the fetched
				// message in place.
				convert(measure.Value)
				convert(measure.Value_min)
				convert(measure.Value_max)

				// Only needed as scan target of 'RETURNING 1'.
				var dummy int
				if measure.Predicted {
					// Without both bounds the confidence interval
					// is stored as NULL.
					confInterval := pgtype.Numrange{
						Lower:     pgtype.Numeric{Status: pgtype.Null},
						Upper:     pgtype.Numeric{Status: pgtype.Null},
						LowerType: pgtype.Inclusive,
						UpperType: pgtype.Inclusive,
						Status:    pgtype.Null,
					}
					if measure.Value_min != nil && measure.Value_max != nil {
						valueMin := pgtype.Numeric{Status: pgtype.Null}
						valueMax := pgtype.Numeric{Status: pgtype.Null}
						valueMin.Set(measure.Value_min)
						valueMax.Set(measure.Value_max)
						confInterval = pgtype.Numrange{
							Lower:     valueMin,
							Upper:     valueMax,
							LowerType: pgtype.Inclusive,
							UpperType: pgtype.Inclusive,
							Status:    pgtype.Present,
						}
					}
					err = insertGPStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate.Time,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue.Time,
						referenceCode,
						measure.Value,
						&confInterval,
						msg.Identification.Date_issue.Time,
						msg.Identification.Originator,
					).Scan(&dummy)
					switch {
					case err == sql.ErrNoRows:
						// thats expected, nothing to do
					case err != nil:
						// The error text is data, not a format string.
						feedback.Warn("%s", handleError(err).Error())
					default:
						newP++
					}
				} else {
					if measure.Value == nil {
						feedback.Info("Missing value at %s. Ignored",
							measure.Measuredate.Format(time.RFC3339))
						continue
					}
					err = insertGMStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate.Time,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue.Time,
						referenceCode,
						measure.Value,
						msg.Identification.Date_issue.Time,
						msg.Identification.Originator,
					).Scan(&dummy)
					switch {
					case err == sql.ErrNoRows:
						// thats expected, nothing to do
					case err != nil:
						// The error text is data, not a format string.
						feedback.Warn("%s", handleError(err).Error())
					default:
						newM++
					}
				}
			}
			feedback.Info("Inserted %d measurements for %s",
				newM, curr)
			feedback.Info("Inserted %d predictions for %s",
				newP, curr)
			gids = append(gids, curr)
		}
	}
	return gids, nil
}