Mercurial > gemma
view pkg/imports/fa.go @ 1900:6a67cd819e93
To prepare stretch import made some model data types re-usable.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Fri, 18 Jan 2019 15:04:53 +0100 |
parents | 295c82c5bc3e |
children | 25967829cf00 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Raimund Renkert <raimund.renkert@intevation.de> package imports import ( "context" "database/sql" "errors" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap/ifaf" ) // FairwayAvailability imports fairway availability data // from an IFAF SOAP service. type FairwayAvailability struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } // FAJobKind is import queue type identifier. const FAJobKind JobKind = "fa" const ( listBottlenecksSQL = ` SELECT bottleneck_id, responsible_country FROM waterway.bottlenecks WHERE responsible_country = users.current_user_country() AND staging_done = true ` latestMeasureDateSQL = ` SELECT measure_date FROM waterway.effective_fairway_availability ORDER BY measure_date DESC LIMIT 1 ` listFairwayAvailabilitySQL = ` SELECT fa.id, bn.bottleneck_id, fa.surdat FROM waterway.fairway_availability fa JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id ` insertFASQL = ` INSERT INTO waterway.fairway_availability ( position_code, bottleneck_id, surdat, critical, date_info, source_organization ) VALUES ( $1, (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2), $3, $4, $5, $6 ) RETURNING id` insertBnPdfsSQL = ` INSERT INTO waterway.bottleneck_pdfs ( fairway_availability_id, profile_pdf_filename, profile_pdf_url, pdf_generation_date, source_organization ) VALUES ( $1, $2, $3, $4, $5 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING` insertEFASQL = ` INSERT INTO waterway.effective_fairway_availability ( fairway_availability_id, measure_date, level_of_service, available_depth_value, available_width_value, water_level_value, measure_type, source_organization, forecast_generation_time, value_lifetime ) VALUES ( $1, $2, (SELECT level_of_service FROM levels_of_service WHERE name = $3), $4, $5, $6, $7, $8, $9, $10 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING` insertFAVSQL = ` INSERT INTO waterway.fa_reference_values ( fairway_availability_id, level_of_service, fairway_depth, fairway_width, fairway_radius, shallowest_spot ) VALUES ( $1, (SELECT level_of_service FROM levels_of_service WHERE name = $2), $3, $4, $5, ST_MakePoint($6, $7)::geography )ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING` ) type faJobCreator struct{} func init() { RegisterJobCreator(FAJobKind, faJobCreator{}) } func (faJobCreator) Description() string { return "fairway availability" } func (faJobCreator) Create(_ JobKind, data string) (Job, error) { fa := new(FairwayAvailability) if err := common.FromJSONString(data, fa); err != nil { return nil, err } return fa, nil } func (faJobCreator) Depends() []string { return []string{ "bottlenecks", "fairway_availability", "bottleneck_pdfs", "effective_fairway_availability", "fa_reference_values", "levels_of_service", } } func (faJobCreator) AutoAccept() bool { return true } // StageDone moves the imported fairway availablities out of the staging area. // Currently doing nothing. func (faJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp of a fairway availablities import is a NOP. func (*FairwayAvailability) CleanUp() error { return nil } // Do executes the actual fairway availability import. func (fa *FairwayAvailability) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { // Get available bottlenecks from database for use as filter in SOAP request var rows *sql.Rows rows, err := conn.QueryContext(ctx, listBottlenecksSQL) if err != nil { return nil, err } defer rows.Close() bottlenecks := []models.Bottleneck{} for rows.Next() { var bn models.Bottleneck if err = rows.Scan( &bn.ID, &bn.ResponsibleCountry, ); err != nil { return nil, err } bottlenecks = append(bottlenecks, bn) } if err = rows.Err(); err != nil { return nil, err } var faRows *sql.Rows faRows, err = conn.QueryContext(ctx, listFairwayAvailabilitySQL) if err != nil { return nil, err } fairwayAvailabilities := map[models.UniqueFairwayAvailability]int64{} for faRows.Next() { var id int64 var bnId string var sd time.Time if err = faRows.Scan( &id, &bnId, &sd, ); err != nil { return nil, err } key := models.UniqueFairwayAvailability{ BottleneckId: bnId, Surdat: sd, } fairwayAvailabilities[key] = id } if err = faRows.Err(); err != nil { return nil, err } var latestDate pgtype.Timestamp err = conn.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&latestDate) switch { case err == sql.ErrNoRows: latestDate = pgtype.Timestamp{ // Fill Database with data of the last 5 days. Change this to a more useful value. Time: time.Now().AddDate(0, 0, -5), } case err != nil: return nil, err } faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latestDate, conn, feedback) if err != nil { feedback.Error("Error processing data: %s", err) } if len(faids) == 0 { feedback.Info("No new fairway availablity data found") return nil, nil } feedback.Info("Processed %d fairway availabilities", len(faids)) // TODO: needs to be filled more useful. summary := struct { FairwayAvailabilities []string `json:"fairwayAvailabilities"` }{ FairwayAvailabilities: faids, } return &summary, err } func (fa *FairwayAvailability) doForFAs( ctx context.Context, bottlenecks []models.Bottleneck, fairwayAvailabilities map[models.UniqueFairwayAvailability]int64, latestDate pgtype.Timestamp, conn *sql.Conn, feedback Feedback, ) ([]string, error) { start := time.Now() client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) var bnIds []string for _, bn := range bottlenecks { bnIds = append(bnIds, bn.ID) } var period ifaf.RequestedPeriod period.Date_start = latestDate.Time period.Date_end = time.Now() ids := ifaf.ArrayOfString{ String: bnIds, } req := &ifaf.Get_bottleneck_fa{ Bottleneck_id: &ids, Period: &period, } resp, err := client.Get_bottleneck_fa(req) if err != nil { feedback.Error("%v", err) return nil, err } if resp.Get_bottleneck_faResult == nil { err := errors.New("no fairway availabilities found") return nil, err } result := resp.Get_bottleneck_faResult tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) if err != nil { return nil, err } defer insertFAStmt.Close() insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL) if err != nil { return nil, err } defer insertBnPdfsStmt.Close() insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL) if err != nil { return nil, err } defer insertEFAStmt.Close() insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL) if err != nil { return nil, err } defer insertFAVStmt.Close() var faIDs []string var faID int64 feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability)) for _, faRes := range result.FairwayAvailability { uniqueFa := models.UniqueFairwayAvailability{ BottleneckId: faRes.Bottleneck_id, Surdat: faRes.SURDAT, } var found bool if faID, found = fairwayAvailabilities[uniqueFa]; !found { err = insertFAStmt.QueryRowContext( ctx, faRes.POSITION, faRes.Bottleneck_id, faRes.SURDAT, faRes.Critical, faRes.Date_Info, faRes.Source, ).Scan(&faID) if err != nil { return nil, err } fairwayAvailabilities[uniqueFa] = faID } feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) faIDs = append(faIDs, faRes.Bottleneck_id) if faRes.Bottleneck_PDFs != nil { bnPdfCount := 0 for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { res, err := insertBnPdfsStmt.ExecContext( ctx, faID, bnPdfs.ProfilePdfFilename, bnPdfs.ProfilePdfURL, bnPdfs.PDF_Generation_Date, bnPdfs.Source, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { bnPdfCount += int(affected) } else { bnPdfCount++ } } feedback.Info("Add %d Pdfs", bnPdfCount) } if faRes.Effective_fairway_availability != nil { efaCount := 0 for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { los := efa.Level_of_Service fgt := efa.Forecast_generation_time if efa.Forecast_generation_time.Status == pgtype.Undefined { fgt = pgtype.Timestamp{ Status: pgtype.Null, } } res, err := insertEFAStmt.ExecContext( ctx, faID, efa.Measure_date, string(*los), efa.Available_depth_value, efa.Available_width_value, efa.Water_level_value, efa.Measure_type, efa.Source, fgt.Get(), efa.Value_lifetime, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { efaCount += int(affected) } else { efaCount++ } } feedback.Info("Add %d Effective Fairway Availability", efaCount) } if faRes.Reference_values != nil { rvCount := 0 for _, fav := range faRes.Reference_values.ReferenceValue { res, err := insertFAVStmt.ExecContext( ctx, faID, fav.Level_of_Service, fav.Fairway_depth, fav.Fairway_width, fav.Fairway_radius, fav.Shallowest_spot_Lat, fav.Shallowest_spot_Lon, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { rvCount += int(affected) } else { rvCount++ } } feedback.Info("Add %d Reference Values", rvCount) } } feedback.Info("Storing fairway availabilities took %s", time.Since(start)) if err = tx.Commit(); err == nil { feedback.Info("Import of fairway availabilities was successful") } return faIDs, nil }