Mercurial > gemma
view pkg/imports/fa.go @ 3326:98ce6d101e01
available_fairway_depth: omit unit
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 20 May 2019 12:32:10 +0200 |
parents | 4acbee65275d |
children | 02951a62e8c6 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Raimund Renkert <raimund.renkert@intevation.de>
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"sort"
	"time"

	"github.com/jackc/pgx/pgtype"

	"gemma.intevation.de/gemma/pkg/soap/ifaf"
)

// FairwayAvailability imports fairway availability data
// from an IFAF SOAP service.
type FairwayAvailability struct {
	// URL is the URL of the SOAP service.
	URL string `json:"url"`
	// Insecure indicates if HTTPS traffic
	// should validate certificates or not.
	Insecure bool `json:"insecure"`
}

// uniqueFairwayAvailability is the comparable key under which already
// stored fairway availabilities are looked up: the external bottleneck
// identifier together with the Surdat timestamp of the data set
// (presumably the survey date -- confirm against the IFAF schema).
type uniqueFairwayAvailability struct {
	BottleneckId string
	Surdat       time.Time
}

// FAJobKind is import queue type identifier.
const FAJobKind JobKind = "fa" const ( listBottlenecksSQL = ` SELECT bottleneck_id FROM waterway.bottlenecks WHERE responsible_country = users.current_user_country() AND staging_done = true ORDER BY bottleneck_id ` latestMeasureDateSQL = ` SELECT measure_date FROM waterway.effective_fairway_availability ORDER BY measure_date DESC LIMIT 1 ` listFairwayAvailabilitySQL = ` SELECT fa.id, bn.bottleneck_id, fa.surdat FROM waterway.fairway_availability fa JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id ` insertFASQL = ` INSERT INTO waterway.fairway_availability ( position_code, bottleneck_id, surdat, critical, date_info, source_organization ) VALUES ( $1, (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2), $3, $4, $5, $6 ) RETURNING id` insertBnPdfsSQL = ` INSERT INTO waterway.bottleneck_pdfs ( fairway_availability_id, profile_pdf_filename, profile_pdf_url, pdf_generation_date, source_organization ) VALUES ( $1, $2, $3, $4, $5 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING` insertEFASQL = ` INSERT INTO waterway.effective_fairway_availability ( fairway_availability_id, measure_date, level_of_service, available_depth_value, available_width_value, water_level_value, measure_type, source_organization, forecast_generation_time, value_lifetime ) VALUES ( $1, $2, (SELECT level_of_service FROM levels_of_service WHERE name = $3), $4, $5, $6, $7, $8, $9, $10 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING` insertFAVSQL = ` INSERT INTO waterway.fa_reference_values ( fairway_availability_id, level_of_service, fairway_depth, fairway_width, fairway_radius, shallowest_spot ) VALUES ( $1, (SELECT level_of_service FROM levels_of_service WHERE name = $2), $3, $4, $5, ST_MakePoint($6, $7)::geography )ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING` ) type faJobCreator struct{} func init() { RegisterJobCreator(FAJobKind, faJobCreator{}) } func (faJobCreator) Description() string { return "fairway availability" } func 
(faJobCreator) Create() Job { return new(FairwayAvailability) } func (faJobCreator) Depends() [2][]string { return [2][]string{ {"effective_fairway_availability", "fa_reference_values", "bottleneck_pdfs", "fairway_availability"}, {"bottlenecks", "levels_of_service"}, } } func (faJobCreator) AutoAccept() bool { return true } // StageDone moves the imported fairway availablities out of the staging area. // Currently doing nothing. func (faJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp of a fairway availablities import is a NOP. func (*FairwayAvailability) CleanUp() error { return nil } type bottlenecks []string func (bns bottlenecks) contains(bn string) bool { idx := sort.SearchStrings(bns, bn) return idx < len(bns) && bns[idx] == bn } func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) { // Get available bottlenecks from database for use as filter in SOAP request rows, err := tx.QueryContext(ctx, listBottlenecksSQL) if err != nil { return nil, err } defer rows.Close() var bns bottlenecks for rows.Next() { var bn string if err = rows.Scan(&bn); err != nil { return nil, err } bns = append(bns, bn) } if err = rows.Err(); err != nil { return nil, err } return bns, nil } func loadFairwayAvailabilities(ctx context.Context, tx *sql.Tx) (map[uniqueFairwayAvailability]int64, error) { rows, err := tx.QueryContext(ctx, listFairwayAvailabilitySQL) if err != nil { return nil, err } defer rows.Close() fairwayAvailabilities := map[uniqueFairwayAvailability]int64{} for rows.Next() { var id int64 var bnId string var sd time.Time if err = rows.Scan( &id, &bnId, &sd, ); err != nil { return nil, err } key := uniqueFairwayAvailability{ BottleneckId: bnId, Surdat: sd, } fairwayAvailabilities[key] = id } if err = rows.Err(); err != nil { return nil, err } return fairwayAvailabilities, nil } func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) { var date pgtype.Timestamp err := 
tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date) switch { case err == sql.ErrNoRows: date = pgtype.Timestamp{ // Fill Database with data of the last 5 days. Change this to a more useful value. Time: time.Now().AddDate(0, 0, -5), } case err != nil: return pgtype.Timestamp{}, err } return date, nil } func storeFairwayAvailability( ctx context.Context, conn *sql.Conn, feedback Feedback, fetch func(context.Context, *sql.Tx, bottlenecks) ([]*ifaf.FairwayAvailability, error), ) (interface{}, error) { start := time.Now() tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() bns, err := loadBottleneckCountries(ctx, tx) if err != nil { return nil, err } fas, err := fetch(ctx, tx, bns) if err != nil { return nil, err } fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, tx) if err != nil { return nil, err } faids, err := doForFAs(ctx, bns, fairwayAvailabilities, fas, tx, feedback) if err != nil { return nil, fmt.Errorf("Error processing data: %v", err) } if len(faids) == 0 { feedback.Info("No new fairway availablity data found") return nil, UnchangedError("No new fairway availablity data found") } feedback.Info("Processed %d fairway availabilities", len(faids)) if err = tx.Commit(); err != nil { feedback.Info( "Importing fairway availabilities failed after %s", time.Since(start)) return nil, err } feedback.Info( "Importing fairway availabilities successfully took %s", time.Since(start)) // TODO: needs to be filled more useful. 
summary := struct { FairwayAvailabilities []string `json:"fairwayAvailabilities"` }{ FairwayAvailabilities: faids, } return &summary, nil } func doForFAs( ctx context.Context, bnIds bottlenecks, fairwayAvailabilities map[uniqueFairwayAvailability]int64, fas []*ifaf.FairwayAvailability, tx *sql.Tx, feedback Feedback, ) ([]string, error) { insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) if err != nil { return nil, err } defer insertFAStmt.Close() insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL) if err != nil { return nil, err } defer insertBnPdfsStmt.Close() insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL) if err != nil { return nil, err } defer insertEFAStmt.Close() insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL) if err != nil { return nil, err } defer insertFAVStmt.Close() var faIDs []string var faID int64 feedback.Info("Found %d fairway availabilities", len(fas)) for _, faRes := range fas { if !bnIds.contains(faRes.Bottleneck_id) { feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id) continue } uniqueFa := uniqueFairwayAvailability{ BottleneckId: faRes.Bottleneck_id, Surdat: faRes.SURDAT, } var found bool if faID, found = fairwayAvailabilities[uniqueFa]; !found { err = insertFAStmt.QueryRowContext( ctx, faRes.POSITION, faRes.Bottleneck_id, faRes.SURDAT, faRes.Critical, faRes.Date_Info, faRes.Source, ).Scan(&faID) if err != nil { return nil, err } fairwayAvailabilities[uniqueFa] = faID } feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) faIDs = append(faIDs, faRes.Bottleneck_id) if faRes.Bottleneck_PDFs != nil { bnPdfCount := 0 for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { res, err := insertBnPdfsStmt.ExecContext( ctx, faID, bnPdfs.ProfilePdfFilename, bnPdfs.ProfilePdfURL, bnPdfs.PDF_Generation_Date, bnPdfs.Source, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { bnPdfCount += int(affected) } else { bnPdfCount++ } } feedback.Info("Add 
%d Pdfs", bnPdfCount) } if faRes.Effective_fairway_availability != nil { efaCount := 0 for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { los := efa.Level_of_Service fgt := efa.Forecast_generation_time if efa.Forecast_generation_time.Status == pgtype.Undefined { fgt = pgtype.Timestamp{ Status: pgtype.Null, } } res, err := insertEFAStmt.ExecContext( ctx, faID, efa.Measure_date, string(*los), efa.Available_depth_value, efa.Available_width_value, efa.Water_level_value, efa.Measure_type, efa.Source, fgt.Get(), efa.Value_lifetime, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { efaCount += int(affected) } else { efaCount++ } } feedback.Info("Add %d Effective Fairway Availability", efaCount) } if faRes.Reference_values != nil { rvCount := 0 for _, fav := range faRes.Reference_values.ReferenceValue { res, err := insertFAVStmt.ExecContext( ctx, faID, fav.Level_of_Service, fav.Fairway_depth, fav.Fairway_width, fav.Fairway_radius, fav.Shallowest_spot_Lat, fav.Shallowest_spot_Lon, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { rvCount += int(affected) } else { rvCount++ } } feedback.Info("Add %d Reference Values", rvCount) } } return faIDs, nil } // Do executes the actual fairway availability import. 
func (fa *FairwayAvailability) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func( ctx context.Context, tx *sql.Tx, bns bottlenecks, ) ([]*ifaf.FairwayAvailability, error) { latest, err := latestDate(ctx, tx) if err != nil { return nil, err } client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) period := ifaf.RequestedPeriod{ Date_start: latest.Time, Date_end: time.Now(), } ids := ifaf.ArrayOfString{String: bns} req := &ifaf.Get_bottleneck_fa{ Bottleneck_id: &ids, Period: &period, } resp, err := client.Get_bottleneck_fa(req) if err != nil { return nil, err } if resp.Get_bottleneck_faResult == nil { return nil, errors.New("no fairway availabilities found") } result := resp.Get_bottleneck_faResult return result.FairwayAvailability, nil } return storeFairwayAvailability(ctx, conn, feedback, fetch) }