view pkg/imports/gm.go @ 3277:232fc90e6ee2

Disentangle gauge measurements and predictions. Representing both in one table has led to the necessity to make the distinction in many places, such as statements, definitions of partial indexes and application code. In at least one place in the AGM import, the distinction in application code was made too late, and measurements matching an approved measurement could have been missed.
author Tom Gottfried <tom@intevation.de>
date Wed, 15 May 2019 19:08:49 +0200
parents 3dee5cf16a58
children 831193935739
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom.gottfried@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/nts"
	"github.com/jackc/pgx/pgtype"
)

// GaugeMeasurement is an import job to import
// gauge measurement data from a NtS SOAP service.
// Its fields are (de)serialized as JSON under the keys "url" and
// "insecure".
type GaugeMeasurement struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	// The value is passed on unchanged to the NtS SOAP client.
	Insecure bool `json:"insecure"`
}

// GMJobKind is the import queue type identifier under which the
// gauge measurement import registers its job creator.
const GMJobKind JobKind = "gm"

// insertGMSQL inserts one row into waterway.gauge_measurements. The
// first five parameters form the composite gauge id, the following ten
// fill the remaining columns; staging_done is always set to true.
//
// Note: we do not expect corrections of data through this service. So
// any constraint conflicts are triggered by actual redundant data which
// can be dropped. Because of ON CONFLICT DO NOTHING the statement
// returns a row only if something was really inserted.
const insertGMSQL = `
INSERT INTO waterway.gauge_measurements (
  fk_gauge_id,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  is_waterlevel,
  date_info,
  source_organization,
  staging_done
) VALUES(
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  true
)
ON CONFLICT DO NOTHING
RETURNING 1
`

// insertGPSQL inserts one row into waterway.gauge_predictions. The
// first five parameters form the composite gauge id, the following
// eleven fill the remaining columns including the confidence interval.
// As with insertGMSQL conflicting rows are dropped silently and a row
// is returned only if something was really inserted.
const insertGPSQL = `
INSERT INTO waterway.gauge_predictions (
  fk_gauge_id,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  is_waterlevel,
  conf_interval,
  date_info,
  source_organization
) VALUES(
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  $16
)
ON CONFLICT DO NOTHING
RETURNING 1
`

// gmJobCreator is the stateless job creator which is registered
// for imports of kind GMJobKind.
type gmJobCreator struct{}

func init() {
	RegisterJobCreator(GMJobKind, gmJobCreator{})
}

// Description returns a short human readable name of this kind of import.
func (gmJobCreator) Description() string {
	const description = "gauge measurements"
	return description
}

// Create returns a fresh, zero valued gauge measurement import job.
func (gmJobCreator) Create() Job {
	return &GaugeMeasurement{}
}

// Depends returns the two lists of relation names this import
// declares as its dependencies: "gauge_measurements" in the first
// slot and "gauges" in the second.
// NOTE(review): the import also writes to waterway.gauge_predictions,
// which is not listed here - confirm if it needs to be declared, too.
func (gmJobCreator) Depends() [2][]string {
	var deps [2][]string
	deps[0] = []string{"gauge_measurements"}
	deps[1] = []string{"gauges"}
	return deps
}

// AutoAccept reports true for gauge measurement imports.
func (gmJobCreator) AutoAccept() bool {
	return true
}

// StageDone is meant to move the imported gauge measurements out of
// the staging area. Currently it is a NOP which always returns nil:
// the measurements are inserted with staging_done set to true.
func (gmJobCreator) StageDone(_ context.Context, _ *sql.Tx, _ int64) error {
	return nil
}

// CleanUp has nothing to tidy up after a gauge measurement import
// and therefore always returns nil.
func (*GaugeMeasurement) CleanUp() error {
	return nil
}

// Do executes the actual gauge measurement import.
//
// It queries the NtS SOAP service configured in gm for water related
// messages (message type WRM) and hands them over to
// storeGaugeMeasurements which writes them to the database via conn.
// If the service answers without result messages the error codes it
// reported are logged via feedback and no messages are processed.
// The returned values are those of storeGaugeMeasurements.
func (gm *GaugeMeasurement) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// The SOAP request is wrapped into a closure so that it is only
	// issued when storeGaugeMeasurements asks for the messages.
	fetch := func() ([]*nts.RIS_Message_Type, error) {
		service := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)

		msgType := nts.Message_type_typeWRM

		resp, err := service.Get_messages(&nts.Get_messages_query{
			Message_type: &msgType,
		})
		if err != nil {
			return nil, err
		}

		if resp.Result_message != nil {
			return resp.Result_message, nil
		}

		// No messages delivered: report what the service complained about.
		for _, code := range resp.Result_error {
			if code == nil {
				feedback.Error("Unknown error")
				continue
			}
			feedback.Error("Error code: %s", *code)
		}
		return resp.Result_message, nil
	}

	return storeGaugeMeasurements(ctx, importID, fetch, conn, feedback)
}

// storeGaugeMeasurements imports the messages delivered by fetch into
// the database using conn and logs the time this took.
//
// On success it returns a summary which lists the ISRS codes of the
// gauges reported by doForGM under the JSON key "gaugeMeasurements".
// An error of doForGM is logged and passed through. If doForGM
// reports no gauges at all an UnchangedError is returned.
func storeGaugeMeasurements(
	ctx context.Context,
	importID int64,
	fetch func() ([]*nts.RIS_Message_Type, error),
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	begin := time.Now()

	// TODO get date_issue for selected gauges
	gauges, err := doForGM(ctx, fetch, conn, feedback)
	switch {
	case err != nil:
		feedback.Error("Error processing gauges: %v", err)
		return nil, err
	case len(gauges) == 0:
		return nil, UnchangedError("No new gauge measurements found")
	}

	feedback.Info(
		"Importing gauge measurements took %s", time.Since(begin))

	// TODO: needs to be filled more useful.
	report := &struct {
		GaugeMeasuremets []string `json:"gaugeMeasurements"`
	}{
		GaugeMeasuremets: gauges,
	}
	return report, nil
}

// rescale returns a function which converts a value given in unit
// to cm. The conversion is done in place; nil pointers are left
// untouched. The unit is matched case-insensitively against
// mm, cm, dm, m, hm and km. Any other unit results in an error
// and a nil function.
func rescale(unit string) (func(*float32), error) {

	// by builds the in-place scaling function for a given factor.
	by := func(factor float32) (func(*float32), error) {
		return func(v *float32) {
			if v == nil {
				return
			}
			*v *= factor
		}, nil
	}

	switch strings.ToLower(unit) {
	case "mm":
		return by(0.1)
	case "cm":
		return by(1.0)
	case "dm":
		return by(10.0)
	case "m":
		return by(100.0)
	case "hm":
		return by(10000.0)
	case "km":
		return by(100000.0)
	}
	return nil, fmt.Errorf("unknown unit '%s'", unit)
}

// doForGM fetches the NtS messages via fetch and stores the measures of
// all contained water related messages in the database using conn.
//
// Measures flagged as predicted are inserted into
// waterway.gauge_predictions, all others into
// waterway.gauge_measurements. Values are converted to cm in place
// before they are stored. A missing reference code is assumed to be
// 'ZPG', a missing unit is assumed to be 'cm'.
//
// Rows which already exist are dropped silently by the database.
// Failing inserts, invalid or missing ISRS codes and measures without
// a measure code are reported as warnings via feedback and skipped.
// Failing to prepare the statements, a failing fetch or an unknown
// unit abort the import with an error.
//
// The returned slice contains the ISRS code of every gauge for which
// a water related message was processed, regardless of whether rows
// were actually inserted for it.
func doForGM(
	ctx context.Context,
	fetch func() ([]*nts.RIS_Message_Type, error),
	conn *sql.Conn,
	feedback Feedback,
) ([]string, error) {

	insertGPStmt, err := conn.PrepareContext(ctx, insertGPSQL)
	if err != nil {
		return nil, err
	}
	defer insertGPStmt.Close()

	insertGMStmt, err := conn.PrepareContext(ctx, insertGMSQL)
	if err != nil {
		return nil, err
	}
	defer insertGMStmt.Close()

	result, err := fetch()
	if err != nil {
		return nil, err
	}

	var gids []string
	for _, msg := range result {
		// Only used as scan target for the 'RETURNING 1' of the inserts.
		var dummy int
		for _, wrm := range msg.Wrm {
			// The data comes from an external service: do not crash on
			// a message which lacks the id of the gauge.
			if wrm.Geo_object.Id == nil {
				feedback.Warn("Missing ISRS code. Ignoring message")
				continue
			}
			curr := string(*wrm.Geo_object.Id)
			currIsrs, err := models.IsrsFromString(curr)
			if err != nil {
				feedback.Warn("Invalid ISRS code %v", err)
				continue
			}
			feedback.Info("Found measurements/predictions for %s", curr)

			var referenceCode string
			if wrm.Reference_code == nil {
				feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
				referenceCode = "ZPG"
			} else {
				referenceCode = string(*wrm.Reference_code)
			}

			newM, newP := 0, 0
			for _, measure := range wrm.Measure {
				if measure.Measure_code == nil {
					feedback.Warn(
						"'Measure_code' not specified. Ignoring measure")
					continue
				}
				isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL

				var unit string
				if measure.Unit == nil {
					feedback.Info("'Unit' not specified. Assuming 'cm'")
					unit = "cm"
				} else {
					unit = string(*measure.Unit)
				}
				convert, err := rescale(unit)
				if err != nil {
					return nil, err
				}
				// Bring all values to cm (in place).
				convert(measure.Value)
				convert(measure.Value_min)
				convert(measure.Value_max)

				if measure.Predicted {
					// The zero value of pgtype.Numrange has the status
					// 'Undefined' which cannot be encoded. Without a
					// complete interval store NULL instead.
					confInterval := pgtype.Numrange{Status: pgtype.Null}
					if measure.Value_min != nil && measure.Value_max != nil {
						var valueMin, valueMax pgtype.Numeric
						if err := valueMin.Set(*measure.Value_min); err != nil {
							feedback.Warn(
								"Invalid lower bound of confidence interval: %v", err)
							continue
						}
						if err := valueMax.Set(*measure.Value_max); err != nil {
							feedback.Warn(
								"Invalid upper bound of confidence interval: %v", err)
							continue
						}
						confInterval = pgtype.Numrange{
							Lower:     valueMin,
							Upper:     valueMax,
							LowerType: pgtype.Inclusive,
							UpperType: pgtype.Inclusive,
							Status:    pgtype.Present,
						}
					}
					err = insertGPStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue,
						referenceCode,
						measure.Value,
						isWaterlevel,
						&confInterval,
						msg.Identification.Date_issue,
						msg.Identification.Originator,
					).Scan(&dummy)
					switch {
					case err == sql.ErrNoRows:
						// thats expected, nothing to do:
						// the row already exists and was dropped.
					case err != nil:
						// Do not use the error text as format string.
						feedback.Warn("%v", handleError(err))
					default:
						newP++
					}
				} else {
					err = insertGMStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue,
						referenceCode,
						measure.Value,
						isWaterlevel,
						msg.Identification.Date_issue,
						msg.Identification.Originator,
					).Scan(&dummy)
					switch {
					case err == sql.ErrNoRows:
						// thats expected, nothing to do:
						// the row already exists and was dropped.
					case err != nil:
						// Do not use the error text as format string.
						feedback.Warn("%v", handleError(err))
					default:
						newM++
					}
				}
			}
			feedback.Info("Inserted %d measurements for %s",
				newM, curr)
			feedback.Info("Inserted %d predictions for %s",
				newP, curr)
			gids = append(gids, curr)
		}
	}
	return gids, nil
}