changeset 1755:d3fe20a13339

Import: Filter fairway availabilities by period and resolve insert confilcts.
author Raimund Renkert <raimund.renkert@intevation.de>
date Thu, 10 Jan 2019 16:43:42 +0100
parents 807569b08513
children 295c82c5bc3e
files pkg/imports/fa.go pkg/models/fa.go pkg/soap/ifaf/service.go
diffstat 3 files changed, 173 insertions(+), 74 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/fa.go	Thu Jan 10 16:19:26 2019 +0100
+++ b/pkg/imports/fa.go	Thu Jan 10 16:43:42 2019 +0100
@@ -48,6 +48,20 @@
 WHERE responsible_country = users.current_user_country()
   AND staging_done = true
 `
+	latestMeasureDateSQL = `
+SELECT
+	measure_date
+FROM waterway.effective_fairway_availability
+ORDER BY measure_date DESC LIMIT 1
+`
+	listFairwayAvailabilitySQL = `
+SELECT
+  fa.id,
+  bn.bottleneck_id,
+  fa.surdat
+FROM waterway.fairway_availability fa
+JOIN waterway.bottlenecks bn ON bn.id = fa.bottleneck_id
+`
 	insertFASQL = `
 INSERT INTO waterway.fairway_availability (
   position_code,
@@ -79,7 +93,7 @@
   $3,
   $4,
   $5
-)`
+) ON CONFLICT ON CONSTRAINT bottleneck_pdfs_pkey DO NOTHING`
 	insertEFASQL = `
 INSERT INTO waterway.effective_fairway_availability (
   fairway_availability_id,
@@ -106,7 +120,7 @@
   $8,
   $9,
   $10
-)`
+) ON CONFLICT ON CONSTRAINT effective_fairway_availability_pkey DO NOTHING`
 	insertFAVSQL = `
 INSERT INTO waterway.fa_reference_values (
   fairway_availability_id,
@@ -125,7 +139,7 @@
   $4,
   $5,
   ST_MakePoint($6, $7)::geography
-)`
+)ON CONFLICT ON CONSTRAINT fa_reference_values_pkey DO NOTHING`
 )
 
 type faJobCreator struct{}
@@ -197,12 +211,51 @@
 		}
 		bottlenecks = append(bottlenecks, bn)
 	}
-
 	if err = rows.Err(); err != nil {
 		return nil, err
 	}
 
-	faids, err := fa.doForFAs(ctx, bottlenecks, conn, feedback)
+	var faRows *sql.Rows
+	faRows, err = conn.QueryContext(ctx, listFairwayAvailabilitySQL)
+	if err != nil {
+		return nil, err
+	}
+	fairwayAvailabilities := map[models.UniqueFairwayAvailability]int64{}
+	for faRows.Next() {
+		var id int64
+		var bnId string
+		var sd time.Time
+		if err = faRows.Scan(
+			&id,
+			&bnId,
+			&sd,
+		); err != nil {
+			return nil, err
+		}
+		key := models.UniqueFairwayAvailability{
+			BottleneckId: bnId,
+			Surdat:       sd,
+		}
+		fairwayAvailabilities[key] = id
+	}
+	if err = faRows.Err(); err != nil {
+		return nil, err
+	}
+
+	latestMeasureDateRow := conn.QueryRowContext(ctx, latestMeasureDateSQL)
+	var latestDate pgtype.Timestamp
+	err = latestMeasureDateRow.Scan(&latestDate)
+	switch {
+	case err == sql.ErrNoRows:
+		latestDate = pgtype.Timestamp{
+			// Fill Database with data of the last 5 days. Change this to a more useful value.
+			Time: time.Now().AddDate(0, 0, -5),
+		}
+	case err != nil:
+		return nil, err
+	}
+
+	faids, err := fa.doForFAs(ctx, bottlenecks, fairwayAvailabilities, latestDate, conn, feedback)
 	if err != nil {
 		feedback.Error("Error processing data: %s", err)
 	}
@@ -210,7 +263,7 @@
 		feedback.Info("No new fairway availablity data found")
 		return nil, nil
 	}
-	feedback.Info("Processed %d of %d bottlenecks", len(faids), len(bottlenecks))
+	feedback.Info("Processed %d fairway availabilities", len(faids))
 	// TODO: needs to be filled more useful.
 	summary := struct {
 		FairwayAvailabilities []string `json:"fairwayAvailabilities"`
@@ -223,6 +276,8 @@
 func (fa *FairwayAvailability) doForFAs(
 	ctx context.Context,
 	bottlenecks []models.Bottleneck,
+	fairwayAvailabilities map[models.UniqueFairwayAvailability]int64,
+	latestDate pgtype.Timestamp,
 	conn *sql.Conn,
 	feedback Feedback,
 ) ([]string, error) {
@@ -234,14 +289,17 @@
 	for _, bn := range bottlenecks {
 		bnIds = append(bnIds, bn.ID)
 	}
+	var period ifaf.RequestedPeriod
+	period.Date_start = latestDate.Time
+	period.Date_end = time.Now()
 
 	ids := ifaf.ArrayOfString{
 		String: bnIds,
 	}
 
-	// TODO: Filter by period. Period should start after latest measurement date.
 	req := &ifaf.Get_bottleneck_fa{
 		Bottleneck_id: &ids,
+		Period:        &period,
 	}
 	resp, err := client.Get_bottleneck_fa(req)
 	if err != nil {
@@ -287,80 +345,112 @@
 	var faID int64
 	feedback.Info("Found %d fairway availabilities", len(result.FairwayAvailability))
 	for _, faRes := range result.FairwayAvailability {
-		// TODO: high frequent requests lead to "duplicate key value violates unique constraint "fairway_availability_bottleneck_id_surdat_key"
-		// in the database. This has to be resolved.
-		// All data subsets can also ocure as duplicates!
-		err = insertFAStmt.QueryRowContext(
-			ctx,
-			faRes.POSITION,
-			faRes.Bottleneck_id,
-			faRes.SURDAT,
-			faRes.Critical,
-			faRes.Date_Info,
-			faRes.Source,
-		).Scan(&faID)
-		if err != nil {
-			return nil, err
+		uniqueFa := models.UniqueFairwayAvailability{
+			BottleneckId: faRes.Bottleneck_id,
+			Surdat:       faRes.SURDAT,
 		}
-		feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
-		faIDs = append(faIDs, faRes.Bottleneck_id)
-		for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo {
-			_, err = insertBnPdfsStmt.ExecContext(
+		if _, ok := fairwayAvailabilities[uniqueFa]; !ok {
+			err = insertFAStmt.QueryRowContext(
 				ctx,
-				faID,
-				bnPdfs.ProfilePdfFilename,
-				bnPdfs.ProfilePdfURL,
-				bnPdfs.PDF_Generation_Date,
-				bnPdfs.Source,
-			)
+				faRes.POSITION,
+				faRes.Bottleneck_id,
+				faRes.SURDAT,
+				faRes.Critical,
+				faRes.Date_Info,
+				faRes.Source,
+			).Scan(&faID)
 			if err != nil {
 				return nil, err
 			}
-			feedback.Info("Add %d Pdfs", len(faRes.Bottleneck_PDFs.PdfInfo))
+			fairwayAvailabilities[uniqueFa] = faID
+		} else {
+			faID, _ = fairwayAvailabilities[uniqueFa]
 		}
-		for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability {
-			los := efa.Level_of_Service
-			fgt := efa.Forecast_generation_time
-			if efa.Forecast_generation_time.Status == pgtype.Undefined {
-				fgt = pgtype.Timestamp{
-					Status: pgtype.Null,
+		feedback.Info("Processing for Bottleneck %s", faRes.Bottleneck_id)
+		faIDs = append(faIDs, faRes.Bottleneck_id)
+		if faRes.Bottleneck_PDFs != nil {
+			bnPdfCount := 0
+			for _, bnPdfs := range faRes.Bottleneck_PDFs.PdfInfo {
+				res, err := insertBnPdfsStmt.ExecContext(
+					ctx,
+					faID,
+					bnPdfs.ProfilePdfFilename,
+					bnPdfs.ProfilePdfURL,
+					bnPdfs.PDF_Generation_Date,
+					bnPdfs.Source,
+				)
+				if err != nil {
+					return nil, err
+				}
+				affected, err := res.RowsAffected()
+				if err == nil {
+					bnPdfCount += int(affected)
+				} else {
+					bnPdfCount++
 				}
 			}
-			_, err = insertEFAStmt.ExecContext(
-				ctx,
-				faID,
-				efa.Measure_date,
-				string(*los),
-				efa.Available_depth_value,
-				efa.Available_width_value,
-				efa.Water_level_value,
-				efa.Measure_type,
-				efa.Source,
-				fgt,
-				efa.Value_lifetime,
-			)
-			if err != nil {
-				return nil, err
+			feedback.Info("Add %d Pdfs", bnPdfCount)
+		}
+		if faRes.Effective_fairway_availability != nil {
+			efaCount := 0
+			for _, efa := range faRes.Effective_fairway_availability.EffectiveFairwayAvailability {
+				los := efa.Level_of_Service
+				fgt := efa.Forecast_generation_time
+				if efa.Forecast_generation_time.Status == pgtype.Undefined {
+					fgt = pgtype.Timestamp{
+						Status: pgtype.Null,
+					}
+				}
+				res, err := insertEFAStmt.ExecContext(
+					ctx,
+					faID,
+					efa.Measure_date,
+					string(*los),
+					efa.Available_depth_value,
+					efa.Available_width_value,
+					efa.Water_level_value,
+					efa.Measure_type,
+					efa.Source,
+					fgt.Get(),
+					efa.Value_lifetime,
+				)
+				if err != nil {
+					return nil, err
+				}
+				affected, err := res.RowsAffected()
+				if err == nil {
+					efaCount += int(affected)
+				} else {
+					efaCount++
+				}
 			}
-			feedback.Info("Add %d Effective Fairway Availability", len(
-				faRes.Effective_fairway_availability.EffectiveFairwayAvailability))
+			feedback.Info("Add %d Effective Fairway Availability", efaCount)
 		}
-		for _, fav := range faRes.Reference_values.ReferenceValue {
-			_, err = insertFAVStmt.ExecContext(
-				ctx,
-				faID,
-				fav.Level_of_Service,
-				fav.Fairway_depth,
-				fav.Fairway_width,
-				fav.Fairway_radius,
-				fav.Shallowest_spot_Lat,
-				fav.Shallowest_spot_Lon,
-			)
-			if err != nil {
-				return nil, err
+
+		if faRes.Reference_values != nil {
+			rvCount := 0
+			for _, fav := range faRes.Reference_values.ReferenceValue {
+				res, err := insertFAVStmt.ExecContext(
+					ctx,
+					faID,
+					fav.Level_of_Service,
+					fav.Fairway_depth,
+					fav.Fairway_width,
+					fav.Fairway_radius,
+					fav.Shallowest_spot_Lat,
+					fav.Shallowest_spot_Lon,
+				)
+				if err != nil {
+					return nil, err
+				}
+				affected, err := res.RowsAffected()
+				if err == nil {
+					rvCount += int(affected)
+				} else {
+					rvCount++
+				}
 			}
-			feedback.Info("Add %d Reference Values",
-				len(faRes.Reference_values.ReferenceValue))
+			feedback.Info("Add %d Reference Values", rvCount)
 		}
 	}
 	feedback.Info("Storing fairway availabilities took %s", time.Since(start))
--- a/pkg/models/fa.go	Thu Jan 10 16:19:26 2019 +0100
+++ b/pkg/models/fa.go	Thu Jan 10 16:43:42 2019 +0100
@@ -13,7 +13,11 @@
 
 package models
 
-import "gemma.intevation.de/gemma/pkg/common"
+import (
+	"time"
+
+	"gemma.intevation.de/gemma/pkg/common"
+)
 
 // FairwayAvailabilityImport contains data used to define the endpoint
 type FairwayAvailabilityImport struct {
@@ -23,3 +27,8 @@
 	// Attributes are optional attributes.
 	Attributes common.Attributes `json:"attributes,omitempty"`
 }
+
+type UniqueFairwayAvailability struct {
+	BottleneckId string
+	Surdat       time.Time
+}
--- a/pkg/soap/ifaf/service.go	Thu Jan 10 16:19:26 2019 +0100
+++ b/pkg/soap/ifaf/service.go	Thu Jan 10 16:43:42 2019 +0100
@@ -857,11 +857,11 @@
 }
 
 type RequestedPeriod struct {
-	Date_start time.Time `xml:"Date_start,omitempty"`
+	Date_start time.Time `xml:"http://www.ris.eu/wamos/common/3.0 Date_start,omitempty"`
 
-	Date_end time.Time `xml:"Date_end,omitempty"`
+	Date_end time.Time `xml:"http://www.ris.eu/wamos/common/3.0 Date_end,omitempty"`
 
-	Value_interval int32 `xml:"Value_interval,omitempty"`
+	Value_interval int32 `xml:"http://www.ris.eu/wamos/common/3.0 Value_interval,omitempty"`
 }
 
 type ArrayOfString struct {