changeset 4131:49ec33a7f954

Merged request_hist_bns branch with improved bottleneck import.
author Sascha Wilde <wilde@intevation.de>
date Thu, 01 Aug 2019 18:53:51 +0200
parents 52f7264265bb (current diff) 980f12d3c766 (diff)
children ec8438712447
files
diffstat 5 files changed, 246 insertions(+), 312 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/bn.go	Thu Aug 01 17:02:09 2019 +0200
+++ b/pkg/imports/bn.go	Thu Aug 01 18:53:51 2019 +0200
@@ -24,6 +24,7 @@
 	"strings"
 	"time"
 
+	"gemma.intevation.de/gemma/pkg/common"
 	"gemma.intevation.de/gemma/pkg/pgxutils"
 	"gemma.intevation.de/gemma/pkg/soap/ifbn"
 	"github.com/jackc/pgx/pgtype"
@@ -89,69 +90,23 @@
 RETURNING id
 `
 
-	updateBottleneckSQL = `
-WITH
-bounds (b) AS (VALUES (isrs_fromText($7)), (isrs_fromText($8))),
-r AS (SELECT isrsrange(
-    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
-    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
-UPDATE waterway.bottlenecks SET (
-  bottleneck_id,
-  validity,
-  gauge_location,
-  objnam,
-  nobjnm,
-  stretch,
-  area,
-  rb,
-  lb,
-  responsible_country,
-  revisiting_time,
-  limiting,
-  date_info,
-  source_organization
-) = (
-  $2,
-  $3::tstzrange,
-  isrs_fromText($4),
-  $5,
-  $6,
-  (SELECT r FROM r),
-  ISRSrange_area(
-    ISRSrange_axis((SELECT r FROM r),
-                   $16),
-    (SELECT ST_Collect(CAST(area AS geometry))
-        FROM waterway.waterway_area)),
-  $9,
-  $10,
-  $11,
-  $12::smallint,
-  $13,
-  $14::timestamptz,
-  $15
-)
-WHERE id=$1
-RETURNING id
-`
-
+	// We only compare NOT NULL columns: correct comparison with values
+	// that might be NULL (and then must not be compared with `=' but
+	// with `IS NULL') is complicated, and checking more than
+	// (bottleneck_id, validity, date_info) is a luxury already.
 	findExactMatchBottleneckSQL = `
 WITH
-bounds (b) AS (VALUES (isrs_fromText($6)), (isrs_fromText($7))),
+bounds (b) AS (VALUES (isrs_fromText($4)), (isrs_fromText($5))),
 r AS (SELECT isrsrange(
-    (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
-    (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
+      (SELECT b FROM bounds ORDER BY b USING <~ FETCH FIRST ROW ONLY),
+      (SELECT b FROM bounds ORDER BY b USING >~ FETCH FIRST ROW ONLY)) AS r)
 SELECT id FROM waterway.bottlenecks
 WHERE (
   bottleneck_id,
   validity,
   gauge_location,
-  objnam,
-  nobjnm,
   stretch,
-  rb,
-  lb,
   responsible_country,
-  revisiting_time,
   limiting,
   date_info,
   source_organization,
@@ -160,58 +115,19 @@
   $1,
   $2::tstzrange,
   isrs_fromText($3),
-  $4,
-  $5,
   (SELECT r FROM r),
-  $8,
+  $6,
+  $7,
+  $8::timestamptz,
   $9,
-  $10,
-  $11::smallint,
-  $12,
-  $13::timestamptz,
-  $14,
   true
 )
 `
 
-	findMatchBottleneckSQL = `
+	findIntersectingBottleneckSQL = `
 SELECT id FROM waterway.bottlenecks
-WHERE (
-  bottleneck_id,
-  validity,
-  staging_done
-) = (
-  $1,
-  $2::tstzrange,
-  true
-)
-`
-	// FIXME: Is this still neede wtih the new simplified historization
-	// model?  My intuition is: no it isn't and should be removed, but we
-	// should double check before doing so... [sw]
-	//
-	// Alignment with gauge validity might have generated new entries
-	// for the same time range. Thus, remove the old ones
-	deleteObsoleteBNSQL = `
-DELETE FROM waterway.bottlenecks
-WHERE bottleneck_id = $1 AND validity <@ $2 AND id <> ALL($3)
-`
-
-	fixBNValiditySQL = `
-UPDATE waterway.bottlenecks SET
-   -- Set enddate of old entry to new startdate in case of overlap:
-  validity = validity - $2
-WHERE bottleneck_id = $1
-  AND validity && $2 AND NOT validity <@ $2
-`
-
-	deleteBottleneckMaterialSQL = `
-WITH del AS (
-  DELETE FROM waterway.bottlenecks_riverbed_materials
-  WHERE bottleneck_id = ANY($1)
-    AND riverbed <> ALL($2)
-  RETURNING riverbed)
-SELECT DISTINCT riverbed FROM del
+WHERE (bottleneck_id, staging_done) = ($1, true)
+  AND $2::tstzrange && validity
 `
 
 	insertBottleneckMaterialSQL = `
@@ -223,6 +139,25 @@
   unnest(CAST($2 AS varchar[])) AS materials
 ON CONFLICT (bottleneck_id, riverbed) DO NOTHING
 `
+
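+	// StageDone executes the two following statements in order: first
+	// delete the bottlenecks this import marked for deletion, then flag
+	// the remaining tracked bottlenecks as no longer staged.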
+	bnStageDoneDeleteSQL = `
+DELETE FROM waterway.bottlenecks WHERE id IN (
+  SELECT key
+  FROM import.track_imports
+  WHERE import_id = $1
+        AND relation = 'waterway.bottlenecks'::regclass
+        AND deletion
+)`
+
+	bnStageDoneSQL = `
+UPDATE waterway.bottlenecks SET staging_done = true
+WHERE id IN (
+  SELECT key
+  FROM import.track_imports
+  WHERE import_id = $1
+        AND relation = 'waterway.bottlenecks'::regclass
+        AND NOT deletion
+)`
 )
 
 type bnJobCreator struct{}
@@ -244,22 +179,16 @@
 	}
 }
 
-const (
-	bnStageDoneSQL = `
-UPDATE waterway.bottlenecks SET staging_done = true
-WHERE id IN (
-  SELECT key from import.track_imports
-  WHERE import_id = $1 AND
-        relation = 'waterway.bottlenecks'::regclass)`
-)
-
 // StageDone moves the imported bottleneck out of the staging area.
 func (bnJobCreator) StageDone(
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
 ) error {
-	_, err := tx.ExecContext(ctx, bnStageDoneSQL, id)
+	_, err := tx.ExecContext(ctx, bnStageDoneDeleteSQL, id)
+	if err == nil {
+		_, err = tx.ExecContext(ctx, bnStageDoneSQL, id)
+	}
 	return err
 }
 
@@ -287,7 +216,9 @@
 	fetch := func() ([]*ifbn.BottleNeckType, error) {
 		client := ifbn.NewIBottleneckService(bn.URL, bn.Insecure, nil)
 
-		req := &ifbn.Export_bn_by_isrs{}
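+		// A period starting at the zero time requests the full
+		// bottleneck history instead of only the current versions
+		// (assumption based on the request_hist_bns branch).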
+		req := &ifbn.Export_bn_by_isrs{
+			Period: &ifbn.RequestedPeriod{Date_start: &time.Time{}},
+		}
 
 		resp, err := client.Export_bn_by_isrs(req)
 		if err != nil {
@@ -305,6 +236,48 @@
 	return storeBottlenecks(ctx, fetch, importID, conn, feedback, bn.Tolerance)
 }
 
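+// bnStmts bundles the prepared statements of the bottleneck import so
+// they can be prepared and closed as a unit.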
+type bnStmts struct {
+	insert           *sql.Stmt
+	findExactMatch   *sql.Stmt
+	findIntersecting *sql.Stmt
+	insertMaterial   *sql.Stmt
+	track            *sql.Stmt
+}
+
+func (bs *bnStmts) close() {
+	for _, s := range []**sql.Stmt{
+		&bs.insert,
+		&bs.findExactMatch,
+		&bs.findIntersecting,
+		&bs.insertMaterial,
+		&bs.track,
+	} {
+		if *s != nil {
+			(*s).Close()
+			*s = nil
+		}
+	}
+}
+
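+// prepare creates all statements on the given connection.  On error the
+// already prepared ones are released by the caller via close().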
+func (bs *bnStmts) prepare(ctx context.Context, conn *sql.Conn) error {
+	for _, x := range []struct {
+		sql  string
+		stmt **sql.Stmt
+	}{
+		{insertBottleneckSQL, &bs.insert},
+		{findExactMatchBottleneckSQL, &bs.findExactMatch},
+		{findIntersectingBottleneckSQL, &bs.findIntersecting},
+		{insertBottleneckMaterialSQL, &bs.insertMaterial},
+		{trackImportDeletionSQL, &bs.track},
+	} {
+		var err error
+		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
+			return err
+		}
+	}
+	return nil
+}
+
 func storeBottlenecks(
 	ctx context.Context,
 	fetch func() ([]*ifbn.BottleNeckType, error),
@@ -322,46 +295,37 @@
 
 	feedback.Info("Found %d bottlenecks for import", len(bns))
 
-	var insertStmt, updateStmt, findExactMatchingBNStmt, findMatchingBNStmt,
-		deleteObsoleteBNStmt, fixValidityStmt, deleteMaterialStmt,
-		insertMaterialStmt, trackStmt *sql.Stmt
+	var bs bnStmts
+	defer bs.close()
 
-	for _, x := range []struct {
-		sql  string
-		stmt **sql.Stmt
-	}{
-		{insertBottleneckSQL, &insertStmt},
-		{updateBottleneckSQL, &updateStmt},
-		{findExactMatchBottleneckSQL, &findExactMatchingBNStmt},
-		{findMatchBottleneckSQL, &findMatchingBNStmt},
-		{deleteObsoleteBNSQL, &deleteObsoleteBNStmt},
-		{fixBNValiditySQL, &fixValidityStmt},
-		{deleteBottleneckMaterialSQL, &deleteMaterialStmt},
-		{insertBottleneckMaterialSQL, &insertMaterialStmt},
-		{trackImportSQL, &trackStmt},
-	} {
-		var err error
-		if *x.stmt, err = conn.PrepareContext(ctx, x.sql); err != nil {
-			return nil, err
-		}
-		defer (*x.stmt).Close()
+	if err := bs.prepare(ctx, conn); err != nil {
+		return nil, err
 	}
 
 	var nids []string
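+	// seenOldBnIds records ids of superseded bottlenecks already marked
+	// for deletion, so an old entry intersecting several new ones is
+	// tracked only once.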
+	seenOldBnIds := make(map[int64]bool)
 
 	feedback.Info("Tolerance used to snap waterway axis: %g", tolerance)
 
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
 	for _, bn := range bns {
 		if err := storeBottleneck(
-			ctx, importID, conn, feedback, bn, &nids, tolerance,
-			insertStmt, updateStmt,
-			findExactMatchingBNStmt, findMatchingBNStmt,
-			deleteObsoleteBNStmt, fixValidityStmt,
-			deleteMaterialStmt, insertMaterialStmt,
-			trackStmt); err != nil {
+			tx, ctx, importID, feedback, bn,
+			&nids, seenOldBnIds, tolerance,
+			&bs,
+		); err != nil {
 			return nil, err
 		}
 	}
+	if err = tx.Commit(); err != nil {
+		return nil, err
+	}
+
 	if len(nids) == 0 {
 		return nil, UnchangedError("No new bottlenecks inserted")
 	}
@@ -377,18 +341,15 @@
 }
 
 func storeBottleneck(
+	tx *sql.Tx,
 	ctx context.Context,
 	importID int64,
-	conn *sql.Conn,
 	feedback Feedback,
 	bn *ifbn.BottleNeckType,
 	nids *[]string,
+	seenOldBnIds map[int64]bool,
 	tolerance float64,
-	insertStmt, updateStmt,
-	findExactMatchingBNStmt, findMatchingBNStmt,
-	deleteObsoleteBNStmt, fixValidityStmt,
-	deleteMaterialStmt, insertMaterialStmt,
-	trackStmt *sql.Stmt,
+	bs *bnStmts,
 ) error {
 	feedback.Info("Processing %s (%s)", bn.OBJNAM, bn.Bottleneck_id)
 
@@ -407,7 +368,7 @@
 		// version is advisable.
 		//
 		// Nevertheless, the current solution "just works" for the
-		// rtime being...  --  sw
+		// time being and reflects the upstream systems...  --  sw
 		feedback.Warn("No validity information, assuming infinite validity.")
 		tfrom.Set(time.Date(1970, time.January, 1, 0, 0, 0, 0, time.UTC))
 		uBound = pgtype.Unbounded
@@ -438,8 +399,13 @@
 		if t, ok := fromTo[toKey]; ok {
 			tto.Set(t)
 			uBound = pgtype.Exclusive
+			feedback.Info("Valid from %s to %s",
+				fromTo[fromKey].Format(common.TimeFormat),
+				fromTo[toKey].Format(common.TimeFormat))
 		} else {
 			uBound = pgtype.Unbounded
+			feedback.Info("Valid from %s",
+				fromTo[fromKey].Format(common.TimeFormat))
 		}
 	}
 
@@ -487,27 +453,28 @@
 
 	// Check if a bottleneck identical to the one we would insert already
 	// exists:
-	bns, err := findExactMatchingBNStmt.QueryContext(ctx,
+	var old int64
+	err := tx.StmtContext(ctx, bs.findExactMatch).QueryRowContext(
+		ctx,
 		bn.Bottleneck_id,
 		&validity,
 		bn.Fk_g_fid,
-		bn.OBJNAM,
-		bn.NOBJNM,
 		bn.From_ISRS, bn.To_ISRS,
-		rb,
-		lb,
 		country,
-		revisitingTime,
 		limiting,
 		bn.Date_Info,
 		bn.Source,
-	)
-
-	if err != nil {
+	).Scan(&old)
+	switch {
+	case err == sql.ErrNoRows:
+		// We don't have a matching old bottleneck.
+	case err != nil:
 		return err
-	}
-	defer bns.Close()
-	if bns.Next() {
+	default:
+		// We could check whether the materials match as well -- but per
+		// specification the Date_Info would have to change on that kind of
+		// change anyway.  So we are actually already checking in more depth
+		// than required.
 		feedback.Info("unchanged")
 		return nil
 	}
@@ -516,55 +483,67 @@
 	// it can be used for debugging if something goes wrong...
 	feedback.Info("Range: from %s to %s", bn.From_ISRS, bn.To_ISRS)
 
-	// Check if an bottleneck with the same identity
-	// (bottleneck_id,validity) already exists:
-	// Check if an bottleneck identical to the one we would insert already
-	// exists:
-	var existing_bn_id *int64
-	err = findMatchingBNStmt.QueryRowContext(ctx,
-		bn.Bottleneck_id,
-		&validity,
-	).Scan(&existing_bn_id)
-	switch {
-	case err == sql.ErrNoRows:
-		existing_bn_id = nil
-	case err != nil:
-		// This is unexpected and propably a serious error
-		return err
-	}
-
-	tx, err := conn.BeginTx(ctx, nil)
+	// Check whether the new bottleneck intersects with the validity of
+	// existing entries for the same bottleneck_id.  If so, we consider
+	// this an update and mark the old data for deletion.
+	bns, err := tx.StmtContext(ctx, bs.findIntersecting).QueryContext(
+		ctx, bn.Bottleneck_id, &validity,
+	)
 	if err != nil {
 		return err
 	}
-	defer tx.Rollback()
+	defer bns.Close()
+
+	// Mark old intersecting bottleneck data for deletion.  Don't worry about
+	// materials; they will be deleted via cascading.
+	var oldBnIds []int64
+	for bns.Next() {
+		var oldID int64
+		err := bns.Scan(&oldID)
+		if err != nil {
+			return err
+		}
+		oldBnIds = append(oldBnIds, oldID)
+	}
+
+	if err := bns.Err(); err != nil {
+		return err
+	}
+
+	switch {
+	case len(oldBnIds) == 1:
+		feedback.Info("Bottleneck '%s' "+
+			"with intersecting validity already exists: "+
+			"UPDATING", bn.Bottleneck_id)
+	case len(oldBnIds) > 1:
+		// This case is unexpected and should only happen when historic
+		// data in the bottleneck service was changed subsequently...
+		// We handle it gracefully anyway, but warn.
+		feedback.Warn("More than one Bottleneck '%s' "+
+			"with intersecting validity already exists: "+
+			"REPLACING all of them!", bn.Bottleneck_id)
+	}
+
+	for _, oldID := range oldBnIds {
+		// It is possible that two new bottlenecks intersect with the
+		// same old one, therefore we have to handle duplicates in
+		// oldBnIds.
+		if !seenOldBnIds[oldID] {
+			if _, err := tx.StmtContext(ctx, bs.track).ExecContext(
+				ctx, importID, "waterway.bottlenecks", oldID, true,
+			); err != nil {
+				return err
+			}
+			seenOldBnIds[oldID] = true
+		}
+	}
 
 	var bnIds []int64
-	if existing_bn_id != nil {
-		feedback.Info("Bottelneck '%s' "+
-			"with matching validity already exists:"+
-			"UPDATING", bn.Bottleneck_id)
-		// Updating existnig BN data:
-		bns, err = tx.StmtContext(ctx, updateStmt).QueryContext(ctx,
-			existing_bn_id,
-			bn.Bottleneck_id,
-			&validity,
-			bn.Fk_g_fid,
-			bn.OBJNAM,
-			bn.NOBJNM,
-			bn.From_ISRS, bn.To_ISRS,
-			rb,
-			lb,
-			country,
-			revisitingTime,
-			limiting,
-			bn.Date_Info,
-			bn.Source,
-			tolerance,
-		)
-	} else {
-		// New BN data:
-		bns, err = tx.StmtContext(ctx, insertStmt).QueryContext(ctx,
+	// Add new BN data:
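+	// The insert runs inside a savepoint: if it fails, only this
+	// bottleneck is rolled back and reported as a warning while the
+	// surrounding import transaction continues with the next one.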
+	savepoint := Savepoint(ctx, tx, "insert_bottlenck")
+
+	err = savepoint(func() error {
+		bns, err := tx.StmtContext(ctx, bs.insert).QueryContext(ctx,
 			bn.Bottleneck_id,
 			&validity,
 			bn.Fk_g_fid,
@@ -578,114 +557,56 @@
 			limiting,
 			bn.Date_Info,
 			bn.Source,
-			tolerance,
-		)
-	}
+			tolerance)
+		if err != nil {
+			return err
+		}
+		defer bns.Close()
+		for bns.Next() {
+			var nid int64
+			if err := bns.Scan(&nid); err != nil {
+				return err
+			}
+			bnIds = append(bnIds, nid)
+		}
+		if err := bns.Err(); err != nil {
+			return err
+		}
+
+		// Add new materials
+		if len(bnIds) > 0 && materials != nil {
+			var (
+				pgBnIds     pgtype.Int8Array
+				pgMaterials pgtype.VarcharArray
+			)
+			pgBnIds.Set(bnIds)
+			pgMaterials.Set(materials)
+
+			// Insert riverbed materials
+			if _, err := tx.StmtContext(ctx, bs.insertMaterial).ExecContext(ctx,
+
+				&pgBnIds,
+				&pgMaterials,
+			); err != nil {
+				return err
+			}
+		}
+		return nil
+	})
 	if err != nil {
 		feedback.Warn(pgxutils.ReadableError{err}.Error())
 		return nil
 	}
-	defer bns.Close()
-	for bns.Next() {
-		var nid int64
-		if err := bns.Scan(&nid); err != nil {
+
+	// Only add new BN data to tracking for staging review.
+	for _, nid := range bnIds {
+		if _, err := tx.StmtContext(ctx, bs.track).ExecContext(
+			ctx, importID, "waterway.bottlenecks", nid, false,
+		); err != nil {
 			return err
 		}
-		bnIds = append(bnIds, nid)
-	}
-	if err := bns.Err(); err != nil {
-		feedback.Warn(pgxutils.ReadableError{err}.Error())
-		return nil
-	}
-	if len(bnIds) == 0 {
-		feedback.Warn(
-			"No gauge matching '%s' or given time available", bn.Fk_g_fid)
-		return nil
-	}
-
-	// Remove obsolete bottleneck version entries
-	var pgBnIds pgtype.Int8Array
-	pgBnIds.Set(bnIds)
-	if _, err = tx.StmtContext(ctx, deleteObsoleteBNStmt).ExecContext(ctx,
-		bn.Bottleneck_id,
-		&validity,
-		&pgBnIds,
-	); err != nil {
-		feedback.Warn(pgxutils.ReadableError{err}.Error())
-		if err2 := tx.Rollback(); err2 != nil {
-			return err2
-		}
-		return nil
-	}
-
-	// Set end of validity of old version to start of new version
-	// in case of overlap
-	if _, err = tx.StmtContext(ctx, fixValidityStmt).ExecContext(ctx,
-		bn.Bottleneck_id,
-		validity,
-	); err != nil {
-		feedback.Warn(pgxutils.ReadableError{err}.Error())
-		if err2 := tx.Rollback(); err2 != nil {
-			return err2
-		}
-		return nil
-	}
-
-	if materials != nil {
-		// Remove obsolete riverbed materials
-		var pgMaterials pgtype.VarcharArray
-		pgMaterials.Set(materials)
-		mtls, err := tx.StmtContext(ctx,
-			deleteMaterialStmt).QueryContext(ctx,
-			&pgBnIds,
-			&pgMaterials,
-		)
-		if err != nil {
-			return err
-		}
-		defer mtls.Close()
-		for mtls.Next() {
-			var delMat string
-			if err := mtls.Scan(&delMat); err != nil {
-				return err
-			}
-			feedback.Warn("Removed riverbed material %s", delMat)
-		}
-		if err := mtls.Err(); err != nil {
-			return err
-		}
-
-		// Insert riverbed materials
-		if _, err := tx.StmtContext(ctx, insertMaterialStmt).ExecContext(ctx,
-			&pgBnIds,
-			&pgMaterials,
-		); err != nil {
-			feedback.Warn("Failed to insert riverbed materials")
-			feedback.Warn(pgxutils.ReadableError{err}.Error())
-			return nil
-		}
 	}
 
-	// Only add new BN data to tracking for staging review.
-	//
-	// FIXME: Review for updated bottlenecks is currently not possible, as
-	// the update is done instantly in place.
-	if existing_bn_id == nil {
-		for _, nid := range bnIds {
-			if _, err := tx.StmtContext(ctx, trackStmt).ExecContext(
-				ctx, importID, "waterway.bottlenecks", nid,
-			); err != nil {
-				return err
-			}
-		}
-	}
-
-	if err = tx.Commit(); err != nil {
-		return err
-	}
-	// See above...
-	if existing_bn_id == nil {
-		*nids = append(*nids, bn.Bottleneck_id)
-	}
+	*nids = append(*nids, bn.Bottleneck_id)
 	return nil
 }
--- a/pkg/soap/ifbn/service.go	Thu Aug 01 17:02:09 2019 +0200
+++ b/pkg/soap/ifbn/service.go	Thu Aug 01 18:53:51 2019 +0200
@@ -36,7 +36,6 @@
 }
 
 type Export_bn_by_isrs struct {
-	//XMLName xml.Name `xml:"http://www.ris.eu/bottleneck/3.0 export_bn_by_isrs"`
 	XMLName xml.Name `xml:"http://www.ris.eu/bottleneck/3.0 export_bn_by_isrs"`
 
 	ISRS *ArrayOfISRSPair `xml:"ISRS,omitempty"`
@@ -826,13 +825,11 @@
 }
 
 type RequestedPeriod struct {
-	//XMLName xml.Name `xml:"http://www.ris.eu/wamos/common/3.0 RequestedPeriod"`
-
-	Date_start time.Time `xml:"Date_start,omitempty"`
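+	// Pointer fields let unset elements be omitted via omitempty; a
+	// non-pointer zero time.Time would still be marshalled.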
+	Date_start *time.Time `xml:"http://www.ris.eu/wamos/common/3.0 Date_start,omitempty"`
 
-	Date_end time.Time `xml:"Date_end,omitempty"`
+	Date_end *time.Time `xml:"http://www.ris.eu/wamos/common/3.0 Date_end,omitempty"`
 
-	Value_interval int32 `xml:"Value_interval,omitempty"`
+	Value_interval *int32 `xml:"http://www.ris.eu/wamos/common/3.0 Value_interval,omitempty"`
 }
 
 type ArrayOfString struct {
--- a/schema/gemma.sql	Thu Aug 01 17:02:09 2019 +0200
+++ b/schema/gemma.sql	Thu Aug 01 18:53:51 2019 +0200
@@ -649,9 +649,6 @@
         id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
         bottleneck_id varchar NOT NULL,
         validity tstzrange NOT NULL CHECK (NOT isempty(validity)),
-        UNIQUE (bottleneck_id, validity),
-        EXCLUDE USING GiST (bottleneck_id WITH =, validity WITH &&)
-            DEFERRABLE INITIALLY DEFERRED,
         gauge_location isrs NOT NULL,
         objnam varchar,
         nobjnm varchar,
@@ -673,7 +670,12 @@
         -- XXX: Also an attribut of sounding result?
         date_info timestamp with time zone NOT NULL,
         source_organization varchar NOT NULL,
-        staging_done boolean NOT NULL DEFAULT false
+        staging_done boolean NOT NULL DEFAULT false,
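+        -- staging_done is part of the unique and exclusion constraints so
+        -- that a staged and a non-staged version of the same bottleneck
+        -- may coexist with overlapping validity.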
+        UNIQUE (bottleneck_id, validity, staging_done),
+        EXCLUDE USING GiST (bottleneck_id WITH =,
+                            validity WITH &&,
+                            CAST(staging_done AS int) WITH =)
+            DEFERRABLE INITIALLY DEFERRED
     )
     CREATE CONSTRAINT TRIGGER waterway_bottlenecks_reference_gauge
         AFTER INSERT OR UPDATE OF gauge_location ON bottlenecks
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/schema/updates/1102/01.bn_constraint.sql	Thu Aug 01 18:53:51 2019 +0200
@@ -0,0 +1,14 @@
+ALTER TABLE waterway.bottlenecks
+    DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_key,
+    DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_excl,
+    DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_staging_done_key,
+    DROP CONSTRAINT IF EXISTS bottlenecks_bottleneck_id_validity_staging_done_excl;
+
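+-- Re-add the constraints with staging_done included, so a staged and a
+-- non-staged version of the same bottleneck may overlap in validity,
+-- while overlaps within the same staging state remain excluded.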
+ALTER TABLE waterway.bottlenecks
+    ADD CONSTRAINT bottlenecks_bottleneck_id_validity_staging_done_key
+        UNIQUE (bottleneck_id, validity, staging_done),
+    ADD CONSTRAINT bottlenecks_bottleneck_id_validity_staging_done_excl
+        EXCLUDE USING GiST (bottleneck_id WITH =,
+                            validity WITH &&,
+                            CAST(staging_done AS int) WITH =)
+        DEFERRABLE INITIALLY DEFERRED;
--- a/schema/version.sql	Thu Aug 01 17:02:09 2019 +0200
+++ b/schema/version.sql	Thu Aug 01 18:53:51 2019 +0200
@@ -1,1 +1,1 @@
-INSERT INTO gemma_schema_version(version) VALUES (1101);
+INSERT INTO gemma_schema_version(version) VALUES (1102);