# HG changeset patch # User Sascha L. Teichmann # Date 1546950944 -3600 # Node ID 49e047c2106e79ab92927d1f1f9e336c2aa7cff6 # Parent 74b66527ae818dbf7e067ff6eb94a3ea073167dc Imports: Made imports re-runnable if they fail. diff -r 74b66527ae81 -r 49e047c2106e pkg/common/attributes.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/common/attributes.go Tue Jan 08 13:35:44 2019 +0100 @@ -0,0 +1,69 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann + +package common + +import ( + "log" + "strconv" + "strings" + "time" +) + +// Attributes is a map of optional key/value attributes +// of a configuration. +type Attributes map[string]string + +// Get fetches a value for given key out of the configuration. +// If the key was not found the bool component of the return value +// is false. +func (ca Attributes) Get(key string) (string, bool) { + if ca == nil { + return "", false + } + value, found := ca[key] + return value, found +} + +// Bool returns a bool value for a given key. +func (ca Attributes) Bool(key string) bool { + s, found := ca.Get(key) + return found && strings.ToLower(s) == "true" +} + +// Time gives a time.Time for a given key.
+func (ca Attributes) Time(key string) (time.Time, bool) { + s, found := ca.Get(key) + if !found { + return time.Time{}, false + } + t, err := time.Parse("2006-01-02T15:04:05", s) + if err != nil { + log.Printf("error: %v\n", err) + return time.Time{}, false + } + return t, true +} + +func (ca Attributes) Int(key string) (int, bool) { + s, found := ca.Get(key) + if !found { + return 0, false + } + i, err := strconv.Atoi(s) + if err != nil { + log.Printf("error: %v\n", err) + return 0, false + } + return i, true +} diff -r 74b66527ae81 -r 49e047c2106e pkg/controllers/importconfig.go --- a/pkg/controllers/importconfig.go Tue Jan 08 12:34:29 2019 +0100 +++ b/pkg/controllers/importconfig.go Tue Jan 08 13:35:44 2019 +0100 @@ -25,6 +25,7 @@ "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/auth" + "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/imports" "gemma.intevation.de/gemma/pkg/scheduler" ) @@ -301,7 +302,7 @@ ctx context.Context, tx *sql.Tx, id int64, - attrs imports.ConfigAttributes, + attrs common.Attributes, ) error { if len(attrs) == 0 { return nil diff -r 74b66527ae81 -r 49e047c2106e pkg/controllers/manualimports.go --- a/pkg/controllers/manualimports.go Tue Jan 08 12:34:29 2019 +0100 +++ b/pkg/controllers/manualimports.go Tue Jan 08 13:35:44 2019 +0100 @@ -18,6 +18,7 @@ "database/sql" "log" "net/http" + "time" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" @@ -25,52 +26,65 @@ "gemma.intevation.de/gemma/pkg/models" ) -func importBottleneck(input interface{}) (interface{}, bool, bool) { +func retry(a common.Attributes) (time.Time, int) { + due, _ := a.Time("due") + retries, ok := a.Int("retries") + if !ok { + retries = -1 + } + return due, retries +} + +func importBottleneck(input interface{}) (interface{}, time.Time, int, bool, bool) { bi := input.(*models.BottleneckImport) bn := &imports.Bottleneck{ URL: bi.URL, Insecure: bi.Insecure, } - return bn, bi.SendEmail, false + due, retries := 
retry(bi.Attributes) + return bn, due, retries, bi.SendEmail, false } -func importGaugeMeasurement(input interface{}) (interface{}, bool, bool) { +func importGaugeMeasurement(input interface{}) (interface{}, time.Time, int, bool, bool) { gi := input.(*models.GaugeMeasurementImport) gm := &imports.GaugeMeasurement{ URL: gi.URL, Insecure: gi.Insecure, } - return gm, gi.SendEmail, true + due, retries := retry(gi.Attributes) + return gm, due, retries, gi.SendEmail, true } -func importFairwayAvailability(input interface{}) (interface{}, bool, bool) { +func importFairwayAvailability(input interface{}) (interface{}, time.Time, int, bool, bool) { fai := input.(*models.FairwayAvailabilityImport) fa := &imports.FairwayAvailability{ URL: fai.URL, Insecure: fai.Insecure, } - return fa, fai.SendEmail, true + due, retries := retry(fai.Attributes) + return fa, due, retries, fai.SendEmail, true } -func importWaterwayAxis(input interface{}) (interface{}, bool, bool) { +func importWaterwayAxis(input interface{}) (interface{}, time.Time, int, bool, bool) { wxi := input.(*models.WaterwayAxisImport) wx := &imports.WaterwayAxis{ URL: wxi.URL, FeatureType: wxi.FeatureType, SortBy: wxi.SortBy, } - return wx, wxi.SendEmail, true + due, retries := retry(wxi.Attributes) + return wx, due, retries, wxi.SendEmail, true } func manualImport( kind imports.JobKind, - setup func(interface{}) (interface{}, bool, bool), + setup func(interface{}) (interface{}, time.Time, int, bool, bool), ) func(interface{}, *http.Request, *sql.Conn) (JSONResult, error) { return func(input interface{}, req *http.Request, _ *sql.Conn) ( jr JSONResult, err error) { - what, sendEmail, autoAccept := setup(input) + what, due, retries, sendEmail, autoAccept := setup(input) var serialized string if serialized, err = common.ToJSONString(what); err != nil { @@ -82,6 +96,7 @@ var jobID int64 if jobID, err = imports.AddJob( kind, + due, retries, session.User, sendEmail, autoAccept, serialized, diff -r 74b66527ae81 -r 49e047c2106e 
pkg/controllers/srimports.go --- a/pkg/controllers/srimports.go Tue Jan 08 12:34:29 2019 +0100 +++ b/pkg/controllers/srimports.go Tue Jan 08 13:35:44 2019 +0100 @@ -163,8 +163,26 @@ sendEmail := req.FormValue("bottleneck") != "" + var due time.Time + if d := req.FormValue("due"); d != "" { + var err error + if due, err = time.Parse("2006-01-02T15:04:05", d); err != nil { + log.Printf("error: %v\n", err) + } + } + + retries := -1 + if r := req.FormValue("retries"); r != "" { + var err error + if retries, err = strconv.Atoi(r); err != nil { + log.Printf("error: %v\n", err) + retries = -1 + } + } + jobID, err := imports.AddJob( imports.SRJobKind, + due, retries, session.User, sendEmail, false, serialized) diff -r 74b66527ae81 -r 49e047c2106e pkg/imports/config.go --- a/pkg/imports/config.go Tue Jan 08 12:34:29 2019 +0100 +++ b/pkg/imports/config.go Tue Jan 08 13:35:44 2019 +0100 @@ -18,11 +18,11 @@ "database/sql" "encoding/json" "fmt" - "strings" "github.com/robfig/cron" "gemma.intevation.de/gemma/pkg/auth" + "gemma.intevation.de/gemma/pkg/common" ) type ( @@ -33,10 +33,6 @@ // of the registered import types. ImportKind string - // ConfigAttributes is a map of optional key/value attributes - // of an import configuration. - ConfigAttributes map[string]string - // Config is JSON serialized form of a import configuration. Config struct { // Kind is the import type. @@ -56,40 +52,23 @@ // URL is an optional URL used by the import. URL *string `json:"url"` // Attributes are optional key/value pairs for a configuration. - Attributes ConfigAttributes `json:"attributes,omitempty"` + Attributes common.Attributes `json:"attributes,omitempty"` } // IDConfig is the same as Config with an ID. // Mainly used for server delivered configurations. 
IDConfig struct { - ID int64 `json:"id"` - User string `json:"user"` - Kind ImportKind `json:"kind"` - SendEMail bool `json:"send-email"` - AutoAccept bool `json:"auto-accept"` - Cron *CronSpec `json:"cron,omitempty"` - URL *string `json:"url,omitempty"` - Attributes ConfigAttributes `json:"attributes,omitempty"` + ID int64 `json:"id"` + User string `json:"user"` + Kind ImportKind `json:"kind"` + SendEMail bool `json:"send-email"` + AutoAccept bool `json:"auto-accept"` + Cron *CronSpec `json:"cron,omitempty"` + URL *string `json:"url,omitempty"` + Attributes common.Attributes `json:"attributes,omitempty"` } ) -// Get fetches a value for given key out of the configuration. -// If the key was not found the bool component of the return value -// return false. -func (ca ConfigAttributes) Get(key string) (string, bool) { - if ca == nil { - return "", false - } - value, found := ca[key] - return value, found -} - -// Bool returns a bool value for a given key. -func (ca ConfigAttributes) Bool(key string) bool { - s, found := ca.Get(key) - return found && strings.ToLower(s) == "true" -} - // UnmarshalJSON checks if the incoming string // is a registered import type. func (ik *ImportKind) UnmarshalJSON(data []byte) error { @@ -174,14 +153,14 @@ return err } defer rows.Close() - var attributes map[string]string + var attributes common.Attributes for rows.Next() { var k, v string if err = rows.Scan(&k, &v); err != nil { return err } if attributes == nil { - attributes = map[string]string{} + attributes = common.Attributes{} } attributes[k] = v } diff -r 74b66527ae81 -r 49e047c2106e pkg/imports/queue.go --- a/pkg/imports/queue.go Tue Jan 08 12:34:29 2019 +0100 +++ b/pkg/imports/queue.go Tue Jan 08 13:35:44 2019 +0100 @@ -42,6 +42,15 @@ Error(fmt string, args ...interface{}) } + // RetryError is an error type to signal that + // the import should be tried again. + RetryError struct { + // Message is the error message. 
+ Message string + // When is the new scheduled execution time. + When time.Time + } + // Job is the central abstraction of an import job // run by the import queue. Job interface { @@ -88,6 +97,7 @@ id int64 kind JobKind user string + trysLeft sql.NullInt64 sendEmail bool autoAccept bool data string @@ -131,33 +141,39 @@ insertJobSQL = ` INSERT INTO waterway.imports ( kind, + due, + trys_left, username, send_email, auto_accept, data ) VALUES ( $1, - $2, + COALESCE($2, CURRENT_TIMESTAMP), $3, $4, - $5 + $5, + $6, + $7 ) RETURNING id` selectJobSQL = ` SELECT id, kind, + trys_left, username, send_email, auto_accept, data FROM waterway.imports -WHERE state = 'queued'::waterway.import_state AND enqueued IN ( - SELECT min(enqueued) - FROM waterway.imports - WHERE state = 'queued'::waterway.import_state AND - kind = ANY($1) -) +WHERE + due <= CURRENT_TIMESTAMP AND + state = 'queued'::waterway.import_state AND enqueued IN ( + SELECT min(enqueued) + FROM waterway.imports + WHERE state = 'queued'::waterway.import_state AND + kind = ANY($1)) LIMIT 1` updateStateSQL = ` @@ -186,6 +202,11 @@ go iqueue.importLoop() } +// Error makes RetryError an error. 
+func (re *RetryError) Error() string { + return re.Message +} + func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() @@ -236,6 +257,13 @@ return names } +func (idj *idJob) trys() int { + if !idj.trysLeft.Valid { + return -1 + } + return int(idj.trysLeft.Int64) +} + func (q *importQueue) jobCreator(kind JobKind) JobCreator { q.creatorsMu.Lock() defer q.creatorsMu.Unlock() @@ -244,17 +272,28 @@ func (q *importQueue) addJob( kind JobKind, + due time.Time, + trysLeft int, user string, sendEmail, autoAccept bool, data string, ) (int64, error) { ctx := context.Background() var id int64 + if due.IsZero() { + due = time.Now() + } + var tl sql.NullInt64 + if trysLeft >= 0 { + tl = sql.NullInt64{Int64: int64(trysLeft), Valid: true} + } err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { return conn.QueryRowContext( ctx, insertJobSQL, string(kind), + due, + tl, user, sendEmail, autoAccept, @@ -270,10 +309,18 @@ } // AddJob adds a job to the global import queue to be executed -// as soon as possible. This is gone in a separate Go routine +// as soon as possible after due. +// This is done in a separate goroutine // so this will not block.
-func AddJob(kind JobKind, user string, sendEmail, autoAccept bool, data string) (int64, error) { - return iqueue.addJob(kind, user, sendEmail, autoAccept, data) +func AddJob( + kind JobKind, + due time.Time, + trysLeft int, + user string, + sendEmail, autoAccept bool, + data string, +) (int64, error) { + return iqueue.addJob(kind, due, trysLeft, user, sendEmail, autoAccept, data) } type logFeedback int64 @@ -357,6 +404,7 @@ if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan( &ji.id, &ji.kind, + &ji.trysLeft, &ji.user, &ji.sendEmail, &ji.autoAccept, @@ -502,9 +550,21 @@ if errDo != nil { feedback.Error("error do: %v", errDo) } - errCleanup := survive(job.CleanUp)() - if errCleanup != nil { - feedback.Error("error cleanup: %v", errCleanup) + // Should we try again? + retry, shouldRetry := errDo.(*RetryError) + + if shouldRetry && idj.trysLeft.Valid { // NULL -> no retry limit + if idj.trysLeft.Int64--; idj.trysLeft.Int64 <= 0 { + shouldRetry = false + } + } + + var errCleanup error + if !shouldRetry { // cleanup debris + errCleanup = survive(job.CleanUp)() + if errCleanup != nil { + feedback.Error("error cleanup: %v", errCleanup) + } } var state string @@ -524,6 +584,19 @@ if idj.sendEmail { go sendNotificationMail(idj.user, jc.Description(), state, idj.id) } + + if shouldRetry { + nid, err := q.addJob( + idj.kind, + retry.When, idj.trys(), + idj.user, idj.sendEmail, idj.autoAccept, + idj.data) + if err != nil { + log.Printf("error: retry enqueue failed: %v\n", err) + } else { + log.Printf("info: re-enqueued job with id %d\n", nid) + } + } }(jc, idj) } } diff -r 74b66527ae81 -r 49e047c2106e pkg/imports/scheduled.go --- a/pkg/imports/scheduled.go Tue Jan 08 12:34:29 2019 +0100 +++ b/pkg/imports/scheduled.go Tue Jan 08 13:35:44 2019 +0100 @@ -94,9 +94,17 @@ return } + due, _ := cfg.Attributes.Time("due") + + retries, found := cfg.Attributes.Int("retries") + if !found { + retries = -1 + } + var jobID int64 if jobID, err = AddJob( kind, + due, retries, cfg.User,
cfg.SendEMail, cfg.AutoAccept, serialized, diff -r 74b66527ae81 -r 49e047c2106e pkg/models/bn.go --- a/pkg/models/bn.go Tue Jan 08 12:34:29 2019 +0100 +++ b/pkg/models/bn.go Tue Jan 08 13:35:44 2019 +0100 @@ -13,10 +13,13 @@ package models +import "gemma.intevation.de/gemma/pkg/common" + type BottleneckImport struct { - URL string `json:"url"` - Insecure bool `json:"insecure"` - SendEmail bool `json:"send-email"` + URL string `json:"url"` + Insecure bool `json:"insecure"` + SendEmail bool `json:"send-email"` + Attributes common.Attributes `json:"attributes,omitempty"` } type Bottleneck struct { diff -r 74b66527ae81 -r 49e047c2106e pkg/models/fa.go --- a/pkg/models/fa.go Tue Jan 08 12:34:29 2019 +0100 +++ b/pkg/models/fa.go Tue Jan 08 13:35:44 2019 +0100 @@ -13,9 +13,13 @@ package models +import "gemma.intevation.de/gemma/pkg/common" + // FairwayAvailabilityImport contains data used to define the endpoint type FairwayAvailabilityImport struct { URL string `json:"url"` Insecure bool `json:"insecure"` SendEmail bool `json:"send-email"` + // Attributes are optional attributes. + Attributes common.Attributes `json:"attributes,omitempty"` } diff -r 74b66527ae81 -r 49e047c2106e pkg/models/gauge.go --- a/pkg/models/gauge.go Tue Jan 08 12:34:29 2019 +0100 +++ b/pkg/models/gauge.go Tue Jan 08 13:35:44 2019 +0100 @@ -18,6 +18,8 @@ "fmt" "strconv" "time" + + "gemma.intevation.de/gemma/pkg/common" ) // GaugeMeasurementImport contains data used to define the endpoint @@ -25,6 +27,8 @@ URL string `json:"url"` Insecure bool `json:"insecure"` SendEmail bool `json:"send-email"` + // Attributes are optional attributes. 
+ Attributes common.Attributes `json:"attributes,omitempty"` } // GaugeMeasurement holds information about a gauge and the latest measurement diff -r 74b66527ae81 -r 49e047c2106e pkg/models/waterway.go --- a/pkg/models/waterway.go Tue Jan 08 12:34:29 2019 +0100 +++ b/pkg/models/waterway.go Tue Jan 08 13:35:44 2019 +0100 @@ -13,6 +13,8 @@ package models +import "gemma.intevation.de/gemma/pkg/common" + // WaterwayAxisImport specifies an import of the waterway axis. type WaterwayAxisImport struct { // URL is the capabilities URL of the WFS. @@ -24,4 +26,6 @@ // SendEmail is set to true if an email should be send after // importing the axis. SendEmail bool `json:"send-email"` + // Attributes are optional attributes. + Attributes common.Attributes `json:"attributes,omitempty"` } diff -r 74b66527ae81 -r 49e047c2106e schema/gemma.sql --- a/schema/gemma.sql Tue Jan 08 12:34:29 2019 +0100 +++ b/schema/gemma.sql Tue Jan 08 13:35:44 2019 +0100 @@ -590,11 +590,13 @@ ); CREATE TABLE waterway.imports ( - id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, - state waterway.import_state NOT NULL DEFAULT 'queued', - enqueued timestamp NOT NULL DEFAULT now(), - kind varchar NOT NULL, - username varchar NOT NULL + id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY, + state waterway.import_state NOT NULL DEFAULT 'queued', + kind varchar NOT NULL, + enqueued timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + due timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP, + trys_left int, + username varchar NOT NULL REFERENCES internal.user_profiles(username) ON DELETE CASCADE ON UPDATE CASCADE, @@ -602,10 +604,10 @@ REFERENCES internal.user_profiles(username) ON DELETE SET NULL ON UPDATE CASCADE, - send_email boolean NOT NULL DEFAULT false, + send_email boolean NOT NULL DEFAULT false, auto_accept boolean NOT NULL DEFAULT false, - data TEXT, - summary TEXT + data TEXT, + summary TEXT ); CREATE INDEX enqueued_idx ON waterway.imports(enqueued, state);