Mercurial > gemma
view pkg/imports/fa.go @ 5093:66270586031a
Adjusted message when review enqueuing is done.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 24 Mar 2020 21:32:10 +0100 |
parents | 56c589f7435d |
children | 4bc14bea3fc9 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018,2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Sascha Wilde <wilde@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/soap/ifaf"
)

// FairwayAvailability imports fairway availability data
// from an IFAF SOAP service.
type FairwayAvailability struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure is handed through to the IFAF SOAP client.
	// Presumably true disables the validation of HTTPS
	// certificates — confirm in ifaf.NewFairwayAvailabilityService.
	Insecure bool `json:"insecure"`
}

const (
	// FAJobKind is the import queue type identifier.
	FAJobKind JobKind = "fa"
)

const (
	// maxHistoryDays is the number of days to look back.
	maxHistoryDays = 7
)

// SQL statements of the fairway availability import.
const (
	// listBottlenecksSQL finds the distinct IDs of the staged bottlenecks
	// of the current user's country whose validity overlaps the time
	// range $1. Note: ORDER BY sorts by the database collation.
	listBottlenecksSQL = ` SELECT DISTINCT bottleneck_id FROM waterway.bottlenecks WHERE responsible_country = ( SELECT country FROM users.list_users WHERE username = current_user) AND staging_done = true AND validity && $1 ORDER BY bottleneck_id `

	// latestMeasureDateSQL yields the earliest of the per-bottleneck
	// latest effective measure dates of the bottlenecks in $1. It yields
	// NULL unless every bottleneck in $1 already has fairway
	// availability data.
	latestMeasureDateSQL = ` SELECT CASE WHEN (SELECT array_agg(DISTINCT bottleneck_id) @> $1::varchar[] FROM waterway.fairway_availability) THEN (SELECT min(x.m) FROM (SELECT max(efa.measure_date) AS m FROM waterway.fairway_availability fa, waterway.effective_fairway_availability efa WHERE fa.bottleneck_id = ANY($1) AND efa.fairway_availability_id = fa.id GROUP BY fa.bottleneck_id) AS x) END `

	// insertFASQL upserts a fairway availability keyed by
	// (bottleneck_id, surdat) and returns its id.
	insertFASQL = ` INSERT INTO waterway.fairway_availability ( position, bottleneck_id, surdat, critical, date_info, source_organization ) VALUES ( $1, $2, $3, $4, $5, $6 ) ON CONFLICT (bottleneck_id, surdat) DO UPDATE SET position = EXCLUDED.position, critical = EXCLUDED.critical, date_info = EXCLUDED.date_info, source_organization = EXCLUDED.source_organization RETURNING id`

	// insertBnPdfsSQL inserts a bottleneck PDF reference; rows that
	// collide with the primary key are silently skipped.
	insertBnPdfsSQL = ` INSERT INTO waterway.bottleneck_pdfs ( fairway_availability_id, profile_pdf_filename, profile_pdf_url, pdf_generation_date, source_organization ) VALUES ( $1, $2, $3, $4, $5 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`

	// insertEFASQL inserts an effective fairway availability. The level
	// of service is looked up by its name ($3); primary key collisions
	// are silently skipped.
	insertEFASQL = ` INSERT INTO waterway.effective_fairway_availability ( fairway_availability_id, measure_date, level_of_service, available_depth_value, available_width_value, water_level_value, measure_type, source_organization, forecast_generation_time, value_lifetime ) VALUES ( $1, $2, (SELECT level_of_service FROM levels_of_service WHERE name = $3), $4, $5, $6, $7, $8, $9, $10 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`

	// insertFAVSQL inserts a reference value. The level of service is
	// looked up by its name ($2), the shallowest spot is built from
	// ST_MakePoint($6, $7). Primary key collisions are silently skipped.
	// NOTE(review): PostGIS ST_MakePoint takes (x, y) = (lon, lat);
	// check the argument order used by the callers.
	insertFAVSQL = ` INSERT INTO waterway.fa_reference_values ( fairway_availability_id, level_of_service, fairway_depth, fairway_width, fairway_radius, shallowest_spot )
VALUES ( $1, (SELECT level_of_service FROM levels_of_service WHERE name = $2), $3, $4, $5, ST_MakePoint($6, $7)::geography )ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING`
)

// Description gives a short info about relevant facts of this import.
// For fairway availability imports this is the URL of the SOAP service.
func (fa *FairwayAvailability) Description() (string, error) {
	return fa.URL, nil
}

// faJobCreator creates fairway availability import jobs.
type faJobCreator struct{}

// init registers the fairway availability job creator under FAJobKind.
func init() {
	RegisterJobCreator(FAJobKind, faJobCreator{})
}

// Description returns the human readable name of this import kind.
func (faJobCreator) Description() string {
	return "fairway availability"
}

// Create returns a new, empty fairway availability import job.
func (faJobCreator) Create() Job {
	return new(FairwayAvailability)
}

// Depends lists the database tables this import touches: the first
// list holds the tables it inserts into, the second the tables it only
// reads from. Presumably used to schedule conflicting imports — confirm
// against the JobCreator documentation.
func (faJobCreator) Depends() [2][]string {
	return [2][]string{
		{"effective_fairway_availability", "fa_reference_values", "bottleneck_pdfs", "fairway_availability"},
		{"bottlenecks", "levels_of_service"},
	}
}

// AutoAccept reports that fairway availability imports are accepted
// automatically.
func (faJobCreator) AutoAccept() bool {
	return true
}

// StageDone is a NOP for fairway availability imports.
func (faJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error {
	return nil
}

// CleanUp of a fairway availabilities import is a NOP.
func (*FairwayAvailability) CleanUp() error { return nil } type bottlenecks []string func (bns bottlenecks) contains(bn string) bool { idx := sort.SearchStrings(bns, bn) return idx < len(bns) && bns[idx] == bn } func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) { // Get available bottlenecks from database for use as filter in SOAP request // We are only interested in bottlenecks which were valid in the last // maxHistoryDays var tfrom pgtype.Timestamptz tfrom.Set(time.Now().AddDate(0, 0, -maxHistoryDays)) trange := pgtype.Tstzrange{ Lower: tfrom, LowerType: pgtype.Inclusive, UpperType: pgtype.Unbounded, Status: pgtype.Present, } rows, err := tx.QueryContext(ctx, listBottlenecksSQL, trange) if err != nil { return nil, err } defer rows.Close() var bns bottlenecks for rows.Next() { var bn string if err = rows.Scan(&bn); err != nil { return nil, err } bns = append(bns, bn) } if err = rows.Err(); err != nil { return nil, err } return bns, nil } // Get the earliest of all the latest measure_dates for a list of bottlenecks. // We do not pick the latest of all dates, so that we don't miss data if one of // the bottlenecks we are taking into account was not active for some time... func latestDate( ctx context.Context, tx *sql.Tx, bns bottlenecks, ) (pgtype.Timestamp, error) { var ( date pgtype.Timestamp pgbns pgtype.TextArray ) pgbns.Set(bns) err := tx.QueryRowContext(ctx, latestMeasureDateSQL, &pgbns).Scan(&date) switch { case err == sql.ErrNoRows: date = pgtype.Timestamp{ Time: time.Now().AddDate(0, 0, -maxHistoryDays), } case err != nil: return pgtype.Timestamp{}, err } // Limit request range to MaxHistoryDays. Otherwise the Request might // fail, e.g. the AT service has an upper liimit of 10000 results. // // FIXME: the better solution would be to detect such errors and // dynamically and implement some kind of chunking... 
if time.Since(date.Time).Hours() > maxHistoryDays*24 { date = pgtype.Timestamp{ Time: time.Now().AddDate(0, 0, -maxHistoryDays), } } return date, nil } func storeFairwayAvailability( ctx context.Context, conn *sql.Conn, feedback Feedback, fetch func(context.Context, *sql.Tx, bottlenecks) ([]*ifaf.FairwayAvailability, error), ) (interface{}, error) { start := time.Now() tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() bns, err := loadBottleneckCountries(ctx, tx) if err != nil { return nil, err } fas, err := fetch(ctx, tx, bns) if err != nil { return nil, err } faids, err := doForFAs(ctx, bns, fas, tx, feedback) if err != nil { return nil, fmt.Errorf("Error processing data: %v", err) } if len(faids) == 0 { feedback.Info("No new fairway availablity data found") return nil, UnchangedError("No new fairway availablity data found") } feedback.Info("Processed %d fairway availabilities", len(faids)) if err = tx.Commit(); err != nil { feedback.Info( "Importing fairway availabilities failed after %s", time.Since(start)) return nil, err } feedback.Info( "Importing fairway availabilities successfully took %s", time.Since(start)) // TODO: needs to be filled more useful. summary := struct { FairwayAvailabilities []string `json:"fairwayAvailabilities"` }{ FairwayAvailabilities: faids, } return &summary, nil } // defaultLOS defaults to LOS3 when no expicit LOS is given. 
func defaultLOS(los *ifaf.LosEnum) ifaf.LosEnum { if los == nil { return ifaf.LosEnumLOS3 } return *los } func doForFAs( ctx context.Context, bnIds bottlenecks, fas []*ifaf.FairwayAvailability, tx *sql.Tx, feedback Feedback, ) ([]string, error) { insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) if err != nil { return nil, err } defer insertFAStmt.Close() insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL) if err != nil { return nil, err } defer insertBnPdfsStmt.Close() insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL) if err != nil { return nil, err } defer insertEFAStmt.Close() insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL) if err != nil { return nil, err } defer insertFAVStmt.Close() var faIDs []string var faID int64 feedback.Info("Found %d fairway availabilities", len(fas)) for _, faRes := range fas { // FIXME: The following test is propably unneccessary as already // done by DB constraints... [sw] if !bnIds.contains(faRes.Bottleneck_id) { feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id) continue } err = insertFAStmt.QueryRowContext( ctx, faRes.POSITION, faRes.Bottleneck_id, faRes.SURDAT, faRes.Critical, faRes.Date_Info, faRes.Source, ).Scan(&faID) if err != nil { return nil, err } feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) faIDs = append(faIDs, faRes.Bottleneck_id) if faRes.Bottleneck_PDFs != nil { bnPdfCount := 0 for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { res, err := insertBnPdfsStmt.ExecContext( ctx, faID, bnPdfs.ProfilePdfFilename, bnPdfs.ProfilePdfURL, bnPdfs.PDF_Generation_Date, bnPdfs.Source, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { bnPdfCount += int(affected) } else { bnPdfCount++ } } feedback.Info("Add %d Pdfs", bnPdfCount) } if faRes.Effective_fairway_availability != nil { efaCount := 0 for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { fgt := 
efa.Forecast_generation_time if efa.Forecast_generation_time.Status == pgtype.Undefined { fgt = pgtype.Timestamp{ Status: pgtype.Null, } } res, err := insertEFAStmt.ExecContext( ctx, faID, efa.Measure_date, string(defaultLOS(efa.Level_of_Service)), efa.Available_depth_value, efa.Available_width_value, efa.Water_level_value, efa.Measure_type, efa.Source, fgt.Get(), efa.Value_lifetime, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { efaCount += int(affected) } else { efaCount++ } } feedback.Info("Add %d Effective Fairway Availability", efaCount) } if faRes.Reference_values != nil { rvCount := 0 for _, fav := range faRes.Reference_values.ReferenceValue { res, err := insertFAVStmt.ExecContext( ctx, faID, string(defaultLOS(fav.Level_of_Service)), fav.Fairway_depth, fav.Fairway_width, fav.Fairway_radius, fav.Shallowest_spot_Lat, fav.Shallowest_spot_Lon, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { rvCount += int(affected) } else { rvCount++ } } feedback.Info("Add %d Reference Values", rvCount) } } return faIDs, nil } // Do executes the actual fairway availability import. 
func (fa *FairwayAvailability) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func( ctx context.Context, tx *sql.Tx, bns bottlenecks, ) ([]*ifaf.FairwayAvailability, error) { feedback.Info("Requesting data for: %v", bns) latest, err := latestDate(ctx, tx, bns) if err != nil { return nil, err } feedback.Info("Requesting data starting from %s", latest.Time) client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) period := ifaf.RequestedPeriod{ Date_start: latest.Time, Date_end: time.Now(), } ids := ifaf.ArrayOfString{String: bns} req := &ifaf.Get_bottleneck_fa{ Bottleneck_id: &ids, Period: &period, } resp, err := client.Get_bottleneck_fa(req) if err != nil { return nil, err } if resp.Get_bottleneck_faResult == nil { return nil, errors.New("no fairway availabilities found") } result := resp.Get_bottleneck_faResult return result.FairwayAvailability, nil } return storeFairwayAvailability(ctx, conn, feedback, fetch) }