Mercurial > gemma
view pkg/imports/fa.go @ 2130:f3aabc05f9b2
Fix constraints on waterway profiles
staging_done in the UNIQUE constraint had no effect, because the
exclusion constraint prevented two rows with equal location and
validity anyhow. Adding staging_done to the exclusion constraint
makes the UNIQUE constraint checking only a corner case of what
the exclusion constraint checks. Thus, remove the UNIQUE constraint.
Casting staging_done to int is needed because there is no appropriate
operator class for booleans. Casting to smallint or even bit would have
been better (i.e. should result in smaller index size), but that would
have required creating such a CAST, in addition.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Wed, 06 Feb 2019 15:42:32 +0100 |
parents | aeff01c6edec |
children | b868cb653c4d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Raimund Renkert <raimund.renkert@intevation.de> package imports import ( "context" "database/sql" "errors" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/soap/ifaf" ) // FairwayAvailability imports fairway availability data // from an IFAF SOAP service. type FairwayAvailability struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } type uniqueFairwayAvailability struct { BottleneckId string Surdat time.Time } // FAJobKind is import queue type identifier. const FAJobKind JobKind = "fa" const ( listBottlenecksSQL = ` SELECT bottleneck_id, responsible_country FROM waterway.bottlenecks WHERE responsible_country = users.current_user_country() AND staging_done = true ` latestMeasureDateSQL = ` SELECT measure_date FROM waterway.effective_fairway_availability ORDER BY measure_date DESC LIMIT 1 ` listFairwayAvailabilitySQL = ` SELECT fa.id, bn.bottleneck_id, fa.surdat FROM waterway.fairway_availability fa JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id ` insertFASQL = ` INSERT INTO waterway.fairway_availability ( position_code, bottleneck_id, surdat, critical, date_info, source_organization ) VALUES ( $1, (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2), $3, $4, $5, $6 ) RETURNING id` insertBnPdfsSQL = ` INSERT INTO waterway.bottleneck_pdfs ( fairway_availability_id, profile_pdf_filename, profile_pdf_url, pdf_generation_date, source_organization ) VALUES ( $1, $2, $3, $4, $5 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING` insertEFASQL = ` INSERT INTO waterway.effective_fairway_availability ( fairway_availability_id, measure_date, level_of_service, available_depth_value, available_width_value, water_level_value, measure_type, source_organization, forecast_generation_time, value_lifetime ) VALUES ( $1, $2, (SELECT level_of_service FROM levels_of_service WHERE name = $3), $4, $5, $6, $7, $8, $9, $10 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING` insertFAVSQL = ` INSERT INTO waterway.fa_reference_values ( fairway_availability_id, level_of_service, fairway_depth, fairway_width, fairway_radius, shallowest_spot ) VALUES ( $1, (SELECT level_of_service FROM levels_of_service WHERE name = $2), $3, $4, $5, ST_MakePoint($6, $7)::geography )ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING` ) type faJobCreator struct{} func init() { RegisterJobCreator(FAJobKind, faJobCreator{}) } func (faJobCreator) Description() string { return "fairway availability" } func (faJobCreator) Create(_ JobKind, data string) (Job, error) { fa := new(FairwayAvailability) if err := common.FromJSONString(data, fa); err != nil { return nil, err } return fa, nil } func (faJobCreator) Depends() []string { return []string{ "bottlenecks", "fairway_availability", "bottleneck_pdfs", "effective_fairway_availability", "fa_reference_values", "levels_of_service", } } func (faJobCreator) AutoAccept() bool { return true } // StageDone moves the imported fairway availablities out of the staging area. // Currently doing nothing. func (faJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp of a fairway availablities import is a NOP. func (*FairwayAvailability) CleanUp() error { return nil } type bottleneckCountry struct { ID string ResponsibleCountry string } // Do executes the actual fairway availability import. func (fa *FairwayAvailability) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { // Get available bottlenecks from database for use as filter in SOAP request var rows *sql.Rows rows, err := conn.QueryContext(ctx, listBottlenecksSQL) if err != nil { return nil, err } defer rows.Close() bottlenecks := []bottleneckCountry{} for rows.Next() { var bn bottleneckCountry if err = rows.Scan( &bn.ID, &bn.ResponsibleCountry, ); err != nil { return nil, err } bottlenecks = append(bottlenecks, bn) } if err = rows.Err(); err != nil { return nil, err } var faRows *sql.Rows faRows, err = conn.QueryContext(ctx, listFairwayAvailabilitySQL) if err != nil { return nil, err } fairwayAvailabilities := map[uniqueFairwayAvailability]int64{} for faRows.Next() { var id int64 var bnId string var sd time.Time if err = faRows.Scan( &id, &bnId, &sd, ); err != nil { return nil, err } key := uniqueFairwayAvailability{ BottleneckId: bnId, Surdat: sd, } fairwayAvailabilities[key] = id } if err = faRows.Err(); err != nil { return nil, err } var latestDate pgtype.Timestamp err = conn.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&latestDate) switch { case err == sql.ErrNoRows: latestDate = pgtype.Timestamp{ // Fill Database with data of the last 5 days. Change this to a more useful value. Time: time.Now().AddDate(0, 0, -5), } case err != nil: return nil, err } faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latestDate, conn, feedback) if err != nil { feedback.Error("Error processing data: %s", err) } if len(faids) == 0 { feedback.Info("No new fairway availablity data found") return nil, nil } feedback.Info("Processed %d fairway availabilities", len(faids)) // TODO: needs to be filled more useful. summary := struct { FairwayAvailabilities []string `json:"fairwayAvailabilities"` }{ FairwayAvailabilities: faids, } return &summary, err } func (fa *FairwayAvailability) doForFAs( ctx context.Context, bottlenecks []bottleneckCountry, fairwayAvailabilities map[uniqueFairwayAvailability]int64, latestDate pgtype.Timestamp, conn *sql.Conn, feedback Feedback, ) ([]string, error) { start := time.Now() client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) var bnIds []string for _, bn := range bottlenecks { bnIds = append(bnIds, bn.ID) } var period ifaf.RequestedPeriod period.Date_start = latestDate.Time period.Date_end = time.Now() ids := ifaf.ArrayOfString{ String: bnIds, } req := &ifaf.Get_bottleneck_fa{ Bottleneck_id: &ids, Period: &period, } resp, err := client.Get_bottleneck_fa(req) if err != nil { feedback.Error("%v", err) return nil, err } if resp.Get_bottleneck_faResult == nil { err := errors.New("no fairway availabilities found") return nil, err } result := resp.Get_bottleneck_faResult tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) if err != nil { return nil, err } defer insertFAStmt.Close() insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL) if err != nil { return nil, err } defer insertBnPdfsStmt.Close() insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL) if err != nil { return nil, err } defer insertEFAStmt.Close() insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL) if err != nil { return nil, err } defer insertFAVStmt.Close() var faIDs []string var faID int64 feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability)) for _, faRes := range result.FairwayAvailability { uniqueFa := uniqueFairwayAvailability{ BottleneckId: faRes.Bottleneck_id, Surdat: faRes.SURDAT, } var found bool if faID, found = fairwayAvailabilities[uniqueFa]; !found { err = insertFAStmt.QueryRowContext( ctx, faRes.POSITION, faRes.Bottleneck_id, faRes.SURDAT, faRes.Critical, faRes.Date_Info, faRes.Source, ).Scan(&faID) if err != nil { return nil, err } fairwayAvailabilities[uniqueFa] = faID } feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) faIDs = append(faIDs, faRes.Bottleneck_id) if faRes.Bottleneck_PDFs != nil { bnPdfCount := 0 for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { res, err := insertBnPdfsStmt.ExecContext( ctx, faID, bnPdfs.ProfilePdfFilename, bnPdfs.ProfilePdfURL, bnPdfs.PDF_Generation_Date, bnPdfs.Source, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { bnPdfCount += int(affected) } else { bnPdfCount++ } } feedback.Info("Add %d Pdfs", bnPdfCount) } if faRes.Effective_fairway_availability != nil { efaCount := 0 for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { los := efa.Level_of_Service fgt := efa.Forecast_generation_time if efa.Forecast_generation_time.Status == pgtype.Undefined { fgt = pgtype.Timestamp{ Status: pgtype.Null, } } res, err := insertEFAStmt.ExecContext( ctx, faID, efa.Measure_date, string(*los), efa.Available_depth_value, efa.Available_width_value, efa.Water_level_value, efa.Measure_type, efa.Source, fgt.Get(), efa.Value_lifetime, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { efaCount += int(affected) } else { efaCount++ } } feedback.Info("Add %d Effective Fairway Availability", efaCount) } if faRes.Reference_values != nil { rvCount := 0 for _, fav := range faRes.Reference_values.ReferenceValue { res, err := insertFAVStmt.ExecContext( ctx, faID, fav.Level_of_Service, fav.Fairway_depth, fav.Fairway_width, fav.Fairway_radius, fav.Shallowest_spot_Lat, fav.Shallowest_spot_Lon, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { rvCount += int(affected) } else { rvCount++ } } feedback.Info("Add %d Reference Values", rvCount) } } feedback.Info("Storing fairway availabilities took %s", time.Since(start)) if err = tx.Commit(); err == nil { feedback.Info("Import of fairway availabilities was successful") } return faIDs, nil }