Mercurial > gemma
changeset 1985:8eeb0b5eb340
Imports: Made retries and the waiting between the attempts configurable.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 23 Jan 2019 17:58:57 +0100 |
parents | f9f1babe52ae |
children | 4949e723bf45 |
files | pkg/common/attributes.go pkg/controllers/agmimports.go pkg/controllers/manualimports.go pkg/controllers/srimports.go pkg/imports/queue.go pkg/imports/scheduled.go schema/gemma.sql |
diffstat | 7 files changed, 204 insertions(+), 105 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/common/attributes.go Wed Jan 23 16:25:43 2019 +0100 +++ b/pkg/common/attributes.go Wed Jan 23 17:58:57 2019 +0100 @@ -67,3 +67,16 @@ } return i, true } + +func (ca Attributes) Duration(key string) (time.Duration, bool) { + s, found := ca.Get(key) + if !found { + return 0, false + } + d, err := time.ParseDuration(s) + if err != nil { + log.Printf("error: %v\n", err) + return 0, false + } + return d, true +}
--- a/pkg/controllers/agmimports.go Wed Jan 23 16:25:43 2019 +0100 +++ b/pkg/controllers/agmimports.go Wed Jan 23 17:58:57 2019 +0100 @@ -102,18 +102,29 @@ } } - retries := -1 + var retries *int if r := req.FormValue("retries"); r != "" { - var err error - if retries, err = strconv.Atoi(r); err != nil { + if v, err := strconv.Atoi(r); err != nil { log.Printf("error: %v\n", err) - retries = -1 + } else { + retries = &v + } + } + + var waitDuration *time.Duration + if wd := req.FormValue("wait-duration"); wd != "" { + if v, err := time.ParseDuration(wd); err != nil { + log.Printf("error: %v\n", err) + } else { + waitDuration = &v } } jobID, err := imports.AddJob( imports.AGMJobKind, - due, retries, + due, + retries, + waitDuration, session.User, sendEmail, serialized)
--- a/pkg/controllers/manualimports.go Wed Jan 23 16:25:43 2019 +0100 +++ b/pkg/controllers/manualimports.go Wed Jan 23 17:58:57 2019 +0100 @@ -26,68 +26,74 @@ "gemma.intevation.de/gemma/pkg/models" ) -func retry(a common.Attributes) (time.Time, int) { +func retry(a common.Attributes) (time.Time, *int, *time.Duration) { due, _ := a.Time("due") - retries, ok := a.Int("retries") - if !ok { - retries = -1 + ret, ok := a.Int("retries") + var retries *int + if ok { + retries = &ret } - return due, retries + dur, ok := a.Duration("wait-retry") + var duration *time.Duration + if ok { + duration = &dur + } + return due, retries, duration } -func importBottleneck(input interface{}) (interface{}, time.Time, int, bool) { +func importBottleneck(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) { bi := input.(*models.BottleneckImport) bn := &imports.Bottleneck{ URL: bi.URL, Insecure: bi.Insecure, } - due, retries := retry(bi.Attributes) - return bn, due, retries, bi.SendEmail + due, retries, duration := retry(bi.Attributes) + return bn, due, retries, duration, bi.SendEmail } -func importGaugeMeasurement(input interface{}) (interface{}, time.Time, int, bool) { +func importGaugeMeasurement(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) { gi := input.(*models.GaugeMeasurementImport) gm := &imports.GaugeMeasurement{ URL: gi.URL, Insecure: gi.Insecure, } - due, retries := retry(gi.Attributes) - return gm, due, retries, gi.SendEmail + due, retries, duration := retry(gi.Attributes) + return gm, due, retries, duration, gi.SendEmail } -func importFairwayAvailability(input interface{}) (interface{}, time.Time, int, bool) { +func importFairwayAvailability(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) { fai := input.(*models.FairwayAvailabilityImport) fa := &imports.FairwayAvailability{ URL: fai.URL, Insecure: fai.Insecure, } - due, retries := retry(fai.Attributes) - return fa, due, retries, fai.SendEmail + due, retries, duration := retry(fai.Attributes) + return fa, due, retries, duration, fai.SendEmail } -func importWaterwayAxis(input interface{}) (interface{}, time.Time, int, bool) { +func importWaterwayAxis(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) { wxi := input.(*models.WaterwayAxisImport) wx := &imports.WaterwayAxis{ URL: wxi.URL, FeatureType: wxi.FeatureType, SortBy: wxi.SortBy, } - due, retries := retry(wxi.Attributes) - return wx, due, retries, wxi.SendEmail + due, retries, duration := retry(wxi.Attributes) + return wx, due, retries, duration, wxi.SendEmail } -func importWaterwayArea(input interface{}) (interface{}, time.Time, int, bool) { +func importWaterwayArea(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) { wai := input.(*models.WaterwayAreaImport) wa := &imports.WaterwayArea{ URL: wai.URL, FeatureType: wai.FeatureType, SortBy: wai.SortBy, } - due, retries := retry(wai.Attributes) - return wa, due, retries, wai.SendEmail + due, retries, duration := retry(wai.Attributes) + return wa, due, retries, duration, wai.SendEmail } -func importWaterwayGauge(input interface{}) (interface{}, time.Time, int, bool) { +func importWaterwayGauge(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) { wgi := input.(*models.WaterwayGaugeImport) username, _ := wgi.Attributes.Get("username") password, _ := wgi.Attributes.Get("password") @@ -98,11 +104,11 @@ Password: password, Insecure: insecure, } - due, retries := retry(wgi.Attributes) - return wg, due, retries, wgi.SendEmail + due, retries, duration := retry(wgi.Attributes) + return wg, due, retries, duration, wgi.SendEmail } -func importDistancemarksVirtual(input interface{}) (interface{}, time.Time, int, bool) { +func importDistancemarksVirtual(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) { dmvi := input.(*models.DistanceMarksVirtualImport) username, _ := dmvi.Attributes.Get("username") password, _ := dmvi.Attributes.Get("password") @@ -113,11 +119,11 @@ Password: password, Insecure: insecure, } - due, retries := retry(dmvi.Attributes) - return wg, due, retries, dmvi.SendEmail + due, retries, duration := retry(dmvi.Attributes) + return wg, due, retries, duration, dmvi.SendEmail } -func importFairwayDimension(input interface{}) (interface{}, time.Time, int, bool) { +func importFairwayDimension(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) { fdi := input.(*models.FairwayDimensionImport) fd := &imports.FairwayDimension{ URL: fdi.URL, @@ -129,22 +135,22 @@ Depth: fdi.Depth, SourceOrganization: fdi.SourceOrganization, } - due, retries := retry(fdi.Attributes) - return fd, due, retries, fdi.SendEmail + due, retries, duration := retry(fdi.Attributes) + return fd, due, retries, duration, fdi.SendEmail } -func importDistanceMarksAshore(input interface{}) (interface{}, time.Time, int, bool) { +func importDistanceMarksAshore(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) { dmai := input.(*models.DistanceMarksAshoreImport) dma := &imports.DistanceMarksAshore{ URL: dmai.URL, FeatureType: dmai.FeatureType, SortBy: dmai.SortBy, } - due, retries := retry(dmai.Attributes) - return dma, due, retries, dmai.SendEmail + due, retries, duration := retry(dmai.Attributes) + return dma, due, retries, duration, dmai.SendEmail } -func importStretch(input interface{}) (interface{}, time.Time, int, bool) { +func importStretch(input interface{}) (interface{}, time.Time, *int, *time.Duration, bool) { sti := input.(*models.StretchImport) st := &imports.Stretch{ Name: sti.Name, @@ -156,19 +162,19 @@ Date: sti.Date, Countries: sti.Countries, } - due, retries := retry(sti.Attributes) - return st, due, retries, sti.SendEmail + due, retries, duration := retry(sti.Attributes) + return st, due, retries, duration, sti.SendEmail } func manualImport( kind imports.JobKind, - setup func(interface{}) (interface{}, time.Time, int, bool), + setup func(interface{}) (interface{}, time.Time, *int, *time.Duration, bool), ) func(interface{}, *http.Request, *sql.Conn) (JSONResult, error) { return func(input interface{}, req *http.Request, _ *sql.Conn) ( jr JSONResult, err error) { - what, due, retries, sendEmail := setup(input) + what, due, retries, waitDuration, sendEmail := setup(input) var serialized string if serialized, err = common.ToJSONString(what); err != nil { @@ -180,7 +186,9 @@ var jobID int64 if jobID, err = imports.AddJob( kind, - due, retries, + due, + retries, + waitDuration, session.User, sendEmail, serialized,
--- a/pkg/controllers/srimports.go Wed Jan 23 16:25:43 2019 +0100 +++ b/pkg/controllers/srimports.go Wed Jan 23 17:58:57 2019 +0100 @@ -171,18 +171,29 @@ } } - retries := -1 + var retries *int if r := req.FormValue("retries"); r != "" { - var err error - if retries, err = strconv.Atoi(r); err != nil { + if v, err := strconv.Atoi(r); err != nil { log.Printf("error: %v\n", err) - retries = -1 + } else { + retries = &v + } + } + + var waitDuration *time.Duration + if wd := req.FormValue("wait-duration"); wd != "" { + if v, err := time.ParseDuration(wd); err != nil { + log.Printf("error: %v\n", err) + } else { + waitDuration = &v } } jobID, err := imports.AddJob( imports.SRJobKind, - due, retries, + due, + retries, + waitDuration, session.User, sendEmail, serialized)
--- a/pkg/imports/queue.go Wed Jan 23 16:25:43 2019 +0100 +++ b/pkg/imports/queue.go Wed Jan 23 17:58:57 2019 +0100 @@ -42,15 +42,6 @@ Error(fmt string, args ...interface{}) } - // RetryError is an error type to signal that - // the import should be tried again. - RetryError struct { - // Message is the error message. - Message string - // When is the new scheduled execution time. - When time.Time - } - // UnchangedError may be issued by Do of a Job to indicate // That the database has not changed. UnchangedError string @@ -104,6 +95,7 @@ id int64 kind JobKind user string + waitRetry pgtype.Interval trysLeft sql.NullInt64 sendEmail bool data string @@ -150,6 +142,7 @@ kind, due, trys_left, + retry_wait, username, send_email, data @@ -159,7 +152,8 @@ $3, $4, $5, - $6 + $6, + $7 ) RETURNING id` selectJobSQL = ` @@ -167,6 +161,7 @@ id, kind, trys_left, + retry_wait, username, send_email, data @@ -206,11 +201,6 @@ go iqueue.importLoop() } -// Error makes RetryError an error. -func (re *RetryError) Error() string { - return re.Message -} - // Error makes UnchangedError an error. func (ue UnchangedError) Error() string { return string(ue) @@ -266,11 +256,55 @@ return names } -func (idj *idJob) trys() int { +func (idj *idJob) nextRetry(feedback Feedback) bool { + switch { + case idj.waitRetry.Status != pgtype.Present && !idj.trysLeft.Valid: + return false + case idj.waitRetry.Status == pgtype.Present && !idj.trysLeft.Valid: + return true + case idj.trysLeft.Valid: + if idj.trysLeft.Int64 < 1 { + feedback.Warn("import should be retried, but no retrys left") + } else { + idj.trysLeft.Int64-- + feedback.Info("import failed but will be retried") + return true + } + } + return false +} + +func (idj *idJob) nextDue() time.Time { + now := time.Now() + if idj.waitRetry.Status == pgtype.Present { + var d time.Duration + if err := idj.waitRetry.AssignTo(&d); err != nil { + log.Printf("error: converting waitRetry failed: %v\n", err) + } else { + now = now.Add(d) + } + } + return now +} + +func (idj *idJob) trysLeftPointer() *int { if !idj.trysLeft.Valid { - return -1 + return nil } - return int(idj.trysLeft.Int64) + t := int(idj.trysLeft.Int64) + return &t +} + +func (idj *idJob) waitRetryPointer() *time.Duration { + if idj.waitRetry.Status != pgtype.Present { + return nil + } + d := new(time.Duration) + if err := idj.waitRetry.AssignTo(d); err != nil { + log.Printf("error: converting waitRetry failed: %v\n", err) + return nil + } + return d } func (q *importQueue) jobCreator(kind JobKind) JobCreator { @@ -282,20 +316,33 @@ func (q *importQueue) addJob( kind JobKind, due time.Time, - trysLeft int, + trysLeft *int, + waitRetry *time.Duration, user string, sendEmail bool, data string, ) (int64, error) { - ctx := context.Background() + var id int64 if due.IsZero() { due = time.Now() } + var tl sql.NullInt64 - if trysLeft >= 0 { - tl = sql.NullInt64{Int64: int64(trysLeft), Valid: true} + if trysLeft != nil { + tl = sql.NullInt64{Int64: int64(*trysLeft), Valid: true} } + + var wr pgtype.Interval + if waitRetry != nil { + if err := wr.Set(*waitRetry); err != nil { + return 0, err + } + } else { + wr = pgtype.Interval{Status: pgtype.Null} + } + + ctx := context.Background() err := auth.RunAs(ctx, user, func(conn *sql.Conn) error { return conn.QueryRowContext( ctx, @@ -303,6 +350,7 @@ string(kind), due, tl, + &wr, user, sendEmail, data).Scan(&id) @@ -323,12 +371,20 @@ func AddJob( kind JobKind, due time.Time, - trysLeft int, + trysLeft *int, + waitRetry *time.Duration, user string, sendEmail bool, data string, ) (int64, error) { - return iqueue.addJob(kind, due, trysLeft, user, sendEmail, data) + return iqueue.addJob( + kind, + due, + trysLeft, + waitRetry, + user, + sendEmail, + data) } type logFeedback int64 @@ -413,6 +469,7 @@ &ji.id, &ji.kind, &ji.trysLeft, + &ji.waitRetry, &ji.user, &ji.sendEmail, &ji.data, @@ -555,32 +612,17 @@ }) })() - var retry *RetryError - var unchanged bool - - switch v := errDo.(type) { - case *RetryError: - // NULL -> limit less - if idj.trysLeft.Valid && idj.trysLeft.Int64 <= 1 { - feedback.Warn("import should be retried, but no retrys left") - } else { - if idj.trysLeft.Valid { - idj.trysLeft.Int64-- - } - feedback.Info("import failed but will be retried") - retry = v - } - case UnchangedError: + var unchanged, retry bool + if v, ok := errDo.(UnchangedError); ok { feedback.Info("unchanged: %s", v.Error()) unchanged = true - default: - if errDo != nil { - feedback.Error("error in import: %v", errDo) - } + } else if errDo != nil { + feedback.Error("error in import: %v", errDo) + retry = idj.nextRetry(feedback) } var errCleanup error - if retry == nil { // cleanup debris + if retry { // cleanup debris if errCleanup = survive(job.CleanUp)(); errCleanup != nil { feedback.Error("error cleanup: %v", errCleanup) } @@ -605,10 +647,12 @@ go sendNotificationMail(idj.user, jc.Description(), state, idj.id) } - if retry != nil { + if retry { nid, err := q.addJob( idj.kind, - retry.When, idj.trys(), + idj.nextDue(), + idj.trysLeftPointer(), + idj.waitRetryPointer(), idj.user, idj.sendEmail, idj.data) if err != nil {
--- a/pkg/imports/scheduled.go Wed Jan 23 16:25:43 2019 +0100 +++ b/pkg/imports/scheduled.go Wed Jan 23 17:58:57 2019 +0100 @@ -19,6 +19,7 @@ "errors" "fmt" "log" + "time" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/scheduler" @@ -232,15 +233,24 @@ due, _ := cfg.Attributes.Time("due") - retries, found := cfg.Attributes.Int("retries") - if !found { - retries = -1 + ret, found := cfg.Attributes.Int("retries") + var retries *int + if found { + retries = &ret + } + + dur, found := cfg.Attributes.Duration("wait-retry") + var waitRetry *time.Duration + if found { + waitRetry = &dur } var jobID int64 if jobID, err = AddJob( kind, - due, retries, + due, + retries, + waitRetry, cfg.User, cfg.SendEMail, serialized,
--- a/schema/gemma.sql Wed Jan 23 16:25:43 2019 +0100 +++ b/schema/gemma.sql Wed Jan 23 17:58:57 2019 +0100 @@ -633,12 +633,14 @@ ); CREATE TABLE waterway.imports ( - id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, - state waterway.import_state NOT NULL DEFAULT 'queued', - kind varchar NOT NULL, - enqueued timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - due timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, - trys_left int, + id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + state waterway.import_state NOT NULL DEFAULT 'queued', + kind varchar NOT NULL, + enqueued timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + due timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + retry_wait interval + CHECK(retry_wait IS NULL OR retry_wait >= interval '0 microseconds'), + trys_left int, -- if NULL and retry_wait NOT NUL, endless username varchar NOT NULL REFERENCES internal.user_profiles(username) ON DELETE CASCADE