Mercurial > gemma
view pkg/imports/fa.go @ 4158:5466562cca60
Remove utility function with possibly bad performance impact
Since the PostgreSQL planner will call functions used in a filter
condition once per row, even if the function is marked STABLE,
under some circumstances (e.g. the removed usage in an RLS policy),
remove the function from the database completely.
The DEFAULT on users.templates that used the function is unused,
thus remove it as a whole, too.
Recreate the function as a helper for tests in order to minimize
necessary changes there.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Fri, 02 Aug 2019 16:10:42 +0200 |
parents | c9bef8c27685 |
children | c470d2202823 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018,2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/soap/ifaf"
)

// FairwayAvailability imports fairway availability data
// from an IFAF SOAP service.
//
// The struct is the JSON-serializable configuration of one import job;
// both fields are handed unchanged to the SOAP client when the job runs.
type FairwayAvailability struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	// NOTE(review): presumably true disables certificate validation;
	// confirm against the ifaf client constructor.
	Insecure bool `json:"insecure"`
}

// FAJobKind is the import queue type identifier
// of fairway availability imports.
const FAJobKind JobKind = "fa" const ( listBottlenecksSQL = ` SELECT DISTINCT bottleneck_id FROM waterway.bottlenecks WHERE responsible_country = ( SELECT country FROM users.list_users WHERE username = current_user) AND staging_done = true ORDER BY bottleneck_id ` latestMeasureDateSQL = ` SELECT measure_date FROM waterway.effective_fairway_availability ORDER BY measure_date DESC LIMIT 1 ` insertFASQL = ` INSERT INTO waterway.fairway_availability ( position_code, bottleneck_id, surdat, critical, date_info, source_organization ) VALUES ( $1, $2, $3, $4, $5, $6 ) ON CONFLICT (bottleneck_id, surdat) DO UPDATE SET position_code = EXCLUDED.position_code, critical = EXCLUDED.critical, date_info = EXCLUDED.date_info, source_organization = EXCLUDED.source_organization RETURNING id` insertBnPdfsSQL = ` INSERT INTO waterway.bottleneck_pdfs ( fairway_availability_id, profile_pdf_filename, profile_pdf_url, pdf_generation_date, source_organization ) VALUES ( $1, $2, $3, $4, $5 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING` insertEFASQL = ` INSERT INTO waterway.effective_fairway_availability ( fairway_availability_id, measure_date, level_of_service, available_depth_value, available_width_value, water_level_value, measure_type, source_organization, forecast_generation_time, value_lifetime ) VALUES ( $1, $2, (SELECT level_of_service FROM levels_of_service WHERE name = $3), $4, $5, $6, $7, $8, $9, $10 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING` insertFAVSQL = ` INSERT INTO waterway.fa_reference_values ( fairway_availability_id, level_of_service, fairway_depth, fairway_width, fairway_radius, shallowest_spot ) VALUES ( $1, (SELECT level_of_service FROM levels_of_service WHERE name = $2), $3, $4, $5, ST_MakePoint($6, $7)::geography )ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING` ) type faJobCreator struct{} func init() { RegisterJobCreator(FAJobKind, faJobCreator{}) } func (faJobCreator) Description() string { return 
"fairway availability" } func (faJobCreator) Create() Job { return new(FairwayAvailability) } func (faJobCreator) Depends() [2][]string { return [2][]string{ {"effective_fairway_availability", "fa_reference_values", "bottleneck_pdfs", "fairway_availability"}, {"bottlenecks", "levels_of_service"}, } } func (faJobCreator) AutoAccept() bool { return true } // StageDone moves the imported fairway availablities out of the staging area. // Currently doing nothing. func (faJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp of a fairway availablities import is a NOP. func (*FairwayAvailability) CleanUp() error { return nil } type bottlenecks []string func (bns bottlenecks) contains(bn string) bool { idx := sort.SearchStrings(bns, bn) return idx < len(bns) && bns[idx] == bn } func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) { // Get available bottlenecks from database for use as filter in SOAP request rows, err := tx.QueryContext(ctx, listBottlenecksSQL) if err != nil { return nil, err } defer rows.Close() var bns bottlenecks for rows.Next() { var bn string if err = rows.Scan(&bn); err != nil { return nil, err } bns = append(bns, bn) } if err = rows.Err(); err != nil { return nil, err } return bns, nil } func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) { var date pgtype.Timestamp err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date) switch { case err == sql.ErrNoRows: date = pgtype.Timestamp{ // Fill Database with data of the last 5 days. Change this to a more useful value. 
Time: time.Now().AddDate(0, 0, -5), } case err != nil: return pgtype.Timestamp{}, err } return date, nil } func storeFairwayAvailability( ctx context.Context, conn *sql.Conn, feedback Feedback, fetch func(context.Context, *sql.Tx, bottlenecks) ([]*ifaf.FairwayAvailability, error), ) (interface{}, error) { start := time.Now() tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() bns, err := loadBottleneckCountries(ctx, tx) if err != nil { return nil, err } fas, err := fetch(ctx, tx, bns) if err != nil { return nil, err } faids, err := doForFAs(ctx, bns, fas, tx, feedback) if err != nil { return nil, fmt.Errorf("Error processing data: %v", err) } if len(faids) == 0 { feedback.Info("No new fairway availablity data found") return nil, UnchangedError("No new fairway availablity data found") } feedback.Info("Processed %d fairway availabilities", len(faids)) if err = tx.Commit(); err != nil { feedback.Info( "Importing fairway availabilities failed after %s", time.Since(start)) return nil, err } feedback.Info( "Importing fairway availabilities successfully took %s", time.Since(start)) // TODO: needs to be filled more useful. 
summary := struct { FairwayAvailabilities []string `json:"fairwayAvailabilities"` }{ FairwayAvailabilities: faids, } return &summary, nil } func doForFAs( ctx context.Context, bnIds bottlenecks, fas []*ifaf.FairwayAvailability, tx *sql.Tx, feedback Feedback, ) ([]string, error) { insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) if err != nil { return nil, err } defer insertFAStmt.Close() insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL) if err != nil { return nil, err } defer insertBnPdfsStmt.Close() insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL) if err != nil { return nil, err } defer insertEFAStmt.Close() insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL) if err != nil { return nil, err } defer insertFAVStmt.Close() var faIDs []string var faID int64 feedback.Info("Found %d fairway availabilities", len(fas)) for _, faRes := range fas { // FIXME: The following test is propably unneccessary as already // done by DB constraints... [sw] if !bnIds.contains(faRes.Bottleneck_id) { feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id) continue } err = insertFAStmt.QueryRowContext( ctx, faRes.POSITION, faRes.Bottleneck_id, faRes.SURDAT, faRes.Critical, faRes.Date_Info, faRes.Source, ).Scan(&faID) if err != nil { return nil, err } feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) faIDs = append(faIDs, faRes.Bottleneck_id) if faRes.Bottleneck_PDFs != nil { bnPdfCount := 0 for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { res, err := insertBnPdfsStmt.ExecContext( ctx, faID, bnPdfs.ProfilePdfFilename, bnPdfs.ProfilePdfURL, bnPdfs.PDF_Generation_Date, bnPdfs.Source, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { bnPdfCount += int(affected) } else { bnPdfCount++ } } feedback.Info("Add %d Pdfs", bnPdfCount) } if faRes.Effective_fairway_availability != nil { efaCount := 0 for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability 
{ los := efa.Level_of_Service fgt := efa.Forecast_generation_time if efa.Forecast_generation_time.Status == pgtype.Undefined { fgt = pgtype.Timestamp{ Status: pgtype.Null, } } res, err := insertEFAStmt.ExecContext( ctx, faID, efa.Measure_date, string(*los), efa.Available_depth_value, efa.Available_width_value, efa.Water_level_value, efa.Measure_type, efa.Source, fgt.Get(), efa.Value_lifetime, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { efaCount += int(affected) } else { efaCount++ } } feedback.Info("Add %d Effective Fairway Availability", efaCount) } if faRes.Reference_values != nil { rvCount := 0 for _, fav := range faRes.Reference_values.ReferenceValue { res, err := insertFAVStmt.ExecContext( ctx, faID, fav.Level_of_Service, fav.Fairway_depth, fav.Fairway_width, fav.Fairway_radius, fav.Shallowest_spot_Lat, fav.Shallowest_spot_Lon, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { rvCount += int(affected) } else { rvCount++ } } feedback.Info("Add %d Reference Values", rvCount) } } return faIDs, nil } // Do executes the actual fairway availability import. 
func (fa *FairwayAvailability) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func( ctx context.Context, tx *sql.Tx, bns bottlenecks, ) ([]*ifaf.FairwayAvailability, error) { latest, err := latestDate(ctx, tx) if err != nil { return nil, err } client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) period := ifaf.RequestedPeriod{ Date_start: latest.Time, Date_end: time.Now(), } ids := ifaf.ArrayOfString{String: bns} req := &ifaf.Get_bottleneck_fa{ Bottleneck_id: &ids, Period: &period, } resp, err := client.Get_bottleneck_fa(req) if err != nil { return nil, err } if resp.Get_bottleneck_faResult == nil { return nil, errors.New("no fairway availabilities found") } result := resp.Get_bottleneck_faResult return result.FairwayAvailability, nil } return storeFairwayAvailability(ctx, conn, feedback, fetch) }