view pkg/imports/gm.go @ 2248:cce158db02b0

Input area as multipolygons to generate area from stretch Doing so is more resilient against invalid geometries and gives more plausible results if tributaries are involved (i.e. does not include the adjacent area of the tributary in the result).
author Tom Gottfried <tom@intevation.de>
date Wed, 13 Feb 2019 16:48:52 +0100
parents 7d784840a9a7
children b8972e4671fa
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"sort"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/nts"
)

// GaugeMeasurement is an import job to import
// gauges measurement data from a NtS SOAP service.
type GaugeMeasurement struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// GMJobKind is the import queue type identifier.
const GMJobKind JobKind = "gm"

const (
	listGaugesSQL = `
SELECT
  (location).country_code,
  (location).locode,
  (location).fairway_section,
  (location).orc,
  (location).hectometre
FROM waterway.gauges
WHERE (location).country_code = users.current_user_country()`

	// TODO: Currently this statement updates existing data sets. In case we want
	// 'historization' we need to develop an other mechanism to keep existing
	// data.
	insertGMSQL = `
INSERT INTO waterway.gauge_measurements (
  fk_gauge_id,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  predicted,
  is_waterlevel,
  value_min,
  value_max,
  date_info,
  source_organization,
  staging_done
) VALUES(
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  $16,
  $17,
  $18,
  $19
)
ON CONFLICT ON CONSTRAINT gauge_measurements_fk_gauge_id_measure_date_staging_done_key
DO UPDATE SET
country_code = EXCLUDED.country_code,
sender = EXCLUDED.sender,
language_code = EXCLUDED.language_code,
date_issue = EXCLUDED.date_issue,
reference_code= EXCLUDED.reference_code,
water_level = EXCLUDED.water_level,
predicted = EXCLUDED.predicted,
is_waterlevel = EXCLUDED.is_waterlevel,
value_min = EXCLUDED.value_min,
value_max = EXCLUDED.value_max,
date_info = EXCLUDED.date_info,
source_organization = EXCLUDED.source_organization
RETURNING id
`
)

type gmJobCreator struct{}

func init() {
	RegisterJobCreator(GMJobKind, gmJobCreator{})
}

func (gmJobCreator) Description() string { return "gauge measurements" }

func (gmJobCreator) Create() Job { return new(GaugeMeasurement) }

func (gmJobCreator) Depends() []string {
	return []string{
		"gauges",
		"gauge_measurements",
	}
}

func (gmJobCreator) AutoAccept() bool { return true }

// StageDone moves the imported gauge measurement out of the staging area.
// Currently doing nothing.
func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp of a gauge measurement import is a NOP.
func (*GaugeMeasurement) CleanUp() error { return nil }

// Do executes the actual bottleneck import.
func (gm *GaugeMeasurement) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	fetch := func() ([]*nts.RIS_Message_Type, error) {
		client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)

		mt := nts.Message_type_typeWRM

		req := &nts.Get_messages_query{
			Message_type: &mt,
		}

		resp, err := client.Get_messages(req)
		if err != nil {
			return nil, err
		}

		result := resp.Result_message
		if result == nil {
			for _, e := range resp.Result_error {
				if e != nil {
					feedback.Error("Error code: %s", *e)
				} else {
					feedback.Error("Unknown error")
				}
			}
		}
		return result, nil
	}

	return storeGaugeMeasurements(
		ctx,
		importID,
		fetch,
		conn,
		feedback,
	)
}

func loadGauges(ctx context.Context, tx *sql.Tx) ([]string, error) {

	rows, err := tx.QueryContext(ctx, listGaugesSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var gauges []string

	for rows.Next() {
		var g models.Isrs
		if err = rows.Scan(
			&g.CountryCode,
			&g.LoCode,
			&g.FairwaySection,
			&g.Orc,
			&g.Hectometre,
		); err != nil {
			return nil, err
		}
		gauges = append(gauges, g.String())
	}

	if err = rows.Err(); err != nil {
		return nil, err
	}

	sort.Strings(gauges)

	return gauges, nil
}

func storeGaugeMeasurements(
	ctx context.Context,
	importID int64,
	fetch func() ([]*nts.RIS_Message_Type, error),
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	// Get available gauges from database for use as filter in SOAP request
	gauges, err := loadGauges(ctx, tx)
	if err != nil {
		return nil, err
	}

	// TODO get date_issue for selected gauges
	gids, err := doForGM(ctx, gauges, fetch, tx, feedback)
	if err != nil {
		feedback.Error("Error processing %d gauges: %v", len(gauges), err)
		return nil, err
	}

	if len(gids) == 0 {
		feedback.Info("No new gauge measurements found")
		return nil, UnchangedError("No new gauge measurements found")
	}

	if err = tx.Commit(); err != nil {
		feedback.Info(
			"Importing gauge measurements failed after %s", time.Since(start))
		return nil, err
	}
	feedback.Info(
		"Importing gauge measurements successfully took %s", time.Since(start))

	// TODO: needs to be filled more useful.
	summary := struct {
		GaugeMeasuremets []string `json:"gaugeMeasurements"`
	}{
		GaugeMeasuremets: gids,
	}
	return &summary, err
}

// rescale returns a scaling function to bring the unit all to cm.
func rescale(unit string) (func(float32) float32, error) {

	var scale float32

	switch strings.ToLower(unit) {
	case "mm":
		scale = 0.1
	case "cm":
		scale = 1.0
	case "dm":
		scale = 10.0
	case "m":
		scale = 100.0
	case "hm":
		scale = 10000.0
	case "km":
		scale = 100000.0
	default:
		return nil, fmt.Errorf("unknown unit '%s'", unit)
	}

	fn := func(x float32) float32 { return scale * x }
	return fn, nil
}

func doForGM(
	ctx context.Context,
	gauges []string,
	fetch func() ([]*nts.RIS_Message_Type, error),
	tx *sql.Tx,
	feedback Feedback,
) ([]string, error) {

	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	// lookup to see if we have gauges in the database.
	isKnown := func(s string) bool {
		idx := sort.SearchStrings(gauges, s)
		return idx < len(gauges) && gauges[idx] == s
	}

	result, err := fetch()
	if err != nil {
		return nil, err
	}

	var gids []string
	for _, msg := range result {
		var gid int64
		for _, wrm := range msg.Wrm {
			curr := string(*wrm.Geo_object.Id)
			currIsrs, err := models.IsrsFromString(curr)
			if err != nil {
				feedback.Warn("Invalid ISRS code %v", err)
				continue
			}
			feedback.Info("Found measurements for %s", curr)
			if !isKnown(curr) {
				feedback.Warn("Gauge '%s' is not in database.", curr)
				continue
			}

			var referenceCode string
			if wrm.Reference_code == nil {
				feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
				referenceCode = "ZPG"
			} else {
				referenceCode = string(*wrm.Reference_code)
			}
			for _, measure := range wrm.Measure {
				var unit string
				if measure.Unit == nil {
					feedback.Info("'Unit' not specified. Assuming 'cm'")
					unit = "cm"
				} else {
					unit = string(*measure.Unit)
				}
				convert, err := rescale(unit)
				if err != nil {
					return nil, err
				}
				isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
				err = insertStmt.QueryRowContext(
					ctx,
					currIsrs.CountryCode,
					currIsrs.LoCode,
					currIsrs.FairwaySection,
					currIsrs.Orc,
					currIsrs.Hectometre,
					measure.Measuredate,
					msg.Identification.From,
					msg.Identification.Language_code,
					msg.Identification.Country_code,
					msg.Identification.Date_issue,
					referenceCode,
					convert(measure.Value),
					measure.Predicted,
					isWaterlevel,
					convert(measure.Value_min),
					convert(measure.Value_max),
					msg.Identification.Date_issue,
					msg.Identification.Originator,
					true, // staging_done
				).Scan(&gid)
				if err != nil {
					return nil, err
				}
			}
			feedback.Info("Inserted %d measurements for %s",
				len(wrm.Measure), curr)
			gids = append(gids, curr)
		}
	}
	return gids, nil
}