Mercurial > gemma
changeset 1755:d3fe20a13339
Import: Filter fairway availabilities by period and resolve insert confilcts.
author | Raimund Renkert <raimund.renkert@intevation.de> |
---|---|
date | Thu, 10 Jan 2019 16:43:42 +0100 |
parents | 807569b08513 |
children | 295c82c5bc3e |
files | pkg/imports/fa.go pkg/models/fa.go pkg/soap/ifaf/service.go |
diffstat | 3 files changed, 173 insertions(+), 74 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/imports/fa.go Thu Jan 10 16:19:26 2019 +0100 +++ b/pkg/imports/fa.go Thu Jan 10 16:43:42 2019 +0100 @@ -48,6 +48,20 @@ WHERE responsible_country = users.current_user_country() AND staging_done = true ` + latestMeasureDateSQL = ` +SELECT + measure_date +FROM waterway.effective_fairway_availability +ORDER BY measure_date DESC LIMIT 1 +` + listFairwayAvailabilitySQL = ` +SELECT + fa.id, + bn.bottleneck_id, + fa.surdat +FROM waterway.fairway_availability fa +JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id +` insertFASQL = ` INSERT INTO waterway.fairway_availability ( position_code, @@ -79,7 +93,7 @@ $3, $4, $5 -)` +) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING` insertEFASQL = ` INSERT INTO waterway.effective_fairway_availability ( fairway_availability_id, @@ -106,7 +120,7 @@ $8, $9, $10 -)` +) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING` insertFAVSQL = ` INSERT INTO waterway.fa_reference_values ( fairway_availability_id, @@ -125,7 +139,7 @@ $4, $5, ST_MakePoint($6, $7)::geography -)` +)ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING` ) type faJobCreator struct{} @@ -197,12 +211,51 @@ } bottlenecks = append(bottlenecks, bn) } - if err = rows.Err(); err != nil { return nil, err } - faids, err := fa.doForFAs(ctx, bottlenecks, conn, feedback) + var faRows *sql.Rows + faRows, err = conn.QueryContext(ctx, listFairwayAvailabilitySQL) + if err != nil { + return nil, err + } + fairwayAvailabilities := map[models.UniqueFairwayAvailability]int64{} + for faRows.Next() { + var id int64 + var bnId string + var sd time.Time + if err = faRows.Scan( + &id, + &bnId, + &sd, + ); err != nil { + return nil, err + } + key := models.UniqueFairwayAvailability{ + BottleneckId: bnId, + Surdat: sd, + } + fairwayAvailabilities[key] = id + } + if err = faRows.Err(); err != nil { + return nil, err + } + + latestMeasureDateRow := conn.QueryRowContext(ctx, latestMeasureDateSQL) + var latestDate pgtype.Timestamp + err = latestMeasureDateRow.Scan(&latestDate) + switch { + case err == sql.ErrNoRows: + latestDate = pgtype.Timestamp{ + // Fill Database with data of the last 5 days. Change this to a more useful value. + Time: time.Now().AddDate(0, 0, -5), + } + case err != nil: + return nil, err + } + + faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latestDate, conn, feedback) if err != nil { feedback.Error("Error processing data: %s", err) } @@ -210,7 +263,7 @@ feedback.Info("No new fairway availablity data found") return nil, nil } - feedback.Info("Processed %d of %d bottlenecks", len(faids), len(bottlenecks)) + feedback.Info("Processed %d fairway availabilities", len(faids)) // TODO: needs to be filled more useful. summary := struct { FairwayAvailabilities []string `json:"fairwayAvailabilities"` @@ -223,6 +276,8 @@ func (fa *FairwayAvailability) doForFAs( ctx context.Context, bottlenecks []models.Bottleneck, + fairwayAvailabilities map[models.UniqueFairwayAvailability]int64, + latestDate pgtype.Timestamp, conn *sql.Conn, feedback Feedback, ) ([]string, error) { @@ -234,14 +289,17 @@ for _, bn := range bottlenecks { bnIds = append(bnIds, bn.ID) } + var period ifaf.RequestedPeriod + period.Date_start = latestDate.Time + period.Date_end = time.Now() ids := ifaf.ArrayOfString{ String: bnIds, } - // TODO: Filter by period. Period should start after latest measurement date. req := &ifaf.Get_bottleneck_fa{ Bottleneck_id: &ids, + Period: &period, } resp, err := client.Get_bottleneck_fa(req) if err != nil { @@ -287,80 +345,112 @@ var faID int64 feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability)) for _, faRes := range result.FairwayAvailability { - // TODO: high frequent requests lead to "duplicate key value violates unique constraint "fairway_availability_bottleneck_id_surdat_key" - // in the database. This has to be resolved. - // All data subsets can also ocure as duplicates! - err = insertFAStmt.QueryRowContext( - ctx, - faRes.POSITION, - faRes.Bottleneck_id, - faRes.SURDAT, - faRes.Critical, - faRes.Date_Info, - faRes.Source, - ).Scan(&faID) - if err != nil { - return nil, err + uniqueFa := models.UniqueFairwayAvailability{ + BottleneckId: faRes.Bottleneck_id, + Surdat: faRes.SURDAT, } - feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) - faIDs = append(faIDs, faRes.Bottleneck_id) - for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { - _, err = insertBnPdfsStmt.ExecContext( + if _, ok := fairwayAvailabilities[uniqueFa]; !ok { + err = insertFAStmt.QueryRowContext( ctx, - faID, - bnPdfs.ProfilePdfFilename, - bnPdfs.ProfilePdfURL, - bnPdfs.PDF_Generation_Date, - bnPdfs.Source, - ) + faRes.POSITION, + faRes.Bottleneck_id, + faRes.SURDAT, + faRes.Critical, + faRes.Date_Info, + faRes.Source, + ).Scan(&faID) if err != nil { return nil, err } - feedback.Info("Add %d Pdfs", len(faRes.Bottleneck_PDFs.PdfInfo)) + fairwayAvailabilities[uniqueFa] = faID + } else { + faID, _ = fairwayAvailabilities[uniqueFa] } - for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { - los := efa.Level_of_Service - fgt := efa.Forecast_generation_time - if efa.Forecast_generation_time.Status == pgtype.Undefined { - fgt = pgtype.Timestamp{ - Status: pgtype.Null, + feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id) + faIDs = append(faIDs, faRes.Bottleneck_id) + if faRes.Bottleneck_PDFs != nil { + bnPdfCount := 0 + for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo { + res, err := insertBnPdfsStmt.ExecContext( + ctx, + faID, + bnPdfs.ProfilePdfFilename, + bnPdfs.ProfilePdfURL, + bnPdfs.PDF_Generation_Date, + bnPdfs.Source, + ) + if err != nil { + return nil, err + } + affected, err := res.RowsAffected() + if err == nil { + bnPdfCount += int(affected) + } else { + bnPdfCount++ } } - _, err = insertEFAStmt.ExecContext( - ctx, - faID, - efa.Measure_date, - string(*los), - efa.Available_depth_value, - efa.Available_width_value, - efa.Water_level_value, - efa.Measure_type, - efa.Source, - fgt, - efa.Value_lifetime, - ) - if err != nil { - return nil, err + feedback.Info("Add %d Pdfs", bnPdfCount) + } + if faRes.Effective_fairway_availability != nil { + efaCount := 0 + for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability { + los := efa.Level_of_Service + fgt := efa.Forecast_generation_time + if efa.Forecast_generation_time.Status == pgtype.Undefined { + fgt = pgtype.Timestamp{ + Status: pgtype.Null, + } + } + res, err := insertEFAStmt.ExecContext( + ctx, + faID, + efa.Measure_date, + string(*los), + efa.Available_depth_value, + efa.Available_width_value, + efa.Water_level_value, + efa.Measure_type, + efa.Source, + fgt.Get(), + efa.Value_lifetime, + ) + if err != nil { + return nil, err + } + affected, err := res.RowsAffected() + if err == nil { + efaCount += int(affected) + } else { + efaCount++ + } } - feedback.Info("Add %d Effective Fairway Availability", len( - faRes.Effective_fairway_availability.EffectiveFairwayAvailability)) + feedback.Info("Add %d Effective Fairway Availability", efaCount) } - for _, fav := range faRes.Reference_values.ReferenceValue { - _, err = insertFAVStmt.ExecContext( - ctx, - faID, - fav.Level_of_Service, - fav.Fairway_depth, - fav.Fairway_width, - fav.Fairway_radius, - fav.Shallowest_spot_Lat, - fav.Shallowest_spot_Lon, - ) - if err != nil { - return nil, err + + if faRes.Reference_values != nil { + rvCount := 0 + for _, fav := range faRes.Reference_values.ReferenceValue { + res, err := insertFAVStmt.ExecContext( + ctx, + faID, + fav.Level_of_Service, + fav.Fairway_depth, + fav.Fairway_width, + fav.Fairway_radius, + fav.Shallowest_spot_Lat, + fav.Shallowest_spot_Lon, + ) + if err != nil { + return nil, err + } + affected, err := res.RowsAffected() + if err == nil { + rvCount += int(affected) + } else { + rvCount++ + } } - feedback.Info("Add %d Reference Values", - len(faRes.Reference_values.ReferenceValue)) + feedback.Info("Add %d Reference Values", rvCount) } } feedback.Info("Storing fairway availabilities took %s", time.Since(start))
--- a/pkg/models/fa.go Thu Jan 10 16:19:26 2019 +0100 +++ b/pkg/models/fa.go Thu Jan 10 16:43:42 2019 +0100 @@ -13,7 +13,11 @@ package models -import "gemma.intevation.de/gemma/pkg/common" +import ( + "time" + + "gemma.intevation.de/gemma/pkg/common" +) // FairwayAvailabilityImport contains data used to define the endpoint type FairwayAvailabilityImport struct { @@ -23,3 +27,8 @@ // Attributes are optional attributes. Attributes common.Attributes `json:"attributes,omitempty"` } + +type UniqueFairwayAvailability struct { + BottleneckId string + Surdat time.Time +}
--- a/pkg/soap/ifaf/service.go Thu Jan 10 16:19:26 2019 +0100 +++ b/pkg/soap/ifaf/service.go Thu Jan 10 16:43:42 2019 +0100 @@ -857,11 +857,11 @@ } type RequestedPeriod struct { - Date_start time.Time `xml:"Date_start,omitempty"` + Date_start time.Time `xml:"http://www.ris.eu/wamos/common/3.0 Date_start,omitempty"` - Date_end time.Time `xml:"Date_end,omitempty"` + Date_end time.Time `xml:"http://www.ris.eu/wamos/common/3.0 Date_end,omitempty"` - Value_interval int32 `xml:"Value_interval,omitempty"` + Value_interval int32 `xml:"http://www.ris.eu/wamos/common/3.0 Value_interval,omitempty"` } type ArrayOfString struct {