changeset 1708:49e047c2106e

Imports: Made imports re-runnable if they fail.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Tue, 08 Jan 2019 13:35:44 +0100
parents 74b66527ae81
children 8ff8d873ef6b
files pkg/common/attributes.go pkg/controllers/importconfig.go pkg/controllers/manualimports.go pkg/controllers/srimports.go pkg/imports/config.go pkg/imports/queue.go pkg/imports/scheduled.go pkg/models/bn.go pkg/models/fa.go pkg/models/gauge.go pkg/models/waterway.go schema/gemma.sql
diffstat 12 files changed, 249 insertions(+), 69 deletions(-) [+]
line wrap: on
line diff
--- /dev/null	Thu Jan 01 00:00:00 1970 +0000
+++ b/pkg/common/attributes.go	Tue Jan 08 13:35:44 2019 +0100
@@ -0,0 +1,69 @@
+// This is Free Software under GNU Affero General Public License v >= 3.0
+// without warranty, see README.md and license for details.
+//
+// SPDX-License-Identifier: AGPL-3.0-or-later
+// License-Filename: LICENSES/AGPL-3.0.txt
+//
+// Copyright (C) 2018 by via donau
+//   – Österreichische Wasserstraßen-Gesellschaft mbH
+// Software engineering by Intevation GmbH
+//
+// Author(s):
+//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
+
+package common
+
+import (
+	"log"
+	"strconv"
+	"strings"
+	"time"
+)
+
+// Attributes is a map of optional key/value attributes
+// of a configuration.
+type Attributes map[string]string
+
+// Get fetches a value for given key out of the configuration.
+// If the key was not found the bool component of the return value
+// return false.
+func (ca Attributes) Get(key string) (string, bool) {
+	if ca == nil {
+		return "", false
+	}
+	value, found := ca[key]
+	return value, found
+}
+
+// Bool returns a bool value for a given key.
+func (ca Attributes) Bool(key string) bool {
+	s, found := ca.Get(key)
+	return found && strings.ToLower(s) == "true"
+}
+
+// Time gives a time.Time for a given key.
+func (ca Attributes) Time(key string) (time.Time, bool) {
+	s, found := ca.Get(key)
+	if !found {
+		return time.Time{}, false
+	}
+	t, err := time.Parse("2006-01-02T15:04:05", s)
+	if err != nil {
+		log.Printf("error: %v\n", err)
+		return time.Time{}, false
+	}
+	return t, true
+}
+
+func (ca Attributes) Int(key string) (int, bool) {
+	s, found := ca.Get(key)
+	if !found {
+		return 0, false
+	}
+	i, err := strconv.Atoi(s)
+	if err != nil {
+		log.Printf("error: %v\n", err)
+		return 0, false
+	}
+	return i, true
+}
--- a/pkg/controllers/importconfig.go	Tue Jan 08 12:34:29 2019 +0100
+++ b/pkg/controllers/importconfig.go	Tue Jan 08 13:35:44 2019 +0100
@@ -25,6 +25,7 @@
 	"github.com/gorilla/mux"
 
 	"gemma.intevation.de/gemma/pkg/auth"
+	"gemma.intevation.de/gemma/pkg/common"
 	"gemma.intevation.de/gemma/pkg/imports"
 	"gemma.intevation.de/gemma/pkg/scheduler"
 )
@@ -301,7 +302,7 @@
 	ctx context.Context,
 	tx *sql.Tx,
 	id int64,
-	attrs imports.ConfigAttributes,
+	attrs common.Attributes,
 ) error {
 	if len(attrs) == 0 {
 		return nil
--- a/pkg/controllers/manualimports.go	Tue Jan 08 12:34:29 2019 +0100
+++ b/pkg/controllers/manualimports.go	Tue Jan 08 13:35:44 2019 +0100
@@ -18,6 +18,7 @@
 	"database/sql"
 	"log"
 	"net/http"
+	"time"
 
 	"gemma.intevation.de/gemma/pkg/auth"
 	"gemma.intevation.de/gemma/pkg/common"
@@ -25,52 +26,65 @@
 	"gemma.intevation.de/gemma/pkg/models"
 )
 
-func importBottleneck(input interface{}) (interface{}, bool, bool) {
+func retry(a common.Attributes) (time.Time, int) {
+	due, _ := a.Time("due")
+	retries, ok := a.Int("retries")
+	if !ok {
+		retries = -1
+	}
+	return due, retries
+}
+
+func importBottleneck(input interface{}) (interface{}, time.Time, int, bool, bool) {
 	bi := input.(*models.BottleneckImport)
 	bn := &imports.Bottleneck{
 		URL:      bi.URL,
 		Insecure: bi.Insecure,
 	}
-	return bn, bi.SendEmail, false
+	due, retries := retry(bi.Attributes)
+	return bn, due, retries, bi.SendEmail, false
 }
 
-func importGaugeMeasurement(input interface{}) (interface{}, bool, bool) {
+func importGaugeMeasurement(input interface{}) (interface{}, time.Time, int, bool, bool) {
 	gi := input.(*models.GaugeMeasurementImport)
 	gm := &imports.GaugeMeasurement{
 		URL:      gi.URL,
 		Insecure: gi.Insecure,
 	}
-	return gm, gi.SendEmail, true
+	due, retries := retry(gi.Attributes)
+	return gm, due, retries, gi.SendEmail, true
 }
 
-func importFairwayAvailability(input interface{}) (interface{}, bool, bool) {
+func importFairwayAvailability(input interface{}) (interface{}, time.Time, int, bool, bool) {
 	fai := input.(*models.FairwayAvailabilityImport)
 	fa := &imports.FairwayAvailability{
 		URL:      fai.URL,
 		Insecure: fai.Insecure,
 	}
-	return fa, fai.SendEmail, true
+	due, retries := retry(fai.Attributes)
+	return fa, due, retries, fai.SendEmail, true
 }
 
-func importWaterwayAxis(input interface{}) (interface{}, bool, bool) {
+func importWaterwayAxis(input interface{}) (interface{}, time.Time, int, bool, bool) {
 	wxi := input.(*models.WaterwayAxisImport)
 	wx := &imports.WaterwayAxis{
 		URL:         wxi.URL,
 		FeatureType: wxi.FeatureType,
 		SortBy:      wxi.SortBy,
 	}
-	return wx, wxi.SendEmail, true
+	due, retries := retry(wxi.Attributes)
+	return wx, due, retries, wxi.SendEmail, true
 }
 
 func manualImport(
 	kind imports.JobKind,
-	setup func(interface{}) (interface{}, bool, bool),
+	setup func(interface{}) (interface{}, time.Time, int, bool, bool),
 ) func(interface{}, *http.Request, *sql.Conn) (JSONResult, error) {
 
 	return func(input interface{}, req *http.Request, _ *sql.Conn) (
 		jr JSONResult, err error) {
 
-		what, sendEmail, autoAccept := setup(input)
+		what, due, retries, sendEmail, autoAccept := setup(input)
 
 		var serialized string
 		if serialized, err = common.ToJSONString(what); err != nil {
@@ -82,6 +96,7 @@
 		var jobID int64
 		if jobID, err = imports.AddJob(
 			kind,
+			due, retries,
 			session.User,
 			sendEmail, autoAccept,
 			serialized,
--- a/pkg/controllers/srimports.go	Tue Jan 08 12:34:29 2019 +0100
+++ b/pkg/controllers/srimports.go	Tue Jan 08 13:35:44 2019 +0100
@@ -163,8 +163,26 @@
 
 	sendEmail := req.FormValue("bottleneck") != ""
 
+	var due time.Time
+	if d := req.FormValue("due"); d != "" {
+		var err error
+		if due, err = time.Parse("2006-01-02T15:04:05", d); err != nil {
+			log.Printf("error: %v\n", err)
+		}
+	}
+
+	retries := -1
+	if r := req.FormValue("retries"); r != "" {
+		var err error
+		if retries, err = strconv.Atoi(r); err != nil {
+			log.Printf("error: %v\n", err)
+			retries = -1
+		}
+	}
+
 	jobID, err := imports.AddJob(
 		imports.SRJobKind,
+		due, retries,
 		session.User,
 		sendEmail, false,
 		serialized)
--- a/pkg/imports/config.go	Tue Jan 08 12:34:29 2019 +0100
+++ b/pkg/imports/config.go	Tue Jan 08 13:35:44 2019 +0100
@@ -18,11 +18,11 @@
 	"database/sql"
 	"encoding/json"
 	"fmt"
-	"strings"
 
 	"github.com/robfig/cron"
 
 	"gemma.intevation.de/gemma/pkg/auth"
+	"gemma.intevation.de/gemma/pkg/common"
 )
 
 type (
@@ -33,10 +33,6 @@
 	// of the registered import types.
 	ImportKind string
 
-	// ConfigAttributes is a map of optional key/value attributes
-	// of an import configuration.
-	ConfigAttributes map[string]string
-
 	// Config is JSON serialized form of a import configuration.
 	Config struct {
 		// Kind is the import type.
@@ -56,40 +52,23 @@
 		// URL is an optional URL used by the import.
 		URL *string `json:"url"`
 		// Attributes are optional key/value pairs for a configuration.
-		Attributes ConfigAttributes `json:"attributes,omitempty"`
+		Attributes common.Attributes `json:"attributes,omitempty"`
 	}
 
 	// IDConfig is the same as Config with an ID.
 	// Mainly used for server delivered configurations.
 	IDConfig struct {
-		ID         int64            `json:"id"`
-		User       string           `json:"user"`
-		Kind       ImportKind       `json:"kind"`
-		SendEMail  bool             `json:"send-email"`
-		AutoAccept bool             `json:"auto-accept"`
-		Cron       *CronSpec        `json:"cron,omitempty"`
-		URL        *string          `json:"url,omitempty"`
-		Attributes ConfigAttributes `json:"attributes,omitempty"`
+		ID         int64             `json:"id"`
+		User       string            `json:"user"`
+		Kind       ImportKind        `json:"kind"`
+		SendEMail  bool              `json:"send-email"`
+		AutoAccept bool              `json:"auto-accept"`
+		Cron       *CronSpec         `json:"cron,omitempty"`
+		URL        *string           `json:"url,omitempty"`
+		Attributes common.Attributes `json:"attributes,omitempty"`
 	}
 )
 
-// Get fetches a value for given key out of the configuration.
-// If the key was not found the bool component of the return value
-// return false.
-func (ca ConfigAttributes) Get(key string) (string, bool) {
-	if ca == nil {
-		return "", false
-	}
-	value, found := ca[key]
-	return value, found
-}
-
-// Bool returns a bool value for a given key.
-func (ca ConfigAttributes) Bool(key string) bool {
-	s, found := ca.Get(key)
-	return found && strings.ToLower(s) == "true"
-}
-
 // UnmarshalJSON checks if the incoming string
 // is a registered import type.
 func (ik *ImportKind) UnmarshalJSON(data []byte) error {
@@ -174,14 +153,14 @@
 			return err
 		}
 		defer rows.Close()
-		var attributes map[string]string
+		var attributes common.Attributes
 		for rows.Next() {
 			var k, v string
 			if err = rows.Scan(&k, &v); err != nil {
 				return err
 			}
 			if attributes == nil {
-				attributes = map[string]string{}
+				attributes = common.Attributes{}
 			}
 			attributes[k] = v
 		}
--- a/pkg/imports/queue.go	Tue Jan 08 12:34:29 2019 +0100
+++ b/pkg/imports/queue.go	Tue Jan 08 13:35:44 2019 +0100
@@ -42,6 +42,15 @@
 		Error(fmt string, args ...interface{})
 	}
 
+	// RetryError is an error type to signal that
+	// the import should be tried again.
+	RetryError struct {
+		// Message is the error message.
+		Message string
+		// When is the new scheduled execution time.
+		When time.Time
+	}
+
 	// Job is the central abstraction of an import job
 	// run by the import queue.
 	Job interface {
@@ -88,6 +97,7 @@
 		id         int64
 		kind       JobKind
 		user       string
+		trysLeft   sql.NullInt64
 		sendEmail  bool
 		autoAccept bool
 		data       string
@@ -131,33 +141,39 @@
 	insertJobSQL = `
 INSERT INTO waterway.imports (
   kind,
+  due,
+  trys_left,
   username,
   send_email,
   auto_accept,
   data
 ) VALUES (
   $1,
-  $2,
+  COALESCE($2, CURRENT_TIMESTAMP),
   $3,
   $4,
-  $5
+  $5,
+  $6,
+  $7
 ) RETURNING id`
 
 	selectJobSQL = `
 SELECT
   id,
   kind,
+  trys_left,
   username,
   send_email,
   auto_accept,
   data
 FROM waterway.imports
-WHERE state = 'queued'::waterway.import_state AND enqueued IN (
-  SELECT min(enqueued)
-  FROM waterway.imports
-  WHERE state = 'queued'::waterway.import_state AND
-  kind = ANY($1)
-)
+WHERE
+  due <= CURRENT_TIMESTAMP AND
+  state = 'queued'::waterway.import_state AND enqueued IN (
+    SELECT min(enqueued)
+    FROM waterway.imports
+    WHERE state = 'queued'::waterway.import_state AND
+    kind = ANY($1))
 LIMIT 1`
 
 	updateStateSQL = `
@@ -186,6 +202,11 @@
 	go iqueue.importLoop()
 }
 
+// Error makes RetryError an error.
+func (re *RetryError) Error() string {
+	return re.Message
+}
+
 func (q *importQueue) registerJobCreator(kind JobKind, jc JobCreator) {
 	q.creatorsMu.Lock()
 	defer q.creatorsMu.Unlock()
@@ -236,6 +257,13 @@
 	return names
 }
 
+func (idj *idJob) trys() int {
+	if !idj.trysLeft.Valid {
+		return -1
+	}
+	return int(idj.trysLeft.Int64)
+}
+
 func (q *importQueue) jobCreator(kind JobKind) JobCreator {
 	q.creatorsMu.Lock()
 	defer q.creatorsMu.Unlock()
@@ -244,17 +272,28 @@
 
 func (q *importQueue) addJob(
 	kind JobKind,
+	due time.Time,
+	trysLeft int,
 	user string,
 	sendEmail, autoAccept bool,
 	data string,
 ) (int64, error) {
 	ctx := context.Background()
 	var id int64
+	if due.IsZero() {
+		due = time.Now()
+	}
+	var tl sql.NullInt64
+	if trysLeft >= 0 {
+		tl = sql.NullInt64{Int64: int64(trysLeft), Valid: true}
+	}
 	err := auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
 		return conn.QueryRowContext(
 			ctx,
 			insertJobSQL,
 			string(kind),
+			due,
+			tl,
 			user,
 			sendEmail,
 			autoAccept,
@@ -270,10 +309,18 @@
 }
 
 // AddJob adds a job to the global import queue to be executed
-// as soon as possible. This is gone in a separate Go routine
+// as soon as possible after due.
+// This is gone in a separate Go routine
 // so this will not block.
-func AddJob(kind JobKind, user string, sendEmail, autoAccept bool, data string) (int64, error) {
-	return iqueue.addJob(kind, user, sendEmail, autoAccept, data)
+func AddJob(
+	kind JobKind,
+	due time.Time,
+	trysLeft int,
+	user string,
+	sendEmail, autoAccept bool,
+	data string,
+) (int64, error) {
+	return iqueue.addJob(kind, due, trysLeft, user, sendEmail, autoAccept, data)
 }
 
 type logFeedback int64
@@ -357,6 +404,7 @@
 		if err = tx.QueryRowContext(ctx, selectJobSQL, &kinds).Scan(
 			&ji.id,
 			&ji.kind,
+			&ji.trysLeft,
 			&ji.user,
 			&ji.sendEmail,
 			&ji.autoAccept,
@@ -502,9 +550,21 @@
 			if errDo != nil {
 				feedback.Error("error do: %v", errDo)
 			}
-			errCleanup := survive(job.CleanUp)()
-			if errCleanup != nil {
-				feedback.Error("error cleanup: %v", errCleanup)
+			// Should we try again?
+			retry, shouldRetry := errDo.(*RetryError)
+
+			if shouldRetry && idj.trysLeft.Valid { // NULL -> limit less
+				if idj.trysLeft.Int64--; idj.trysLeft.Int64 <= 0 {
+					shouldRetry = false
+				}
+			}
+
+			var errCleanup error
+			if !shouldRetry { // cleanup debris
+				errCleanup = survive(job.CleanUp)()
+				if errCleanup != nil {
+					feedback.Error("error cleanup: %v", errCleanup)
+				}
 			}
 
 			var state string
@@ -524,6 +584,19 @@
 			if idj.sendEmail {
 				go sendNotificationMail(idj.user, jc.Description(), state, idj.id)
 			}
+
+			if shouldRetry {
+				nid, err := q.addJob(
+					idj.kind,
+					retry.When, idj.trys(),
+					idj.user, idj.sendEmail, idj.autoAccept,
+					idj.data)
+				if err != nil {
+					log.Printf("error: retry enqueue failed: %v\n", err)
+				} else {
+					log.Printf("info: re-enqueued job with id %d\n", nid)
+				}
+			}
 		}(jc, idj)
 	}
 }
--- a/pkg/imports/scheduled.go	Tue Jan 08 12:34:29 2019 +0100
+++ b/pkg/imports/scheduled.go	Tue Jan 08 13:35:44 2019 +0100
@@ -94,9 +94,17 @@
 			return
 		}
 
+		due, _ := cfg.Attributes.Time("due")
+
+		retries, found := cfg.Attributes.Int("retries")
+		if !found {
+			retries = -1
+		}
+
 		var jobID int64
 		if jobID, err = AddJob(
 			kind,
+			due, retries,
 			cfg.User,
 			cfg.SendEMail, cfg.AutoAccept,
 			serialized,
--- a/pkg/models/bn.go	Tue Jan 08 12:34:29 2019 +0100
+++ b/pkg/models/bn.go	Tue Jan 08 13:35:44 2019 +0100
@@ -13,10 +13,13 @@
 
 package models
 
+import "gemma.intevation.de/gemma/pkg/common"
+
 type BottleneckImport struct {
-	URL       string `json:"url"`
-	Insecure  bool   `json:"insecure"`
-	SendEmail bool   `json:"send-email"`
+	URL        string            `json:"url"`
+	Insecure   bool              `json:"insecure"`
+	SendEmail  bool              `json:"send-email"`
+	Attributes common.Attributes `json:"attributes,omitempty"`
 }
 
 type Bottleneck struct {
--- a/pkg/models/fa.go	Tue Jan 08 12:34:29 2019 +0100
+++ b/pkg/models/fa.go	Tue Jan 08 13:35:44 2019 +0100
@@ -13,9 +13,13 @@
 
 package models
 
+import "gemma.intevation.de/gemma/pkg/common"
+
 // FairwayAvailabilityImport contains data used to define the endpoint
 type FairwayAvailabilityImport struct {
 	URL       string `json:"url"`
 	Insecure  bool   `json:"insecure"`
 	SendEmail bool   `json:"send-email"`
+	// Attributes are optional attributes.
+	Attributes common.Attributes `json:"attributes,omitempty"`
 }
--- a/pkg/models/gauge.go	Tue Jan 08 12:34:29 2019 +0100
+++ b/pkg/models/gauge.go	Tue Jan 08 13:35:44 2019 +0100
@@ -18,6 +18,8 @@
 	"fmt"
 	"strconv"
 	"time"
+
+	"gemma.intevation.de/gemma/pkg/common"
 )
 
 // GaugeMeasurementImport contains data used to define the endpoint
@@ -25,6 +27,8 @@
 	URL       string `json:"url"`
 	Insecure  bool   `json:"insecure"`
 	SendEmail bool   `json:"send-email"`
+	// Attributes are optional attributes.
+	Attributes common.Attributes `json:"attributes,omitempty"`
 }
 
 // GaugeMeasurement holds information about a gauge and the latest measurement
--- a/pkg/models/waterway.go	Tue Jan 08 12:34:29 2019 +0100
+++ b/pkg/models/waterway.go	Tue Jan 08 13:35:44 2019 +0100
@@ -13,6 +13,8 @@
 
 package models
 
+import "gemma.intevation.de/gemma/pkg/common"
+
 // WaterwayAxisImport specifies an import of the waterway axis.
 type WaterwayAxisImport struct {
 	// URL is the capabilities URL of the WFS.
@@ -24,4 +26,6 @@
 	// SendEmail is set to true if an email should be send after
 	// importing the axis.
 	SendEmail bool `json:"send-email"`
+	// Attributes are optional attributes.
+	Attributes common.Attributes `json:"attributes,omitempty"`
 }
--- a/schema/gemma.sql	Tue Jan 08 12:34:29 2019 +0100
+++ b/schema/gemma.sql	Tue Jan 08 13:35:44 2019 +0100
@@ -590,11 +590,13 @@
 );
 
 CREATE TABLE waterway.imports (
-    id int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
-    state waterway.import_state NOT NULL DEFAULT 'queued',
-    enqueued timestamp NOT NULL DEFAULT now(),
-    kind  varchar NOT NULL,
-    username varchar NOT NULL
+    id        int PRIMARY KEY GENERATED BY DEFAULT AS IDENTITY,
+    state     waterway.import_state NOT NULL DEFAULT 'queued',
+    kind      varchar   NOT NULL,
+    enqueued  timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    due       timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    trys_left int,
+    username  varchar   NOT NULL
         REFERENCES internal.user_profiles(username)
             ON DELETE CASCADE
             ON UPDATE CASCADE,
@@ -602,10 +604,10 @@
         REFERENCES internal.user_profiles(username)
             ON DELETE SET NULL
             ON UPDATE CASCADE,
-    send_email boolean NOT NULL DEFAULT false,
+    send_email  boolean NOT NULL DEFAULT false,
     auto_accept boolean NOT NULL DEFAULT false,
-    data TEXT,
-    summary TEXT
+    data        TEXT,
+    summary     TEXT
 );
 
 CREATE INDEX enqueued_idx ON waterway.imports(enqueued, state);