changeset 4027:b17453420eff

Avoid doing one SELECT per line in import file When the import is run by a waterway admin (more row level security processing involved than as sys_admin), this approximately halves the overall duration of the import
author Tom Gottfried <tom@intevation.de>
date Mon, 22 Jul 2019 19:19:00 +0200
parents 82037bbd2c7c
children 040a5dc95eb9
files pkg/imports/agm.go
diffstat 1 files changed, 90 insertions(+), 55 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/agm.go	Mon Jul 22 18:35:14 2019 +0200
+++ b/pkg/imports/agm.go	Mon Jul 22 19:19:00 2019 +0200
@@ -152,12 +152,12 @@
 const (
 	agmSelectSQL = `
 SELECT
-  id,
   country_code,
   sender,
   language_code,
   date_issue,
   reference_code,
+  measure_date,
   water_level,
   date_info,
   source_organization
@@ -165,7 +165,7 @@
 WHERE
   location
     = ($1::char(2), $2::char(3), $3::char(5), $4::char(5), $5::int)
-  AND measure_date = $6
+  AND measure_date BETWEEN $6 AND $7
   AND staging_done
 `
 
@@ -314,6 +314,7 @@
 
 	agmLines := []*agmLine{}
 	ignored := 0
+	mdMinMax := map[models.Isrs][2]time.Time{}
 
 lines:
 	for line := 1; ; line++ {
@@ -368,6 +369,16 @@
 		if err != nil {
 			return nil, fmt.Errorf("Invalid 'measure_date' line %d: %v", line, err)
 		}
+		if _, hasGid := mdMinMax[*gid]; hasGid {
+			if md.Before(mdMinMax[*gid][0]) {
+				mdMinMax[*gid] = [2]time.Time{md, mdMinMax[*gid][1]}
+			}
+			if md.After(mdMinMax[*gid][1]) {
+				mdMinMax[*gid] = [2]time.Time{mdMinMax[*gid][0], md}
+			}
+		} else {
+			mdMinMax[*gid] = [2]time.Time{md, md}
+		}
 
 		newSender := agm.Originator
 		newCountryCode := gid.CountryCode
@@ -399,48 +410,17 @@
 		))
 	}
 
+	oldGMLines := map[models.Isrs]map[int64]*agmLine{}
+	for gid, minMax := range mdMinMax {
+		oldGMLines[gid], err = getOldGMLines(
+			ctx, selectStmt, gid, minMax[0], minMax[1])
+		if err != nil {
+			return nil, err
+		}
+	}
+
 agmLines:
 	for _, line := range agmLines {
-		var (
-			oldID                 int64
-			oldCountryCode        string
-			oldSender             string
-			oldLanguageCode       string
-			oldDateIssue          time.Time
-			oldReferenceCode      string
-			oldValue              float64
-			oldDateInfo           time.Time
-			oldSourceOrganization string
-		)
-
-		err = selectStmt.QueryRowContext(
-			ctx,
-			line.Location.CountryCode,
-			line.Location.LoCode,
-			line.Location.FairwaySection,
-			line.Location.Orc,
-			line.Location.Hectometre,
-			line.MeasureDate.Time,
-		).Scan(
-			&oldID,
-			&oldCountryCode,
-			&oldSender,
-			&oldLanguageCode,
-			&oldDateIssue,
-			&oldReferenceCode,
-			&oldValue,
-			&oldDateInfo,
-			&oldSourceOrganization,
-		)
-
-		var newEntry bool
-		switch {
-		case err == sql.ErrNoRows:
-			// Complete new one
-			newEntry = true
-		case err != nil:
-			return nil, err
-		}
 
 		switch err := func() error {
 			tx, err := conn.BeginTx(ctx, nil)
@@ -495,21 +475,9 @@
 			MeasureDate: line.MeasureDate,
 		}
 
-		if newEntry {
+		if o, hasOld := oldGMLines[line.Location][line.MeasureDate.Time.Unix()]; !hasOld {
 			ase.Versions = []*agmLine{line}
 		} else {
-			o := newAGMLine(
-				line.Location,
-				oldCountryCode,
-				oldSender,
-				oldLanguageCode,
-				oldDateIssue,
-				oldReferenceCode,
-				line.MeasureDate.Time,
-				oldValue,
-				oldDateInfo,
-				oldSourceOrganization,
-			)
 			// Ignore if there is no diff.
 			if !line.hasDiff(o) {
 				continue
@@ -529,6 +497,73 @@
 	return entries, nil
 }
 
+func getOldGMLines(
+	ctx context.Context,
+	stmt *sql.Stmt,
+	location models.Isrs,
+	from time.Time,
+	to time.Time,
+) (map[int64]*agmLine, error) {
+	var (
+		oldCountryCode        string
+		oldSender             string
+		oldLanguageCode       string
+		oldDateIssue          time.Time
+		oldReferenceCode      string
+		oldMeasureDate        time.Time
+		oldValue              float64
+		oldDateInfo           time.Time
+		oldSourceOrganization string
+	)
+	gmLines := map[int64]*agmLine{}
+
+	gms, err := stmt.QueryContext(
+		ctx,
+		location.CountryCode,
+		location.LoCode,
+		location.FairwaySection,
+		location.Orc,
+		location.Hectometre,
+		from,
+		to,
+	)
+	if err != nil {
+		return nil, err
+	}
+	defer gms.Close()
+	for gms.Next() {
+		if err = gms.Scan(
+			&oldCountryCode,
+			&oldSender,
+			&oldLanguageCode,
+			&oldDateIssue,
+			&oldReferenceCode,
+			&oldMeasureDate,
+			&oldValue,
+			&oldDateInfo,
+			&oldSourceOrganization,
+		); err != nil {
+			return nil, err
+		}
+		gmLines[oldMeasureDate.Unix()] = newAGMLine(
+			location,
+			oldCountryCode,
+			oldSender,
+			oldLanguageCode,
+			oldDateIssue,
+			oldReferenceCode,
+			oldMeasureDate,
+			oldValue,
+			oldDateInfo,
+			oldSourceOrganization,
+		)
+	}
+	if err = gms.Err(); err != nil {
+		return nil, err
+	}
+	return gmLines, nil
+}
+
 func newAGMLine(
 	location models.Isrs,
 	countryCode string,