view pkg/imports/gm.go @ 2130:f3aabc05f9b2

Fix constraints on waterway profiles staging_done in the UNIQUE constraint had no effect, because the exclusion constraint prevented two rows with equal location and validity anyhow. Adding staging_done to the exclusion constraint makes the UNIQUE constraint checking only a corner case of what the exclusion constraint checks. Thus, remove the UNIQUE constraint. Casting staging_done to int is needed because there is no appropriate operator class for booleans. Casting to smallint or even bit would have been better (i.e. should result in smaller index size), but that would have required creating such a CAST, in addition.
author Tom Gottfried <tom@intevation.de>
date Wed, 06 Feb 2019 15:42:32 +0100
parents 8a62ce2a5c70
children b868cb653c4d
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"fmt"
	"strings"
	"time"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/soap/nts"
)

// GaugeMeasurement is an import job to import
// gauges measurement data from a NtS SOAP service.
type GaugeMeasurement struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// gaugeMeasurement holds information about a gauge and the latest measurement
type gaugeMeasurement struct {
	Gauge           models.Isrs
	LatestDateIssue time.Time
}

// GMJobKind is the import queue type identifier.
const GMJobKind JobKind = "gm"

const (
	listGaugesSQL = `
SELECT
  (location).country_code,
  (location).locode,
  (location).fairway_section,
  (location).orc,
  (location).hectometre
FROM waterway.gauges
WHERE (location).country_code = users.current_user_country()`

	// TODO: Currently this statement updates existing data sets. In case we want
	// 'historization' we need to develop an other mechanism to keep existing
	// data.
	insertGMSQL = `
INSERT INTO waterway.gauge_measurements (
  fk_gauge_id,
  measure_date,
  sender,
  language_code,
  country_code,
  date_issue,
  reference_code,
  water_level,
  predicted,
  is_waterlevel,
  value_min,
  value_max,
  date_info,
  source_organization,
  staging_done
) VALUES(
  ($1, $2, $3, $4, $5),
  $6,
  $7,
  $8,
  $9,
  $10,
  $11,
  $12,
  $13,
  $14,
  $15,
  $16,
  $17,
  $18,
  $19
)
ON CONFLICT ON CONSTRAINT gauge_measurements_fk_gauge_id_measure_date_staging_done_key
DO UPDATE SET
country_code = EXCLUDED.country_code,
sender = EXCLUDED.sender,
language_code = EXCLUDED.language_code,
date_issue = EXCLUDED.date_issue,
reference_code= EXCLUDED.reference_code,
water_level = EXCLUDED.water_level,
predicted = EXCLUDED.predicted,
is_waterlevel = EXCLUDED.is_waterlevel,
value_min = EXCLUDED.value_min,
value_max = EXCLUDED.value_max,
date_info = EXCLUDED.date_info,
source_organization = EXCLUDED.source_organization
RETURNING id
`
)

type gmJobCreator struct{}

func init() {
	RegisterJobCreator(GMJobKind, gmJobCreator{})
}

func (gmJobCreator) Description() string {
	return "gauge measurements"
}

func (gmJobCreator) Create(_ JobKind, data string) (Job, error) {
	gm := new(GaugeMeasurement)
	if err := common.FromJSONString(data, gm); err != nil {
		return nil, err
	}
	return gm, nil
}

func (gmJobCreator) Depends() []string {
	return []string{
		"gauges",
		"gauge_measurements",
	}
}

func (gmJobCreator) AutoAccept() bool { return true }

// StageDone moves the imported gauge measurement out of the staging area.
// Currently doing nothing.
func (gmJobCreator) StageDone(context.Context, *sql.Tx, int64) error {
	return nil
}

// CleanUp of a gauge measurement import is a NOP.
func (*GaugeMeasurement) CleanUp() error { return nil }

// Do executes the actual bottleneck import.
func (gm *GaugeMeasurement) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	// Get available gauges from database for use as filter in SOAP request
	var rows *sql.Rows

	rows, err := conn.QueryContext(ctx, listGaugesSQL)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	gauges := []gaugeMeasurement{}

	for rows.Next() {
		var g gaugeMeasurement
		if err = rows.Scan(
			&g.Gauge.CountryCode,
			&g.Gauge.LoCode,
			&g.Gauge.FairwaySection,
			&g.Gauge.Orc,
			&g.Gauge.Hectometre,
		); err != nil {
			return nil, err
		}
		gauges = append(gauges, g)
	}

	if err = rows.Err(); err != nil {
		return nil, err
	}

	// TODO get date_issue for selected gauges
	gids, err := gm.doForGM(ctx, gauges, conn, feedback)
	if err != nil {
		feedback.Error("Error processing %d gauges: %v", len(gauges), err)
		return nil, err
	}
	if len(gids) == 0 {
		feedback.Info("No new gauge measurements found")
		return nil, nil
	}
	// TODO: needs to be filled more useful.
	summary := struct {
		GaugeMeasuremets []string `json:"gaugeMeasurements"`
	}{
		GaugeMeasuremets: gids,
	}
	return &summary, err
}

// rescale returns a scaling function to bring the unit all to cm.
func rescale(unit string) (func(float32) float32, error) {

	var scale float32

	switch strings.ToLower(unit) {
	case "mm":
		scale = 0.1
	case "cm":
		scale = 1.0
	case "dm":
		scale = 10.0
	case "m":
		scale = 100.0
	case "hm":
		scale = 10000.0
	case "km":
		scale = 100000.0
	default:
		return nil, fmt.Errorf("unknown unit '%s'", unit)
	}

	fn := func(x float32) float32 { return scale * x }
	return fn, nil
}

func (gm *GaugeMeasurement) doForGM(
	ctx context.Context,
	gauges []gaugeMeasurement,
	conn *sql.Conn,
	feedback Feedback,
) ([]string, error) {
	start := time.Now()

	client := nts.NewINtSMessageService(gm.URL, gm.Insecure, nil)

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	mt := nts.Message_type_typeWRM

	var gids []string
	for _, g := range gauges {

		var idPair []*nts.Id_pair
		isrs := g.Gauge.String()
		isrsID := nts.Isrs_code_type(isrs)
		idPair = append(idPair, &nts.Id_pair{
			Id: &isrsID,
		})

		req := &nts.Get_messages_query{
			Message_type: &mt,
			Ids:          idPair,
		}

		resp, err := client.Get_messages(req)
		if err != nil {
			feedback.Error("%v", err)
			return nil, err
		}

		if resp.Result_message == nil {
			for _, e := range resp.Result_error {
				if e != nil {
					feedback.Error("No gauge measurements found for %s", g.Gauge.String())
				} else {
					feedback.Error("unknown")
				}
			}
			continue
		}
		result := resp.Result_message

		insertStmt, err := tx.PrepareContext(ctx, insertGMSQL)
		if err != nil {
			return nil, err
		}
		defer insertStmt.Close()

		for _, msg := range result {
			var gid int64
			feedback.Info("Found measurements for %s", g.Gauge.String())
			for _, wrm := range msg.Wrm {
				currIsrs, err := models.IsrsFromString(string(*wrm.Geo_object.Id))
				if err != nil {
					feedback.Warn("Invalid ISRS code %v", err)
					continue
				}
				var referenceCode string
				if wrm.Reference_code == nil {
					feedback.Info("'Reference_code' not specified. Assuming 'ZPG'")
					referenceCode = "ZPG"
				} else {
					referenceCode = string(*wrm.Reference_code)
				}
				for _, measure := range wrm.Measure {
					var unit string
					if measure.Unit == nil {
						feedback.Info("'Unit' not specified. Assuming 'cm'")
						unit = "cm"
					} else {
						unit = string(*measure.Unit)
					}
					convert, err := rescale(unit)
					if err != nil {
						return nil, err
					}
					isWaterlevel := *measure.Measure_code == nts.Measure_code_enumWAL
					err = insertStmt.QueryRowContext(
						ctx,
						currIsrs.CountryCode,
						currIsrs.LoCode,
						currIsrs.FairwaySection,
						currIsrs.Orc,
						currIsrs.Hectometre,
						measure.Measuredate,
						msg.Identification.From,
						msg.Identification.Language_code,
						msg.Identification.Country_code,
						msg.Identification.Date_issue,
						referenceCode,
						convert(measure.Value),
						measure.Predicted,
						isWaterlevel,
						convert(measure.Value_min),
						convert(measure.Value_max),
						msg.Identification.Date_issue,
						msg.Identification.Originator,
						true, // staging_done
					).Scan(&gid)
					if err != nil {
						return nil, err
					}
				}
				feedback.Info("Inserted %d measurements for %s",
					len(wrm.Measure), currIsrs)
				gids = append(gids, currIsrs.String())
			}
		}
	}
	feedback.Info("Storing gauge measurements took %s", time.Since(start))
	if err = tx.Commit(); err == nil {
		feedback.Info("Import of gauge measurements was successful")
	}

	return gids, nil
}