diff pkg/imports/queue.go @ 1392:0e1d89241cda

Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 28 Nov 2018 09:52:34 +0100
parents d753ce6cf588
children 74b0df86b6e6
line wrap: on
line diff
--- a/pkg/imports/queue.go	Wed Nov 28 08:11:23 2018 +0100
+++ b/pkg/imports/queue.go	Wed Nov 28 09:52:34 2018 +0100
@@ -16,9 +16,11 @@
 import (
 	"context"
 	"database/sql"
+	"encoding/json"
 	"fmt"
 	"log"
 	"runtime/debug"
+	"strings"
 	"sync"
 	"time"
 
@@ -36,7 +38,7 @@
 	}
 
 	Job interface {
-		Do(context.Context, int64, *sql.Conn, Feedback) error
+		Do(context.Context, int64, *sql.Conn, Feedback) (interface{}, error)
 		CleanUp() error
 	}
 
@@ -114,12 +116,18 @@
   WHERE state = 'queued'::waterway.import_state AND
   kind = ANY($1)
 )
-LIMIT 1
-`
+LIMIT 1`
+
 	updateStateSQL = `
 UPDATE waterway.imports SET state = $1::waterway.import_state
-WHERE id = $2
-`
+WHERE id = $2`
+
+	updateStateSummarySQL = `
+UPDATE waterway.imports SET
+   state = $1::waterway.import_state,
+   summary = $2
+WHERE id = $3`
+
 	logMessageSQL = `
 INSERT INTO waterway.import_logs (
   import_id,
@@ -273,10 +281,22 @@
 	return &ji, nil
 }
 
-func updateState(id int64, state string) error {
-	ctx := context.Background()
+func updateStateSummary(
+	ctx context.Context,
+	id int64,
+	state string,
+	summary interface{},
+) error {
+	var s sql.NullString
+	if summary != nil {
+		var b strings.Builder
+		if err := json.NewEncoder(&b).Encode(summary); err != nil {
+			return err
+		}
+		s = sql.NullString{String: b.String(), Valid: true}
+	}
 	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
-		_, err := conn.ExecContext(ctx, updateStateSQL, state, id)
+		_, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
 		return err
 	})
 }
@@ -370,11 +390,15 @@
 
 			feedback.Info("import #%d started", idj.id)
 
+			ctx := context.Background()
+			var summary interface{}
+
 			errDo := survive(func() error {
-				ctx := context.Background()
 				return auth.RunAs(ctx, idj.user,
 					func(conn *sql.Conn) error {
-						return job.Do(ctx, idj.id, conn, feedback)
+						var err error
+						summary, err = job.Do(ctx, idj.id, conn, feedback)
+						return err
 					})
 			})()
 			if errDo != nil {
@@ -391,7 +415,7 @@
 			} else {
 				state = "pending"
 			}
-			if err := updateState(idj.id, state); err != nil {
+			if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
 				log.Printf("setting state of job %d failed: %v\n", idj.id, err)
 			}
 			log.Printf("import #%d finished: %s\n", idj.id, state)