view pkg/imports/gm.go @ 2233:137addc77b1b

Gauge measurement imports: Do all database ops in one transaction.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 13 Feb 2019 12:39:56 +0100
parents 7c83b5277c1c
children 9b2f8e94671e
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/nts"
)

// GaugeMeasurement is an import job that imports gauge measurement
// data from an NtS SOAP service.
type GaugeMeasurement struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure is handed to the NtS SOAP client together with URL.
	// NOTE(review): judging by the name, true presumably disables
	// validation of the server's HTTPS certificate — confirm in the
	// nts package.
	Insecure bool `json:"insecure"`
}

// gauge holds information about a gauge and the latest measurement.
type gauge struct {
	// location is the ISRS code identifying the gauge; it is filled
	// from the (location) columns of waterway.gauges by loadGauges.
	location models.Isrs
	// latest is meant to hold the time of the gauge's most recent
	// measurement. NOTE(review): nothing in this file sets or reads it
	// (see the TODO in Do) — confirm whether it is still needed.
	latest   time.Time
}

// GMJobKind is the identifier under which gauge measurement imports
// are registered in the import queue.
const GMJobKind = JobKind("gm")

// SQL statements used by the gauge measurement import.
const (
	// listGaugesSQL selects the ISRS location components (country code,
	// LOCODE, fairway section, ORC, hectometre) of every gauge in
	// waterway.gauges whose location lies in the country returned by
	// users.current_user_country().
	listGaugesSQL = `
SELECT
  (location).country_code,
  (location).locode,
  (location).fairway_section,
  (location).orc,
  (location).hectometre
FROM waterway.gauges
WHERE (location).country_code = users.current_user_country()`

	// insertGMSQL inserts one gauge measurement and returns its id.
	// $1-$5 are the ISRS location components composing fk_gauge_id,
	// $6-$18 fill the remaining columns in the order listed and $19 is
	// the staging_done flag.
	// On a conflict with constraint
	// gauge_measurements_fk_gauge_id_measure_date_staging_done_key
	// (presumably a unique key over fk_gauge_id, measure_date and
	// staging_done, judging by its name — confirm against the schema)
	// the existing row is updated instead. Note that date_info is not
	// among the columns refreshed on conflict.
	//
	// TODO: Currently this statement updates existing data sets. In case we want
	// 'historization' we need to develop an other mechanism to keep existing
	// data.
	insertGMSQL = `
INSERT INTO waterway.gauge_measurements (
  fk_gauge_id,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  predicted,
  is_waterlevel,
  value_min,
  value_max,
  date_info,
  source_organization,
  staging_done
) VALUES(
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  $16,
  $17,
  $18,
  $19
)
ON CONFLICT ON CONSTRAINT gauge_measurements_fk_gauge_id_measure_date_staging_done_key
DO UPDATE SET
country_code = EXCLUDED.country_code,
sender = EXCLUDED.sender,
language_code = EXCLUDED.language_code,
date_issue = EXCLUDED.date_issue,
reference_code= EXCLUDED.reference_code,
water_level = EXCLUDED.water_level,
predicted = EXCLUDED.predicted,
is_waterlevel = EXCLUDED.is_waterlevel,
value_min = EXCLUDED.value_min,
value_max = EXCLUDED.value_max,
date_info = EXCLUDED.date_info,
source_organization = EXCLUDED.source_organization
RETURNING id
`
)

// gmJobCreator creates and describes gauge measurement import jobs.
// It is registered under GMJobKind when the package is initialised.
type gmJobCreator struct{}

func init() { RegisterJobCreator(GMJobKind, gmJobCreator{}) }

// Description returns the human readable name of this job kind.
func (gmJobCreator) Description() string {
	return "gauge measurements"
}

// Create returns a fresh, zero-valued GaugeMeasurement job.
func (gmJobCreator) Create() Job {
	job := &GaugeMeasurement{}
	return job
}

// Depends names what this job kind depends on; the entries match the
// waterway tables the import reads (gauges) and writes
// (gauge_measurements).
func (gmJobCreator) Depends() []string {
	deps := []string{"gauges", "gauge_measurements"}
	return deps
}

// AutoAccept always returns true for gauge measurement imports.
func (gmJobCreator) AutoAccept() bool {
	return true
}

// StageDone would move imported gauge measurements out of the staging
// area. It is a no-op: rows are already written with staging_done set
// to true by the import itself.
func (gmJobCreator) StageDone(_ context.Context, _ *sql.Tx, _ int64) error {
	return nil
}

// CleanUp does nothing for a gauge measurement import and always
// reports success.
func (*GaugeMeasurement) CleanUp() error {
	return nil
}

// loadGauges returns the gauges located in the current database user's
// country (as selected by listGaugesSQL), queried through tx.
// Only the location of each returned gauge is filled in.
func loadGauges(ctx context.Context, tx *sql.Tx) ([]*gauge, error) {
	rows, err := tx.QueryContext(ctx, listGaugesSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	var result []*gauge
	for rows.Next() {
		g := new(gauge)
		loc := &g.location
		if err := rows.Scan(
			&loc.CountryCode,
			&loc.LoCode,
			&loc.FairwaySection,
			&loc.Orc,
			&loc.Hectometre,
		); err != nil {
			return nil, err
		}
		result = append(result, g)
	}

	if err := rows.Err(); err != nil {
		return nil, err
	}
	return result, nil
}

// Do executes the gauge measurement import.
//
// It loads the gauges of the current user's country, fetches their WRM
// messages from the NtS SOAP service configured in gm and upserts the
// reported measurements into waterway.gauge_measurements. All database
// work runs in a single transaction on conn. That transaction is
// committed only if measurements were stored. Otherwise an
// UnchangedError is returned and the transaction is rolled back.
//
// On success the returned summary lists the ISRS codes for which
// measurements were stored. importID is not used by this job kind.
func (gm *GaugeMeasurement) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// After a successful Commit this Rollback is a harmless no-op, so it
	// safely covers every early return below.
	defer tx.Rollback()

	// Get available gauges from database for use as filter in SOAP request
	gauges, err := loadGauges(ctx, tx)
	if err != nil {
		return nil, err
	}

	// TODO get date_issue for selected gauges
	gids, err := gm.doForGM(ctx, gauges, tx, feedback)
	if err != nil {
		feedback.Error("Error processing %d gauges: %v", len(gauges), err)
		return nil, err
	}

	if len(gids) == 0 {
		feedback.Info("No new gauge measurements found")
		return nil, UnchangedError("No new gauge measurements found")
	}

	if err = tx.Commit(); err != nil {
		// A failed commit is an error, not an informational event.
		feedback.Error(
			"Importing gauge measurements failed after %s", time.Since(start))
		return nil, err
	}
	feedback.Info(
		"Importing gauge measurements successfully took %s", time.Since(start))

	// TODO: needs to be filled more useful.
	summary := struct {
		GaugeMeasurements []string `json:"gaugeMeasurements"`
	}{
		GaugeMeasurements: gids,
	}
	return &summary, nil
}

// rescale returns a scaling function to bring the unit all to cm.
func rescale(unit string) (func(float32) float32, error) {

	var scale float32

	switch strings.ToLower(unit) {
	case "mm":
		scale = 0.1
	case "cm":
		scale = 1.0
	case "dm":
		scale = 10.0
	case "m":
		scale = 100.0
	case "hm":
		scale = 10000.0
	case "km":
		scale = 100000.0
	default:
		return nil, fmt.Errorf("unknown unit '%s'", unit)
	}

	fn := func(x float32) float32 { return scale * x }
	return fn, nil
}

// doForGM requests the WRM messages for each of the given gauges from the
// NtS SOAP service at gm.URL and upserts every reported measure into
// waterway.gauge_measurements (via insertGMSQL) using tx.
//
// Values, minima and maxima are converted to centimetres with rescale,
// based on the measure's unit ("cm" when unspecified). A missing
// reference code defaults to "ZPG". Gauges for which the service returns
// no messages are reported through feedback and skipped. WRM entries
// with a missing or invalid ISRS code, and measures without a measure
// code, are skipped with a warning.
//
// It returns one ISRS code per WRM entry for which at least one measure
// was stored, so a code may appear more than once. A SOAP, unit or
// database error aborts the whole run. So does cancellation of ctx.
func (gm *GaugeMeasurement) doForGM(
	ctx context.Context,
	gauges []*gauge,
	tx *sql.Tx,
	feedback Feedback,
) ([]string, error) {

	if len(gauges) == 0 {
		return nil, nil
	}

	client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)

	mt := nts.Message_type_typeWRM

	// Prepare the upsert once for the whole run. Preparing it per gauge
	// with a deferred Close kept every statement open until return.
	insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
	if err != nil {
		return nil, err
	}
	defer insertStmt.Close()

	var gids []string
	for _, g := range gauges {
		// The SOAP client takes no context, so honour cancellation
		// between requests.
		if err := ctx.Err(); err != nil {
			return nil, err
		}

		isrsID := nts.Isrs_code_type(g.location.String())
		req := &nts.Get_messages_query{
			Message_type: &mt,
			Ids:          []*nts.Id_pair{{Id: &isrsID}},
		}

		resp, err := client.Get_messages(req)
		if err != nil {
			feedback.Error("%v", err)
			return nil, err
		}

		if resp.Result_message == nil {
			for _, e := range resp.Result_error {
				if e != nil {
					feedback.Error("No gauge measurements found for %s", g.location.String())
				} else {
					feedback.Error("unknown")
				}
			}
			continue
		}

		for _, msg := range resp.Result_message {
			feedback.Info("Found measurements for %s", g.location.String())
			for _, wrm := range msg.Wrm {
				if wrm.Geo_object.Id == nil {
					feedback.Warn("Missing ISRS code")
					continue
				}
				currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id))
				if err != nil {
					feedback.Warn("Invalid ISRS code %v", err)
					continue
				}
				referenceCode := "ZPG"
				if wrm.Reference_code == nil {
					feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
				} else {
					referenceCode = string(*wrm.Reference_code)
				}

				inserted := 0
				for _, measure := range wrm.Measure {
					unit := "cm"
					if measure.Unit == nil {
						feedback.Info("'Unit' not specified. Assuming 'cm'")
					} else {
						unit = string(*measure.Unit)
					}
					convert, err := rescale(unit)
					if err != nil {
						return nil, err
					}
					// Without a measure code we cannot tell whether this
					// is a water level; skip instead of panicking.
					if measure.Measure_code == nil {
						feedback.Warn("'Measure_code' not specified. Skipping measurement")
						continue
					}
					isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL

					// The returned id is not used further; scanning it
					// consumes the RETURNING row and surfaces errors.
					var gid int64
					if err := insertStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue,
						referenceCode,
						convert(measure.Value),
						measure.Predicted,
						isWaterlevel,
						convert(measure.Value_min),
						convert(measure.Value_max),
						// NOTE(review): date_info is filled with the issue
						// date as well — confirm this is intended.
						msg.Identification.Date_issue,
						msg.Identification.Originator,
						true, // staging_done
					).Scan(&gid); err != nil {
						return nil, err
					}
					inserted++
				}
				feedback.Info("Inserted %d measurements for %s",
					inserted, currIsrs)
				// Only report gauges for which something was stored, so
				// that an empty result really means "nothing new".
				if inserted > 0 {
					gids = append(gids, currIsrs.String())
				}
			}
		}
	}
	return gids, nil
}