changeset 1985:8eeb0b5eb340

Imports: Made retries and the waiting between the attempts configurable.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 23 Jan 2019 17:58:57 +0100
parents f9f1babe52ae
children 4949e723bf45
files pkg/common/attributes.go pkg/controllers/agmimports.go pkg/controllers/manualimports.go pkg/controllers/srimports.go pkg/imports/queue.go pkg/imports/scheduled.go schema/gemma.sql
diffstat 7 files changed, 204 insertions(+), 105 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/common/attributes.go	Wed Jan 23 16:25:43 2019 +0100
+++ b/pkg/common/attributes.go	Wed Jan 23 17:58:57 2019 +0100
@@ -67,3 +67,16 @@
 	}
 	return i, true
 }
+
+func (ca Attributes) Duration(key string) (time.Duration, bool) {
+	s, found := ca.Get(key)
+	if !found {
+		return 0, false
+	}
+	d, err := time.ParseDuration(s)
+	if err != nil {
+		log.Printf("error: %v\n", err)
+		return 0, false
+	}
+	return d, true
+}
--- a/pkg/controllers/agmimports.go	Wed Jan 23 16:25:43 2019 +0100
+++ b/pkg/controllers/agmimports.go	Wed Jan 23 17:58:57 2019 +0100
@@ -102,18 +102,29 @@
 		}
 	}
 
-	retries := -1
+	var retries *int
 	if r := req.FormValue("retries"); r != "" {
-		var err error
-		if retries, err = strconv.Atoi(r); err != nil {
+		if v, err := strconv.Atoi(r); err != nil {
 			log.Printf("error: %v\n", err)
-			retries = -1
+		} else {
+			retries = &v
+		}
+	}
+
+	var waitDuration *time.Duration
+	if wd := req.FormValue("wait-duration"); wd != "" {
+		if v, err := time.ParseDuration(wd); err != nil {
+			log.Printf("error: %v\n", err)
+		} else {
+			waitDuration = &v
 		}
 	}
 
 	jobID, err := imports.AddJob(
 		imports.AGMJobKind,
-		due, retries,
+		due,
+		retries,
+		waitDuration,
 		session.User,
 		sendEmail,
 		serialized)
--- a/pkg/controllers/manualimports.go	Wed Jan 23 16:25:43 2019 +0100
+++ b/pkg/controllers/manualimports.go	Wed Jan 23 17:58:57 2019 +0100
@@ -26,68 +26,74 @@
 	"gemma.intevation.de/gemma/pkg/models"
 )
 
-func retry(a common.Attributes) (time.Time, int) {
+func retry(a common.Attributes) (time.Time, *int, *time.Duration) {
 	due, _ := a.Time("due")
-	retries, ok := a.Int("retries")
-	if !ok {
-		retries = -1
+	ret, ok := a.Int("retries")
+	var retries *int
+	if ok {
+		retries = &ret
 	}
-	return due, retries
+	dur, ok := a.Duration("wait-retry")
+	var duration *time.Duration
+	if ok {
+		duration = &dur
+	}
+	return due, retries, duration
 }
 
-func importBottleneck(input interface{}) (interface{}, time.Time, int, bool) {
+func importBottleneck(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) {
 	bi := input.(*models.BottleneckImport)
 	bn := &imports.Bottleneck{
 		URL:      bi.URL,
 		Insecure: bi.Insecure,
 	}
-	due, retries := retry(bi.Attributes)
-	return bn, due, retries, bi.SendEmail
+	due, retries, duration := retry(bi.Attributes)
+	return bn, due, retries, duration, bi.SendEmail
 }
 
-func importGaugeMeasurement(input interface{}) (interface{}, time.Time, int, bool) {
+func importGaugeMeasurement(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) {
 	gi := input.(*models.GaugeMeasurementImport)
 	gm := &imports.GaugeMeasurement{
 		URL:      gi.URL,
 		Insecure: gi.Insecure,
 	}
-	due, retries := retry(gi.Attributes)
-	return gm, due, retries, gi.SendEmail
+	due, retries, duration := retry(gi.Attributes)
+	return gm, due, retries, duration, gi.SendEmail
 }
 
-func importFairwayAvailability(input interface{}) (interface{}, time.Time, int, bool) {
+func importFairwayAvailability(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) {
 	fai := input.(*models.FairwayAvailabilityImport)
 	fa := &imports.FairwayAvailability{
 		URL:      fai.URL,
 		Insecure: fai.Insecure,
 	}
-	due, retries := retry(fai.Attributes)
-	return fa, due, retries, fai.SendEmail
+	due, retries, duration := retry(fai.Attributes)
+	return fa, due, retries, duration, fai.SendEmail
 }
 
-func importWaterwayAxis(input interface{}) (interface{}, time.Time, int, bool) {
+func importWaterwayAxis(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) {
 	wxi := input.(*models.WaterwayAxisImport)
 	wx := &imports.WaterwayAxis{
 		URL:         wxi.URL,
 		FeatureType: wxi.FeatureType,
 		SortBy:      wxi.SortBy,
 	}
-	due, retries := retry(wxi.Attributes)
-	return wx, due, retries, wxi.SendEmail
+	due, retries, duration := retry(wxi.Attributes)
+	return wx, due, retries, duration, wxi.SendEmail
 }
 
-func importWaterwayArea(input interface{}) (interface{}, time.Time, int, bool) {
+func importWaterwayArea(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) {
 	wai := input.(*models.WaterwayAreaImport)
 	wa := &imports.WaterwayArea{
 		URL:         wai.URL,
 		FeatureType: wai.FeatureType,
 		SortBy:      wai.SortBy,
 	}
-	due, retries := retry(wai.Attributes)
-	return wa, due, retries, wai.SendEmail
+	due, retries, duration := retry(wai.Attributes)
+	return wa, due, retries, duration, wai.SendEmail
 }
 
-func importWaterwayGauge(input interface{}) (interface{}, time.Time, int, bool) {
+func importWaterwayGauge(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) {
 	wgi := input.(*models.WaterwayGaugeImport)
 	username, _ := wgi.Attributes.Get("username")
 	password, _ := wgi.Attributes.Get("password")
@@ -98,11 +104,11 @@
 		Password: password,
 		Insecure: insecure,
 	}
-	due, retries := retry(wgi.Attributes)
-	return wg, due, retries, wgi.SendEmail
+	due, retries, duration := retry(wgi.Attributes)
+	return wg, due, retries, duration, wgi.SendEmail
 }
 
-func importDistancemarksVirtual(input interface{}) (interface{}, time.Time, int, bool) {
+func importDistancemarksVirtual(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) {
 	dmvi := input.(*models.DistanceMarksVirtualImport)
 	username, _ := dmvi.Attributes.Get("username")
 	password, _ := dmvi.Attributes.Get("password")
@@ -113,11 +119,11 @@
 		Password: password,
 		Insecure: insecure,
 	}
-	due, retries := retry(dmvi.Attributes)
-	return wg, due, retries, dmvi.SendEmail
+	due, retries, duration := retry(dmvi.Attributes)
+	return wg, due, retries, duration, dmvi.SendEmail
 }
 
-func importFairwayDimension(input interface{}) (interface{}, time.Time, int, bool) {
+func importFairwayDimension(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) {
 	fdi := input.(*models.FairwayDimensionImport)
 	fd := &imports.FairwayDimension{
 		URL:                fdi.URL,
@@ -129,22 +135,22 @@
 		Depth:              fdi.Depth,
 		SourceOrganization: fdi.SourceOrganization,
 	}
-	due, retries := retry(fdi.Attributes)
-	return fd, due, retries, fdi.SendEmail
+	due, retries, duration := retry(fdi.Attributes)
+	return fd, due, retries, duration, fdi.SendEmail
 }
 
-func importDistanceMarksAshore(input interface{}) (interface{}, time.Time, int, bool) {
+func importDistanceMarksAshore(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) {
 	dmai := input.(*models.DistanceMarksAshoreImport)
 	dma := &imports.DistanceMarksAshore{
 		URL:         dmai.URL,
 		FeatureType: dmai.FeatureType,
 		SortBy:      dmai.SortBy,
 	}
-	due, retries := retry(dmai.Attributes)
-	return dma, due, retries, dmai.SendEmail
+	due, retries, duration := retry(dmai.Attributes)
+	return dma, due, retries, duration, dmai.SendEmail
 }
 
-func importStretch(input interface{}) (interface{}, time.Time, int, bool) {
+func importStretch(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) {
 	sti := input.(*models.StretchImport)
 	st := &imports.Stretch{
 		Name:      sti.Name,
@@ -156,19 +162,19 @@
 		Date:      sti.Date,
 		Countries: sti.Countries,
 	}
-	due, retries := retry(sti.Attributes)
-	return st, due, retries, sti.SendEmail
+	due, retries, duration := retry(sti.Attributes)
+	return st, due, retries, duration, sti.SendEmail
 }
 
 func manualImport(
 	kind imports.JobKind,
-	setup func(interface{}) (interface{}, time.Time, int, bool),
+	setup func(interface{}) (interface{}, time.Time, *int, *time.Duration, bool),
 ) func(interface{}, *http.Request, *sql.Conn) (JSONResult, error) {
 
 	return func(input interface{}, req *http.Request, _ *sql.Conn) (
 		jr JSONResult, err error) {
 
-		what, due, retries, sendEmail := setup(input)
+		what, due, retries, waitDuration, sendEmail := setup(input)
 
 		var serialized string
 		if serialized, err = common.ToJSONString(what); err != nil {
@@ -180,7 +186,9 @@
 		var jobID int64
 		if jobID, err = imports.AddJob(
 			kind,
-			due, retries,
+			due,
+			retries,
+			waitDuration,
 			session.User,
 			sendEmail,
 			serialized,
--- a/pkg/controllers/srimports.go	Wed Jan 23 16:25:43 2019 +0100
+++ b/pkg/controllers/srimports.go	Wed Jan 23 17:58:57 2019 +0100
@@ -171,18 +171,29 @@
 		}
 	}
 
-	retries := -1
+	var retries *int
 	if r := req.FormValue("retries"); r != "" {
-		var err error
-		if retries, err = strconv.Atoi(r); err != nil {
+		if v, err := strconv.Atoi(r); err != nil {
 			log.Printf("error: %v\n", err)
-			retries = -1
+		} else {
+			retries = &v
+		}
+	}
+
+	var waitDuration *time.Duration
+	if wd := req.FormValue("wait-duration"); wd != "" {
+		if v, err := time.ParseDuration(wd); err != nil {
+			log.Printf("error: %v\n", err)
+		} else {
+			waitDuration = &v
 		}
 	}
 
 	jobID, err := imports.AddJob(
 		imports.SRJobKind,
-		due, retries,
+		due,
+		retries,
+		waitDuration,
 		session.User,
 		sendEmail,
 		serialized)
--- a/pkg/imports/queue.go	Wed Jan 23 16:25:43 2019 +0100
+++ b/pkg/imports/queue.go	Wed Jan 23 17:58:57 2019 +0100
@@ -42,15 +42,6 @@
 		Error(fmt string, args ...interface{})
 	}
 
-	// RetryError is an error type to signal that
-	// the import should be tried again.
-	RetryError struct {
-		// Message is the error message.
-		Message string
-		// When is the new scheduled execution time.
-		When time.Time
-	}
-
 	// UnchangedError may be issued by Do of a Job to indicate
 	// That the database has not changed.
 	UnchangedError string
@@ -104,6 +95,7 @@
 		id        int64
 		kind      JobKind
 		user      string
+		waitRetry pgtype.Interval
 		trysLeft  sql.NullInt64
 		sendEmail bool
 		data      string
@@ -150,6 +142,7 @@
   kind,
   due,
   trys_left,
+  retry_wait,
   username,
   send_email,
   data
@@ -159,7 +152,8 @@
   $3,
   $4,
   $5,
-  $6
+  $6,
+  $7
 ) RETURNING id`
 
 	selectJobSQL = `
@@ -167,6 +161,7 @@
   id,
   kind,
   trys_left,
+  retry_wait,
   username,
   send_email,
   data
@@ -206,11 +201,6 @@
 	go iqueue.importLoop()
 }
 
-// Error makes RetryError an error.
-func (re *RetryError) Error() string {
-	return re.Message
-}
-
 // Error makes UnchangedError an error.
 func (ue UnchangedError) Error() string {
 	return string(ue)
@@ -266,11 +256,55 @@
 	return names
 }
 
-func (idj *idJob) trys() int {
+func (idj *idJob) nextRetry(feedback Feedback) bool {
+	switch {
+	case idj.waitRetry.Status != pgtype.Present && !idj.trysLeft.Valid:
+		return false
+	case idj.waitRetry.Status == pgtype.Present && !idj.trysLeft.Valid:
+		return true
+	case idj.trysLeft.Valid:
+		if idj.trysLeft.Int64 < 1 {
+			feedback.Warn("import should be retried, but no retrys left")
+		} else {
+			idj.trysLeft.Int64--
+			feedback.Info("import failed but will be retried")
+			return true
+		}
+	}
+	return false
+}
+
+func (idj *idJob) nextDue() time.Time {
+	now := time.Now()
+	if idj.waitRetry.Status == pgtype.Present {
+		var d time.Duration
+		if err := idj.waitRetry.AssignTo(&d); err != nil {
+			log.Printf("error: converting waitRetry failed: %v\n", err)
+		} else {
+			now = now.Add(d)
+		}
+	}
+	return now
+}
+
+func (idj *idJob) trysLeftPointer() *int {
 	if !idj.trysLeft.Valid {
-		return -1
+		return nil
 	}
-	return int(idj.trysLeft.Int64)
+	t := int(idj.trysLeft.Int64)
+	return &t
+}
+
+func (idj *idJob) waitRetryPointer() *time.Duration {
+	if idj.waitRetry.Status != pgtype.Present {
+		return nil
+	}
+	d := new(time.Duration)
+	if err := idj.waitRetry.AssignTo(d); err != nil {
+		log.Printf("error: converting waitRetry failed: %v\n", err)
+		return nil
+	}
+	return d
 }
 
 func (q *importQueue) jobCreator(kind JobKind) JobCreator {
@@ -282,20 +316,33 @@
 func (q *importQueue) addJob(
 	kind JobKind,
 	due time.Time,
-	trysLeft int,
+	trysLeft *int,
+	waitRetry *time.Duration,
 	user string,
 	sendEmail bool,
 	data string,
 ) (int64, error) {
-	ctx := context.Background()
+
 	var id int64
 	if due.IsZero() {
 		due = time.Now()
 	}
+
 	var tl sql.NullInt64
-	if trysLeft >= 0 {
-		tl = sql.NullInt64{Int64: int64(trysLeft), Valid: true}
+	if trysLeft != nil {
+		tl = sql.NullInt64{Int64: int64(*trysLeft), Valid: true}
 	}
+
+	var wr pgtype.Interval
+	if waitRetry != nil {
+		if err := wr.Set(*waitRetry); err != nil {
+			return 0, err
+		}
+	} else {
+		wr = pgtype.Interval{Status: pgtype.Null}
+	}
+
+	ctx := context.Background()
 	err := auth.RunAs(ctx, user, func(conn *sql.Conn) error {
 		return conn.QueryRowContext(
 			ctx,
@@ -303,6 +350,7 @@
 			string(kind),
 			due,
 			tl,
+			&wr,
 			user,
 			sendEmail,
 			data).Scan(&id)
@@ -323,12 +371,20 @@
 func AddJob(
 	kind JobKind,
 	due time.Time,
-	trysLeft int,
+	trysLeft *int,
+	waitRetry *time.Duration,
 	user string,
 	sendEmail bool,
 	data string,
 ) (int64, error) {
-	return iqueue.addJob(kind, due, trysLeft, user, sendEmail, data)
+	return iqueue.addJob(
+		kind,
+		due,
+		trysLeft,
+		waitRetry,
+		user,
+		sendEmail,
+		data)
 }
 
 type logFeedback int64
@@ -413,6 +469,7 @@
 			&ji.id,
 			&ji.kind,
 			&ji.trysLeft,
+			&ji.waitRetry,
 			&ji.user,
 			&ji.sendEmail,
 			&ji.data,
@@ -555,32 +612,17 @@
 					})
 			})()
 
-			var retry *RetryError
-			var unchanged bool
-
-			switch v := errDo.(type) {
-			case *RetryError:
-				// NULL -> limit less
-				if idj.trysLeft.Valid && idj.trysLeft.Int64 <= 1 {
-					feedback.Warn("import should be retried, but no retrys left")
-				} else {
-					if idj.trysLeft.Valid {
-						idj.trysLeft.Int64--
-					}
-					feedback.Info("import failed but will be retried")
-					retry = v
-				}
-			case UnchangedError:
+			var unchanged, retry bool
+			if v, ok := errDo.(UnchangedError); ok {
 				feedback.Info("unchanged: %s", v.Error())
 				unchanged = true
-			default:
-				if errDo != nil {
-					feedback.Error("error in import: %v", errDo)
-				}
+			} else if errDo != nil {
+				feedback.Error("error in import: %v", errDo)
+				retry = idj.nextRetry(feedback)
 			}
 
 			var errCleanup error
-			if retry == nil { // cleanup debris
+			if retry { // cleanup debris
 				if errCleanup = survive(job.CleanUp)(); errCleanup != nil {
 					feedback.Error("error cleanup: %v", errCleanup)
 				}
@@ -605,10 +647,12 @@
 				go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
 			}
 
-			if retry != nil {
+			if retry {
 				nid, err := q.addJob(
 					idj.kind,
-					retry.When, idj.trys(),
+					idj.nextDue(),
+					idj.trysLeftPointer(),
+					idj.waitRetryPointer(),
 					idj.user, idj.sendEmail,
 					idj.data)
 				if err != nil {
--- a/pkg/imports/scheduled.go	Wed Jan 23 16:25:43 2019 +0100
+++ b/pkg/imports/scheduled.go	Wed Jan 23 17:58:57 2019 +0100
@@ -19,6 +19,7 @@
 	"errors"
 	"fmt"
 	"log"
+	"time"
 
 	"gemma.intevation.de/gemma/pkg/common"
 	"gemma.intevation.de/gemma/pkg/scheduler"
@@ -232,15 +233,24 @@
 
 	due, _ := cfg.Attributes.Time("due")
 
-	retries, found := cfg.Attributes.Int("retries")
-	if !found {
-		retries = -1
+	ret, found := cfg.Attributes.Int("retries")
+	var retries *int
+	if found {
+		retries = &ret
+	}
+
+	dur, found := cfg.Attributes.Duration("wait-retry")
+	var waitRetry *time.Duration
+	if found {
+		waitRetry = &dur
 	}
 
 	var jobID int64
 	if jobID, err = AddJob(
 		kind,
-		due, retries,
+		due,
+		retries,
+		waitRetry,
 		cfg.User,
 		cfg.SendEMail,
 		serialized,
--- a/schema/gemma.sql	Wed Jan 23 16:25:43 2019 +0100
+++ b/schema/gemma.sql	Wed Jan 23 17:58:57 2019 +0100
@@ -633,12 +633,14 @@
 );
 
 CREATE TABLE waterway.imports (
-    id        int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
-    state     waterway.import_state NOT NULL DEFAULT 'queued',
-    kind      varchar   NOT NULL,
-    enqueued  timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    due       timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
-    trys_left int,
+    id         int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+    state      waterway.import_state NOT NULL DEFAULT 'queued',
+    kind       varchar   NOT NULL,
+    enqueued   timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    due        timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    retry_wait interval
+        CHECK(retry_wait IS NULL OR retry_wait >= interval '0 microseconds'),
+    trys_left  int, -- if NULL and retry_wait NOT NUL, endless
     username  varchar   NOT NULL
         REFERENCES internal.user_profiles(username)
             ON DELETE CASCADE