changeset 3219:4acbee65275d

Import queue: Split locked dependencies in exclusively and multiple uses.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 09 May 2019 12:49:53 +0200
parents c2b65a549c6f
children 56b297592c0a
files pkg/imports/agm.go pkg/imports/bn.go pkg/imports/dma.go pkg/imports/dmv.go pkg/imports/fa.go pkg/imports/fd.go pkg/imports/gm.go pkg/imports/queue.go pkg/imports/sec.go pkg/imports/sr.go pkg/imports/st.go pkg/imports/ubn.go pkg/imports/ufa.go pkg/imports/ugm.go pkg/imports/wa.go pkg/imports/wg.go pkg/imports/wp.go pkg/imports/wx.go
diffstat 18 files changed, 80 insertions(+), 65 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/agm.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/agm.go	Thu May 09 12:49:53 2019 +0200
@@ -55,10 +55,10 @@
 
 func (agmJobCreator) Create() Job { return new(ApprovedGaugeMeasurements) }
 
-func (agmJobCreator) Depends() []string {
-	return []string{
-		"gauges",
-		"gauge_measurements",
+func (agmJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"gauge_measurements"},
+		{"gauges"},
 	}
 }
 
--- a/pkg/imports/bn.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/bn.go	Thu May 09 12:49:53 2019 +0200
@@ -104,10 +104,10 @@
 
 func (bnJobCreator) Create() Job { return new(Bottleneck) }
 
-func (bnJobCreator) Depends() []string {
-	return []string{
-		"gauges",
-		"bottlenecks",
+func (bnJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"bottlenecks", "bottlenecks_riverbed_materials"},
+		{"gauges"},
 	}
 }
 
--- a/pkg/imports/dma.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/dma.go	Thu May 09 12:49:53 2019 +0200
@@ -56,9 +56,10 @@
 
 func (dmaJobCreator) Create() Job { return new(DistanceMarksAshore) }
 
-func (dmaJobCreator) Depends() []string {
-	return []string{
-		"distance_marks",
+func (dmaJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"distance_marks"},
+		{},
 	}
 }
 
--- a/pkg/imports/dmv.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/dmv.go	Thu May 09 12:49:53 2019 +0200
@@ -48,9 +48,10 @@
 
 func (dmvJobCreator) Create() Job { return new(DistanceMarksVirtual) }
 
-func (dmvJobCreator) Depends() []string {
-	return []string{
-		"distance_marks_virtual",
+func (dmvJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"distance_marks_virtual"},
+		{},
 	}
 }
 
--- a/pkg/imports/fa.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/fa.go	Thu May 09 12:49:53 2019 +0200
@@ -161,14 +161,11 @@
 
 func (faJobCreator) Create() Job { return new(FairwayAvailability) }
 
-func (faJobCreator) Depends() []string {
-	return []string{
-		"bottlenecks",
-		"fairway_availability",
-		"bottleneck_pdfs",
-		"effective_fairway_availability",
-		"fa_reference_values",
-		"levels_of_service",
+func (faJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"effective_fairway_availability", "fa_reference_values",
+			"bottleneck_pdfs", "fairway_availability"},
+		{"bottlenecks", "levels_of_service"},
 	}
 }
 
--- a/pkg/imports/fd.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/fd.go	Thu May 09 12:49:53 2019 +0200
@@ -80,9 +80,10 @@
 
 func (fdJobCreator) Create() Job { return new(FairwayDimension) }
 
-func (fdJobCreator) Depends() []string {
-	return []string{
-		"fairway_dimensions",
+func (fdJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"fairway_dimensions"},
+		{"level_of_service"},
 	}
 }
 
--- a/pkg/imports/gm.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/gm.go	Thu May 09 12:49:53 2019 +0200
@@ -92,10 +92,10 @@
 
 func (gmJobCreator) Create() Job { return new(GaugeMeasurement) }
 
-func (gmJobCreator) Depends() []string {
-	return []string{
-		"gauges",
-		"gauge_measurements",
+func (gmJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"gauge_measurements"},
+		{"gauges"},
 	}
 }
 
--- a/pkg/imports/queue.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/queue.go	Thu May 09 12:49:53 2019 +0200
@@ -75,10 +75,12 @@
 		Description() string
 		// Create build the actual job.
 		Create() Job
-		// Depends returns a list of ressources locked by this type of import.
+		// Depends returns two lists of ressources locked by this type of import.
 		// Imports are run concurrently if they have disjoint sets
 		// of dependencies.
-		Depends() []string
+		// The first list are locked exclusively.
+		// The second allows multiple read users but only one writing one.
+		Depends() [2][]string
 		// StageDone is called if an import is positively reviewed
 		// (state = accepted). This can be used to finalize the imported
 		// data to move it e.g from the staging area.
@@ -436,9 +438,12 @@
 	q.creatorsMu.Lock()
 nextCreator:
 	for kind, jc := range q.creators {
-		for _, d := range jc.Depends() {
-			if _, found := q.usedDeps[d]; found {
-				continue nextCreator
+		deps := jc.Depends()
+		for l := range deps {
+			for _, d := range deps[l] {
+				if _, found := q.usedDeps[d]; found {
+					continue nextCreator
+				}
 			}
 		}
 		which = append(which, string(kind))
@@ -590,8 +595,11 @@
 
 		// Lock dependencies.
 		q.creatorsMu.Lock()
-		for _, d := range jc.Depends() {
-			q.usedDeps[d] = struct{}{}
+		deps := jc.Depends()
+		for l := range deps {
+			for _, d := range deps[l] {
+				q.usedDeps[d] = struct{}{}
+			}
 		}
 		q.creatorsMu.Unlock()
 
@@ -600,8 +608,11 @@
 			// Unlock the dependencies.
 			defer func() {
 				q.creatorsMu.Lock()
-				for _, d := range jc.Depends() {
-					delete(q.usedDeps, d)
+				deps := jc.Depends()
+				for l := range deps {
+					for _, d := range deps[l] {
+						delete(q.usedDeps, d)
+					}
 				}
 				q.creatorsMu.Unlock()
 				select {
--- a/pkg/imports/sec.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/sec.go	Thu May 09 12:49:53 2019 +0200
@@ -46,9 +46,10 @@
 
 func (secJobCreator) Create() Job { return new(Section) }
 
-func (secJobCreator) Depends() []string {
-	return []string{
-		"sections",
+func (secJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"sections"},
+		{},
 	}
 }
 
--- a/pkg/imports/sr.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/sr.go	Thu May 09 12:49:53 2019 +0200
@@ -79,11 +79,10 @@
 
 func (srJobCreator) Create() Job { return new(SoundingResult) }
 
-func (srJobCreator) Depends() []string {
-	return []string{
-		"sounding_results",
-		"sounding_results_contour_lines",
-		"bottlenecks",
+func (srJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"sounding_results", "sounding_results_contour_lines"},
+		{"bottlenecks"},
 	}
 }
 
--- a/pkg/imports/st.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/st.go	Thu May 09 12:49:53 2019 +0200
@@ -48,9 +48,10 @@
 
 func (stJobCreator) Create() Job { return new(Stretch) }
 
-func (stJobCreator) Depends() []string {
-	return []string{
-		"stretches",
+func (stJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"stretches", "stretch_countries"},
+		{},
 	}
 }
 
--- a/pkg/imports/ubn.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/ubn.go	Thu May 09 12:49:53 2019 +0200
@@ -43,7 +43,7 @@
 
 func (ubnJobCreator) Create() Job { return new(UploadedBottleneck) }
 
-func (ubnJobCreator) Depends() []string {
+func (ubnJobCreator) Depends() [2][]string {
 	// Same as normal bottleneck import.
 	return bnJobCreator{}.Depends()
 }
--- a/pkg/imports/ufa.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/ufa.go	Thu May 09 12:49:53 2019 +0200
@@ -42,7 +42,7 @@
 
 func (ufaJobCreator) Create() Job { return new(UploadedFairwayAvailability) }
 
-func (ufaJobCreator) Depends() []string {
+func (ufaJobCreator) Depends() [2][]string {
 	// Same as faJobCreator
 	return faJobCreator{}.Depends()
 }
--- a/pkg/imports/ugm.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/ugm.go	Thu May 09 12:49:53 2019 +0200
@@ -38,7 +38,7 @@
 
 func (ugmJobCreator) Create() Job { return new(UploadedGaugeMeasurement) }
 
-func (ugmJobCreator) Depends() []string { return gmJobCreator{}.Depends() }
+func (ugmJobCreator) Depends() [2][]string { return gmJobCreator{}.Depends() }
 
 func (ugmJobCreator) AutoAccept() bool { return true }
 
--- a/pkg/imports/wa.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/wa.go	Thu May 09 12:49:53 2019 +0200
@@ -59,9 +59,10 @@
 
 func (waJobCreator) Create() Job { return new(WaterwayArea) }
 
-func (waJobCreator) Depends() []string {
-	return []string{
-		"waterway_area",
+func (waJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"waterway_area"},
+		{},
 	}
 }
 
--- a/pkg/imports/wg.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/wg.go	Thu May 09 12:49:53 2019 +0200
@@ -51,10 +51,10 @@
 
 func (wgJobCreator) Create() Job { return new(WaterwayGauge) }
 
-func (wgJobCreator) Depends() []string {
-	return []string{
-		"gauges",
-		"gauges_reference_water_levels",
+func (wgJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"gauges_reference_water_levels", "gauges"},
+		{"depth_references"},
 	}
 }
 
--- a/pkg/imports/wp.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/wp.go	Thu May 09 12:49:53 2019 +0200
@@ -72,9 +72,10 @@
 	return "waterway profiles"
 }
 
-func (wpJobCreator) Depends() []string {
-	return []string{
-		"waterway_profiles",
+func (wpJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"waterway_profiles"},
+		{"distance_marks_virtual"},
 	}
 }
 
--- a/pkg/imports/wx.go	Thu May 09 12:25:10 2019 +0200
+++ b/pkg/imports/wx.go	Thu May 09 12:49:53 2019 +0200
@@ -58,9 +58,10 @@
 
 func (wxJobCreator) Create() Job { return new(WaterwayAxis) }
 
-func (wxJobCreator) Depends() []string {
-	return []string{
-		"waterway_axis",
+func (wxJobCreator) Depends() [2][]string {
+	return [2][]string{
+		{"waterway_axis"},
+		{},
 	}
 }