Mercurial > gemma
view pkg/imports/fa.go @ 1664:819f67c31dfb
Imports: Stripped schema prefixes from table dependencies.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Sun, 23 Dec 2018 12:57:28 +0100 |
parents | 10e3dd3b9363 |
children | 4407ecaa2192 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Raimund Renkert <raimund.renkert@intevation.de> package imports import ( "context" "database/sql" "errors" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap/ifaf" ) type FairwayAvailability struct { URL string `json:"url"` Insecure bool `json:"insecure"` } const FAJobKind JobKind = "fa" const ( listBottlenecksSQL = ` SELECT bottleneck_id, responsible_country FROM waterway.bottlenecks WHERE responsible_country = users.current_user_country() AND staging_done = true ` insertFASQL = ` INSERT INTO waterway.fairway_availability ( position_code, bottleneck_id, surdat, critical, date_info, source_organization ) VALUES ( $1, (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2), $3, $4, $5, $6 ) RETURNING id` insertBnPdfsSQL = ` INSERT INTO waterway.bottleneck_pdfs ( fairway_availability_id, profile_pdf_filename, profile_pdf_url, pdf_generation_date, source_organization ) VALUES ( $1, $2, $3, $4, $5 )` insertEFASQL = ` INSERT INTO waterway.effective_fairway_availability ( fairway_availability_id, measure_date, level_of_service, available_depth_value, available_width_value, water_level_value, measure_type, source_organization, forecast_generation_time, value_lifetime ) VALUES ( $1, $2, (SELECT level_of_service FROM levels_of_service WHERE name = $3), $4, $5, $6, $7, $8, $9, $10 )` insertFAVSQL = ` INSERT INTO waterway.fa_reference_values ( fairway_availability_id, level_of_service, fairway_depth, fairway_width, fairway_radius, shallowest_spot ) VALUES ( $1, (SELECT level_of_service FROM levels_of_service WHERE name = $2), $3, $4, $5, ST_MakePoint($6, $7)::geography )` ) type faJobCreator struct{} func init() { RegisterJobCreator(FAJobKind, faJobCreator{}) } func (faJobCreator) Description() string { return "fairway availability" } func (faJobCreator) Create(_ JobKind, data string) (Job, error) { fa := new(FairwayAvailability) if err := common.FromJSONString(data, fa); err != nil { return nil, err } return fa, nil } func (faJobCreator) Depends() []string { return []string{ "bottlenecks", "fairway_availability", "bottleneck_pdfs", "effective_fairway_availability", "fa_reference_values", "levels_of_service", } } // StageDone moves the imported fairway availablities out of the staging area. // Currently doing nothing. func (faJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { return nil } // CleanUp of a fairway availablities import is a NOP. func (fa *FairwayAvailability) CleanUp() error { return nil } // Do executes the actual fairway availability import. func (fa *FairwayAvailability) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { // Get available bottlenecks from database for use as filter in SOAP request var rows *sql.Rows rows, err := conn.QueryContext(ctx, listBottlenecksSQL) if err != nil { return nil, err } defer rows.Close() bottlenecks := []models.Bottleneck{} for rows.Next() { var bn models.Bottleneck if err = rows.Scan( &bn.ID, &bn.ResponsibleCountry, ); err != nil { return nil, err } bottlenecks = append(bottlenecks, bn) } if err = rows.Err(); err != nil { return nil, err } faids, err := fa.doForFAs(ctx, bottlenecks, conn, feedback) if err != nil { feedback.Error("Error processing data: %s", err) } if len(faids) == 0 { feedback.Info("No new fairway availablity data found") return nil, nil } feedback.Info("Processed %d of %d bottlenecks", len(faids), len(bottlenecks)) // TODO: needs to be filled more useful. summary := struct { FairwayAvailabilities []string `json:"fairwayAvailabilities"` }{ FairwayAvailabilities: faids, } return &summary, err } func (fa *FairwayAvailability) doForFAs( ctx context.Context, bottlenecks []models.Bottleneck, conn *sql.Conn, feedback Feedback, ) ([]string, error) { start := time.Now() client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) var bnIds []string for _, bn := range bottlenecks { bnIds = append(bnIds, bn.ID) } ids := ifaf.ArrayOfString{ String: bnIds, } // TODO: Filter by period. Period should start after latest measurement date. req := &ifaf.Get_bottleneck_fa{ Bottleneck_id: &ids, } resp, err := client.Get_bottleneck_fa(req) if err != nil { feedback.Error("%v", err) return nil, err } if resp.Get_bottleneck_faResult == nil { err := errors.New("no fairway availabilities found") return nil, err } result := resp.Get_bottleneck_faResult tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) if err != nil { return nil, err } defer insertFAStmt.Close() insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL) if err != nil { return nil, err } defer insertBnPdfsStmt.Close() insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL) if err != nil { return nil, err } defer insertEFAStmt.Close() insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL) if err != nil { return nil, err } defer insertFAVStmt.Close() var faids []string var faId int64 feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability)) for _, faRes := range result.FairwayAvailability { // TODO: high frequent requests lead to "duplicate key value violates unique constraint "fairway_availability_bottleneck_id_surdat_key" // in the database. This has to be resolved. // All data subsets can also ocure as duplicates! err = insertFAStmt.QueryRowContext( ctx, faRes.POSITION, faRes.Bottleneck_id, faRes.SURDAT, faRes.Critical, faRes.Date_Info, faRes.Source, ).Scan(&faId) if err != nil { return nil, err } feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) faids = append(faids, faRes.Bottleneck_id) for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { _, err = insertBnPdfsStmt.ExecContext( ctx, faId, bnPdfs.ProfilePdfFilename, bnPdfs.ProfilePdfURL, bnPdfs.PDF_Generation_Date, bnPdfs.Source, ) if err != nil { return nil, err } feedback.Info("Add %d Pdfs", len(faRes.Bottleneck_PDFs.PdfInfo)) } for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { los := efa.Level_of_Service fgt := efa.Forecast_generation_time if efa.Forecast_generation_time.Status == pgtype.Undefined { fgt = pgtype.Timestamp{ Status: pgtype.Null, } } _, err = insertEFAStmt.ExecContext( ctx, faId, efa.Measure_date, string(*los), efa.Available_depth_value, efa.Available_width_value, efa.Water_level_value, efa.Measure_type, efa.Source, fgt, efa.Value_lifetime, ) if err != nil { return nil, err } feedback.Info("Add %d Effective Fairway Availability", len( faRes.Effective_fairway_availability.EffectiveFairwayAvailability)) } for _, fav := range faRes.Reference_values.ReferenceValue { _, err = insertFAVStmt.ExecContext( ctx, faId, fav.Level_of_Service, fav.Fairway_depth, fav.Fairway_width, fav.Fairway_radius, fav.Shallowest_spot_Lat, fav.Shallowest_spot_Lon, ) if err != nil { return nil, err } feedback.Info("Add %d Reference Values", len(faRes.Reference_values.ReferenceValue)) } } feedback.Info("Storing fairway availabilities took %s", time.Since(start)) if err = tx.Commit(); err == nil { feedback.Info("Import of fairway availabilities was successful") } return faids, nil }