Mercurial > gemma
view pkg/imports/fa.go @ 2624:9dbaf69c7a66
Improve geoserver config to better calculate bounding boxes
* Disable the use of estimated extents for the postgis storage
configuration for geoserver, which is set via the gemma middleware.
This way we are able to get better bounding boxes for many layers
where the postgis function `ST_EstimatedExtent()` would be far off.
author | Bernhard Reiter <bernhard@intevation.de> |
---|---|
date | Wed, 13 Mar 2019 16:18:39 +0100 |
parents | cfc523c70e90 |
children | 4acbee65275d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Raimund Renkert <raimund.renkert@intevation.de> // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "errors" "fmt" "sort" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/soap/ifaf" ) // FairwayAvailability imports fairway availability data // from an IFAF SOAP service. type FairwayAvailability struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } type uniqueFairwayAvailability struct { BottleneckId string Surdat time.Time } // FAJobKind is import queue type identifier. const FAJobKind JobKind = "fa" const ( listBottlenecksSQL = ` SELECT bottleneck_id FROM waterway.bottlenecks WHERE responsible_country = users.current_user_country() AND staging_done = true ORDER BY bottleneck_id ` latestMeasureDateSQL = ` SELECT measure_date FROM waterway.effective_fairway_availability ORDER BY measure_date DESC LIMIT 1 ` listFairwayAvailabilitySQL = ` SELECT fa.id, bn.bottleneck_id, fa.surdat FROM waterway.fairway_availability fa JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id ` insertFASQL = ` INSERT INTO waterway.fairway_availability ( position_code, bottleneck_id, surdat, critical, date_info, source_organization ) VALUES ( $1, (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2), $3, $4, $5, $6 ) RETURNING id` insertBnPdfsSQL = ` INSERT INTO waterway.bottleneck_pdfs ( fairway_availability_id, profile_pdf_filename, profile_pdf_url, pdf_generation_date, source_organization ) VALUES ( $1, $2, $3, $4, $5 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING` insertEFASQL = ` INSERT INTO waterway.effective_fairway_availability ( fairway_availability_id, measure_date, level_of_service, available_depth_value, available_width_value, water_level_value, measure_type, source_organization, forecast_generation_time, value_lifetime ) VALUES ( $1, $2, (SELECT level_of_service FROM levels_of_service WHERE name = $3), $4, $5, $6, $7, $8, $9, $10 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING` insertFAVSQL = ` INSERT INTO waterway.fa_reference_values ( fairway_availability_id, level_of_service, fairway_depth, fairway_width, fairway_radius, shallowest_spot ) VALUES ( $1, (SELECT level_of_service FROM levels_of_service WHERE name = $2), $3, $4, $5, ST_MakePoint($6, $7)::geography )ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING` ) type faJobCreator struct{} func init() { RegisterJobCreator(FAJobKind, faJobCreator{}) } func (faJobCreator) Description() string { return "fairway availability" } func (faJobCreator) Create() Job { return new(FairwayAvailability) } func (faJobCreator) Depends() []string { return []string{ "bottlenecks", "fairway_availability", "bottleneck_pdfs", "effective_fairway_availability", "fa_reference_values", "levels_of_service", } } func (faJobCreator) AutoAccept() bool { return true } // StageDone moves the imported fairway availablities out of the staging area. // Currently doing nothing. func (faJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp of a fairway availablities import is a NOP. func (*FairwayAvailability) CleanUp() error { return nil } type bottlenecks []string func (bns bottlenecks) contains(bn string) bool { idx := sort.SearchStrings(bns, bn) return idx < len(bns) && bns[idx] == bn } func loadBottleneckCountries(ctx context.Context, tx *sql.Tx) (bottlenecks, error) { // Get available bottlenecks from database for use as filter in SOAP request rows, err := tx.QueryContext(ctx, listBottlenecksSQL) if err != nil { return nil, err } defer rows.Close() var bns bottlenecks for rows.Next() { var bn string if err = rows.Scan(&bn); err != nil { return nil, err } bns = append(bns, bn) } if err = rows.Err(); err != nil { return nil, err } return bns, nil } func loadFairwayAvailabilities(ctx context.Context, tx *sql.Tx) (map[uniqueFairwayAvailability]int64, error) { rows, err := tx.QueryContext(ctx, listFairwayAvailabilitySQL) if err != nil { return nil, err } defer rows.Close() fairwayAvailabilities := map[uniqueFairwayAvailability]int64{} for rows.Next() { var id int64 var bnId string var sd time.Time if err = rows.Scan( &id, &bnId, &sd, ); err != nil { return nil, err } key := uniqueFairwayAvailability{ BottleneckId: bnId, Surdat: sd, } fairwayAvailabilities[key] = id } if err = rows.Err(); err != nil { return nil, err } return fairwayAvailabilities, nil } func latestDate(ctx context.Context, tx *sql.Tx) (pgtype.Timestamp, error) { var date pgtype.Timestamp err := tx.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&date) switch { case err == sql.ErrNoRows: date = pgtype.Timestamp{ // Fill Database with data of the last 5 days. Change this to a more useful value. Time: time.Now().AddDate(0, 0, -5), } case err != nil: return pgtype.Timestamp{}, err } return date, nil } func storeFairwayAvailability( ctx context.Context, conn *sql.Conn, feedback Feedback, fetch func(context.Context, *sql.Tx, bottlenecks) ([]*ifaf.FairwayAvailability, error), ) (interface{}, error) { start := time.Now() tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() bns, err := loadBottleneckCountries(ctx, tx) if err != nil { return nil, err } fas, err := fetch(ctx, tx, bns) if err != nil { return nil, err } fairwayAvailabilities, err := loadFairwayAvailabilities(ctx, tx) if err != nil { return nil, err } faids, err := doForFAs(ctx, bns, fairwayAvailabilities, fas, tx, feedback) if err != nil { return nil, fmt.Errorf("Error processing data: %v", err) } if len(faids) == 0 { feedback.Info("No new fairway availablity data found") return nil, UnchangedError("No new fairway availablity data found") } feedback.Info("Processed %d fairway availabilities", len(faids)) if err = tx.Commit(); err != nil { feedback.Info( "Importing fairway availabilities failed after %s", time.Since(start)) return nil, err } feedback.Info( "Importing fairway availabilities successfully took %s", time.Since(start)) // TODO: needs to be filled more useful. summary := struct { FairwayAvailabilities []string `json:"fairwayAvailabilities"` }{ FairwayAvailabilities: faids, } return &summary, nil } func doForFAs( ctx context.Context, bnIds bottlenecks, fairwayAvailabilities map[uniqueFairwayAvailability]int64, fas []*ifaf.FairwayAvailability, tx *sql.Tx, feedback Feedback, ) ([]string, error) { insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) if err != nil { return nil, err } defer insertFAStmt.Close() insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL) if err != nil { return nil, err } defer insertBnPdfsStmt.Close() insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL) if err != nil { return nil, err } defer insertEFAStmt.Close() insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL) if err != nil { return nil, err } defer insertFAVStmt.Close() var faIDs []string var faID int64 feedback.Info("Found %d fairway availabilities", len(fas)) for _, faRes := range fas { if !bnIds.contains(faRes.Bottleneck_id) { feedback.Warn("Bottleneck %s not found in database.", faRes.Bottleneck_id) continue } uniqueFa := uniqueFairwayAvailability{ BottleneckId: faRes.Bottleneck_id, Surdat: faRes.SURDAT, } var found bool if faID, found = fairwayAvailabilities[uniqueFa]; !found { err = insertFAStmt.QueryRowContext( ctx, faRes.POSITION, faRes.Bottleneck_id, faRes.SURDAT, faRes.Critical, faRes.Date_Info, faRes.Source, ).Scan(&faID) if err != nil { return nil, err } fairwayAvailabilities[uniqueFa] = faID } feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) faIDs = append(faIDs, faRes.Bottleneck_id) if faRes.Bottleneck_PDFs != nil { bnPdfCount := 0 for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { res, err := insertBnPdfsStmt.ExecContext( ctx, faID, bnPdfs.ProfilePdfFilename, bnPdfs.ProfilePdfURL, bnPdfs.PDF_Generation_Date, bnPdfs.Source, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { bnPdfCount += int(affected) } else { bnPdfCount++ } } feedback.Info("Add %d Pdfs", bnPdfCount) } if faRes.Effective_fairway_availability != nil { efaCount := 0 for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { los := efa.Level_of_Service fgt := efa.Forecast_generation_time if efa.Forecast_generation_time.Status == pgtype.Undefined { fgt = pgtype.Timestamp{ Status: pgtype.Null, } } res, err := insertEFAStmt.ExecContext( ctx, faID, efa.Measure_date, string(*los), efa.Available_depth_value, efa.Available_width_value, efa.Water_level_value, efa.Measure_type, efa.Source, fgt.Get(), efa.Value_lifetime, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { efaCount += int(affected) } else { efaCount++ } } feedback.Info("Add %d Effective Fairway Availability", efaCount) } if faRes.Reference_values != nil { rvCount := 0 for _, fav := range faRes.Reference_values.ReferenceValue { res, err := insertFAVStmt.ExecContext( ctx, faID, fav.Level_of_Service, fav.Fairway_depth, fav.Fairway_width, fav.Fairway_radius, fav.Shallowest_spot_Lat, fav.Shallowest_spot_Lon, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { rvCount += int(affected) } else { rvCount++ } } feedback.Info("Add %d Reference Values", rvCount) } } return faIDs, nil } // Do executes the actual fairway availability import. func (fa *FairwayAvailability) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { fetch := func( ctx context.Context, tx *sql.Tx, bns bottlenecks, ) ([]*ifaf.FairwayAvailability, error) { latest, err := latestDate(ctx, tx) if err != nil { return nil, err } client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) period := ifaf.RequestedPeriod{ Date_start: latest.Time, Date_end: time.Now(), } ids := ifaf.ArrayOfString{String: bns} req := &ifaf.Get_bottleneck_fa{ Bottleneck_id: &ids, Period: &period, } resp, err := client.Get_bottleneck_fa(req) if err != nil { return nil, err } if resp.Get_bottleneck_faResult == nil { return nil, errors.New("no fairway availabilities found") } result := resp.Get_bottleneck_faResult return result.FairwayAvailability, nil } return storeFairwayAvailability(ctx, conn, feedback, fetch) }