Mercurial > gemma
view pkg/imports/fa.go @ 2006:35acb7f9ae0c
Do anything else before expectedly failing role creation
Creating roles during database setup is expected to fail if there is
already another gemma database in the cluster. Doing it at the end
of the transaction ensures that this failure does not hide errors in
other commands in the script.
In passing, add the default admin via the designated view to ensure it
will become a correctly set up application user.
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 24 Jan 2019 17:23:43 +0100 |
parents | 295c82c5bc3e |
children | 25967829cf00 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Raimund Renkert <raimund.renkert@intevation.de> package imports import ( "context" "database/sql" "errors" "time" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/soap/ifaf" ) // FairwayAvailability imports fairway availability data // from an IFAF SOAP service. type FairwayAvailability struct { // URL is the URL of the SOAP service. URL string `json:"url"` // Insecure indicates if HTTPS traffic // should validate certificates or not. Insecure bool `json:"insecure"` } // FAJobKind is import queue type identifier. const FAJobKind JobKind = "fa" const ( listBottlenecksSQL = ` SELECT bottleneck_id, responsible_country FROM waterway.bottlenecks WHERE responsible_country = users.current_user_country() AND staging_done = true ` latestMeasureDateSQL = ` SELECT measure_date FROM waterway.effective_fairway_availability ORDER BY measure_date DESC LIMIT 1 ` listFairwayAvailabilitySQL = ` SELECT fa.id, bn.bottleneck_id, fa.surdat FROM waterway.fairway_availability fa JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id ` insertFASQL = ` INSERT INTO waterway.fairway_availability ( position_code, bottleneck_id, surdat, critical, date_info, source_organization ) VALUES ( $1, (SELECT id FROM waterway.bottlenecks WHERE bottleneck_id = $2), $3, $4, $5, $6 ) RETURNING id` insertBnPdfsSQL = ` INSERT INTO waterway.bottleneck_pdfs ( fairway_availability_id, profile_pdf_filename, profile_pdf_url, pdf_generation_date, source_organization ) VALUES ( $1, $2, $3, $4, $5 ) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO 
NOTHING` insertEFASQL = ` INSERT INTO waterway.effective_fairway_availability ( fairway_availability_id, measure_date, level_of_service, available_depth_value, available_width_value, water_level_value, measure_type, source_organization, forecast_generation_time, value_lifetime ) VALUES ( $1, $2, (SELECT level_of_service FROM levels_of_service WHERE name = $3), $4, $5, $6, $7, $8, $9, $10 ) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING` insertFAVSQL = ` INSERT INTO waterway.fa_reference_values ( fairway_availability_id, level_of_service, fairway_depth, fairway_width, fairway_radius, shallowest_spot ) VALUES ( $1, (SELECT level_of_service FROM levels_of_service WHERE name = $2), $3, $4, $5, ST_MakePoint($6, $7)::geography )ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING` ) type faJobCreator struct{} func init() { RegisterJobCreator(FAJobKind, faJobCreator{}) } func (faJobCreator) Description() string { return "fairway availability" } func (faJobCreator) Create(_ JobKind, data string) (Job, error) { fa := new(FairwayAvailability) if err := common.FromJSONString(data, fa); err != nil { return nil, err } return fa, nil } func (faJobCreator) Depends() []string { return []string{ "bottlenecks", "fairway_availability", "bottleneck_pdfs", "effective_fairway_availability", "fa_reference_values", "levels_of_service", } } func (faJobCreator) AutoAccept() bool { return true } // StageDone moves the imported fairway availablities out of the staging area. // Currently doing nothing. func (faJobCreator) StageDone(context.Context, *sql.Tx, int64) error { return nil } // CleanUp of a fairway availablities import is a NOP. func (*FairwayAvailability) CleanUp() error { return nil } // Do executes the actual fairway availability import. 
func (fa *FairwayAvailability) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { // Get available bottlenecks from database for use as filter in SOAP request var rows *sql.Rows rows, err := conn.QueryContext(ctx, listBottlenecksSQL) if err != nil { return nil, err } defer rows.Close() bottlenecks := []models.Bottleneck{} for rows.Next() { var bn models.Bottleneck if err = rows.Scan( &bn.ID, &bn.ResponsibleCountry, ); err != nil { return nil, err } bottlenecks = append(bottlenecks, bn) } if err = rows.Err(); err != nil { return nil, err } var faRows *sql.Rows faRows, err = conn.QueryContext(ctx, listFairwayAvailabilitySQL) if err != nil { return nil, err } fairwayAvailabilities := map[models.UniqueFairwayAvailability]int64{} for faRows.Next() { var id int64 var bnId string var sd time.Time if err = faRows.Scan( &id, &bnId, &sd, ); err != nil { return nil, err } key := models.UniqueFairwayAvailability{ BottleneckId: bnId, Surdat: sd, } fairwayAvailabilities[key] = id } if err = faRows.Err(); err != nil { return nil, err } var latestDate pgtype.Timestamp err = conn.QueryRowContext(ctx, latestMeasureDateSQL).Scan(&latestDate) switch { case err == sql.ErrNoRows: latestDate = pgtype.Timestamp{ // Fill Database with data of the last 5 days. Change this to a more useful value. Time: time.Now().AddDate(0, 0, -5), } case err != nil: return nil, err } faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latestDate, conn, feedback) if err != nil { feedback.Error("Error processing data: %s", err) } if len(faids) == 0 { feedback.Info("No new fairway availablity data found") return nil, nil } feedback.Info("Processed %d fairway availabilities", len(faids)) // TODO: needs to be filled more useful. 
summary := struct { FairwayAvailabilities []string `json:"fairwayAvailabilities"` }{ FairwayAvailabilities: faids, } return &summary, err } func (fa *FairwayAvailability) doForFAs( ctx context.Context, bottlenecks []models.Bottleneck, fairwayAvailabilities map[models.UniqueFairwayAvailability]int64, latestDate pgtype.Timestamp, conn *sql.Conn, feedback Feedback, ) ([]string, error) { start := time.Now() client := ifaf.NewFairwayAvailabilityService(fa.URL, fa.Insecure, nil) var bnIds []string for _, bn := range bottlenecks { bnIds = append(bnIds, bn.ID) } var period ifaf.RequestedPeriod period.Date_start = latestDate.Time period.Date_end = time.Now() ids := ifaf.ArrayOfString{ String: bnIds, } req := &ifaf.Get_bottleneck_fa{ Bottleneck_id: &ids, Period: &period, } resp, err := client.Get_bottleneck_fa(req) if err != nil { feedback.Error("%v", err) return nil, err } if resp.Get_bottleneck_faResult == nil { err := errors.New("no fairway availabilities found") return nil, err } result := resp.Get_bottleneck_faResult tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() insertFAStmt, err := tx.PrepareContext(ctx, insertFASQL) if err != nil { return nil, err } defer insertFAStmt.Close() insertBnPdfsStmt, err := tx.PrepareContext(ctx, insertBnPdfsSQL) if err != nil { return nil, err } defer insertBnPdfsStmt.Close() insertEFAStmt, err := tx.PrepareContext(ctx, insertEFASQL) if err != nil { return nil, err } defer insertEFAStmt.Close() insertFAVStmt, err := tx.PrepareContext(ctx, insertFAVSQL) if err != nil { return nil, err } defer insertFAVStmt.Close() var faIDs []string var faID int64 feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability)) for _, faRes := range result.FairwayAvailability { uniqueFa := models.UniqueFairwayAvailability{ BottleneckId: faRes.Bottleneck_id, Surdat: faRes.SURDAT, } var found bool if faID, found = fairwayAvailabilities[uniqueFa]; !found { err = insertFAStmt.QueryRowContext( ctx, 
faRes.POSITION, faRes.Bottleneck_id, faRes.SURDAT, faRes.Critical, faRes.Date_Info, faRes.Source, ).Scan(&faID) if err != nil { return nil, err } fairwayAvailabilities[uniqueFa] = faID } feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) faIDs = append(faIDs, faRes.Bottleneck_id) if faRes.Bottleneck_PDFs != nil { bnPdfCount := 0 for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { res, err := insertBnPdfsStmt.ExecContext( ctx, faID, bnPdfs.ProfilePdfFilename, bnPdfs.ProfilePdfURL, bnPdfs.PDF_Generation_Date, bnPdfs.Source, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { bnPdfCount += int(affected) } else { bnPdfCount++ } } feedback.Info("Add %d Pdfs", bnPdfCount) } if faRes.Effective_fairway_availability != nil { efaCount := 0 for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { los := efa.Level_of_Service fgt := efa.Forecast_generation_time if efa.Forecast_generation_time.Status == pgtype.Undefined { fgt = pgtype.Timestamp{ Status: pgtype.Null, } } res, err := insertEFAStmt.ExecContext( ctx, faID, efa.Measure_date, string(*los), efa.Available_depth_value, efa.Available_width_value, efa.Water_level_value, efa.Measure_type, efa.Source, fgt.Get(), efa.Value_lifetime, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { efaCount += int(affected) } else { efaCount++ } } feedback.Info("Add %d Effective Fairway Availability", efaCount) } if faRes.Reference_values != nil { rvCount := 0 for _, fav := range faRes.Reference_values.ReferenceValue { res, err := insertFAVStmt.ExecContext( ctx, faID, fav.Level_of_Service, fav.Fairway_depth, fav.Fairway_width, fav.Fairway_radius, fav.Shallowest_spot_Lat, fav.Shallowest_spot_Lon, ) if err != nil { return nil, err } affected, err := res.RowsAffected() if err == nil { rvCount += int(affected) } else { rvCount++ } } feedback.Info("Add %d Reference Values", rvCount) } } feedback.Info("Storing fairway 
availabilities took %s", time.Since(start)) if err = tx.Commit(); err == nil { feedback.Info("Import of fairway availabilities was successful") } return faIDs, nil }