changeset 1392:0e1d89241cda

Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Wed, 28 Nov 2018 09:52:34 +0100
parents 801ae5f4bc5b
children efd77496de75
files pkg/controllers/importqueue.go pkg/imports/queue.go pkg/imports/sr.go pkg/models/import.go schema/gemma.sql
diffstat 5 files changed, 96 insertions(+), 43 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go	Wed Nov 28 08:11:23 2018 +0100
+++ b/pkg/controllers/importqueue.go	Wed Nov 28 09:52:34 2018 +0100
@@ -15,17 +15,19 @@
 
 import (
 	"database/sql"
+	"encoding/json"
 	"fmt"
 	"log"
 	"net/http"
 	"strconv"
 	"strings"
 
+	"github.com/gorilla/mux"
+	"github.com/jackc/pgx/pgtype"
+
 	"gemma.intevation.de/gemma/pkg/auth"
 	"gemma.intevation.de/gemma/pkg/imports"
 	"gemma.intevation.de/gemma/pkg/models"
-	"github.com/gorilla/mux"
-	"github.com/jackc/pgx/pgtype"
 )
 
 const (
@@ -36,7 +38,8 @@
   enqueued,
   kind,
   username,
-  signer
+  signer,
+  summary
 FROM waterway.imports
 `
 
@@ -166,7 +169,7 @@
 
 	imports := make([]*models.Import, 0, 20)
 
-	var signer sql.NullString
+	var signer, summary sql.NullString
 
 	for rows.Next() {
 		var it models.Import
@@ -177,12 +180,19 @@
 			&it.Kind,
 			&it.User,
 			&signer,
+			&summary,
 		); err != nil {
 			return
 		}
 		if signer.Valid {
 			it.Signer = signer.String
 		}
+		if summary.Valid {
+			if err = json.NewDecoder(
+				strings.NewReader(summary.String)).Decode(&it.Summary); err != nil {
+				return
+			}
+		}
 		imports = append(imports, &it)
 	}
 
--- a/pkg/imports/queue.go	Wed Nov 28 08:11:23 2018 +0100
+++ b/pkg/imports/queue.go	Wed Nov 28 09:52:34 2018 +0100
@@ -16,9 +16,11 @@
 import (
 	"context"
 	"database/sql"
+	"encoding/json"
 	"fmt"
 	"log"
 	"runtime/debug"
+	"strings"
 	"sync"
 	"time"
 
@@ -36,7 +38,7 @@
 	}
 
 	Job interface {
-		Do(context.Context, int64, *sql.Conn, Feedback) error
+		Do(context.Context, int64, *sql.Conn, Feedback) (interface{}, error)
 		CleanUp() error
 	}
 
@@ -114,12 +116,18 @@
   WHERE state = 'queued'::waterway.import_state AND
   kind = ANY($1)
 )
-LIMIT 1
-`
+LIMIT 1`
+
 	updateStateSQL = `
 UPDATE waterway.imports SET state = $1::waterway.import_state
-WHERE id = $2
-`
+WHERE id = $2`
+
+	updateStateSummarySQL = `
+UPDATE waterway.imports SET
+   state = $1::waterway.import_state,
+   summary = $2
+WHERE id = $3`
+
 	logMessageSQL = `
 INSERT INTO waterway.import_logs (
   import_id,
@@ -273,10 +281,22 @@
 	return &ji, nil
 }
 
-func updateState(id int64, state string) error {
-	ctx := context.Background()
+func updateStateSummary(
+	ctx context.Context,
+	id int64,
+	state string,
+	summary interface{},
+) error {
+	var s sql.NullString
+	if summary != nil {
+		var b strings.Builder
+		if err := json.NewEncoder(&b).Encode(summary); err != nil {
+			return err
+		}
+		s = sql.NullString{String: b.String(), Valid: true}
+	}
 	return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error {
-		_, err := conn.ExecContext(ctx, updateStateSQL, state, id)
+		_, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id)
 		return err
 	})
 }
@@ -370,11 +390,15 @@
 
 			feedback.Info("import #%d started", idj.id)
 
+			ctx := context.Background()
+			var summary interface{}
+
 			errDo := survive(func() error {
-				ctx := context.Background()
 				return auth.RunAs(ctx, idj.user,
 					func(conn *sql.Conn) error {
-						return job.Do(ctx, idj.id, conn, feedback)
+						var err error
+						summary, err = job.Do(ctx, idj.id, conn, feedback)
+						return err
 					})
 			})()
 			if errDo != nil {
@@ -391,7 +415,7 @@
 			} else {
 				state = "pending"
 			}
-			if err := updateState(idj.id, state); err != nil {
+			if err := updateStateSummary(ctx, idj.id, state, summary); err != nil {
 				log.Printf("setting state of job %d failed: %v\n", idj.id, err)
 			}
 			log.Printf("import #%d finished: %s\n", idj.id, state)
--- a/pkg/imports/sr.go	Wed Nov 28 08:11:23 2018 +0100
+++ b/pkg/imports/sr.go	Wed Nov 28 09:52:34 2018 +0100
@@ -115,6 +115,8 @@
 )
 RETURNING
   id,
+  ST_X(ST_Centroid(point_cloud::geometry)),
+  ST_Y(ST_Centroid(point_cloud::geometry)),
   CASE WHEN ST_Y(ST_Centroid(point_cloud::geometry)) > 0 THEN
     32600
   ELSE
@@ -169,58 +171,61 @@
 	importID int64,
 	conn *sql.Conn,
 	feedback Feedback,
-) error {
+) (interface{}, error) {
 
 	z, err := zip.OpenReader(filepath.Join(sr.Dir, "sr.zip"))
 	if err != nil {
-		return err
+		return nil, err
 	}
 	defer z.Close()
 
 	feedback.Info("Looking for 'meta.json'")
 	mf := common.FindInZIP(z, "meta.json")
 	if mf == nil && !sr.completeOverride() {
-		return errors.New("Cannot find 'meta.json'")
+		return nil, errors.New("Cannot find 'meta.json'")
 	}
 
 	m, err := sr.loadMeta(mf)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if err := m.Validate(ctx, conn); err != nil {
-		return common.ToError(err)
+		return nil, common.ToError(err)
 	}
 
 	feedback.Info("Looking for '*.xyz'")
 	xyzf := common.FindInZIP(z, ".xyz")
 	if xyzf == nil {
-		return errors.New("Cannot find any *.xyz file")
+		return nil, errors.New("Cannot find any *.xyz file")
 	}
 
 	xyz, err := loadXYZ(xyzf, feedback)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if len(xyz) == 0 {
-		return errors.New("XYZ does not contain any vertices")
+		return nil, errors.New("XYZ does not contain any vertices")
 	}
 
 	// Is there a boundary shapefile in the ZIP archive?
 	polygon, err := loadBoundary(z)
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	tx, err := conn.BeginTx(ctx, nil)
 	if err != nil {
-		return err
+		return nil, err
 	}
 	defer tx.Rollback()
 
-	var id int64
-	var epsg uint32
+	var (
+		id       int64
+		epsg     uint32
+		lat, lon float64
+	)
 	start := time.Now()
 
 	err = tx.QueryRow(insertPointsSQL,
@@ -230,11 +235,11 @@
 		xyz.AsWKB(),
 		polygon.AsWBK(),
 		m.EPSG,
-	).Scan(&id, &epsg)
+	).Scan(&id, &lat, &lon, &epsg)
 	xyz, polygon = nil, nil // not need from now on.
 	feedback.Info("storing points took %s", time.Since(start))
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	feedback.Info("Best suited UTM EPSG: %d", epsg)
@@ -244,11 +249,11 @@
 	tin, err := octree.GenerateTinByID(ctx, conn, id, epsg)
 	feedback.Info("triangulation took %s", time.Since(start))
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	if tin == nil {
-		return errors.New("cannot load TIN from database")
+		return nil, errors.New("cannot load TIN from database")
 	}
 
 	feedback.Info("Building octree...")
@@ -259,7 +264,7 @@
 	tin = nil // not needed from now on
 	feedback.Info("building octree took %s", time.Since(start))
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	feedback.Info("Store octree...")
@@ -270,7 +275,7 @@
 	_, err = tx.Exec(insertOctreeSQL, id, checksum, octreeIndex)
 	feedback.Info("storing octree index took %s", time.Since(start))
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	tree := builder.Tree()
@@ -281,18 +286,30 @@
 	err = generateContours(tree, tx, id)
 	feedback.Info("generating and storing contour lines took %s", time.Since(start))
 	if err != nil {
-		return err
+		return nil, err
 	}
 
 	// Store for potential later removal.
 	if err = track(ctx, tx, importID, "waterway.sounding_results", id); err != nil {
-		return err
+		return nil, err
 	}
 
 	if err = tx.Commit(); err == nil {
 		feedback.Info("Storing sounding result was successful.")
 	}
-	return err
+
+	summary := struct {
+		Bottleneck string                    `json:"bottleneck"`
+		Date       models.SoundingResultDate `json:"date"`
+		Lat        float64                   `json:"lat"`
+		Lon        float64                   `json:"lon"`
+	}{
+		Bottleneck: m.Bottleneck,
+		Date:       m.Date,
+		Lat:        lat,
+		Lon:        lon,
+	}
+	return &summary, err
 }
 
 func (sr *SoundingResult) CleanUp() error {
--- a/pkg/models/import.go	Wed Nov 28 08:11:23 2018 +0100
+++ b/pkg/models/import.go	Wed Nov 28 09:52:34 2018 +0100
@@ -22,12 +22,13 @@
 	ImportTime struct{ time.Time }
 
 	Import struct {
-		ID       int64      `json:"id"`
-		State    string     `json:"state"`
-		Enqueued ImportTime `json:"enqueued"`
-		Kind     string     `json:"kind"`
-		User     string     `json:"user"`
-		Signer   string     `json:"signer,omitempty"`
+		ID       int64       `json:"id"`
+		State    string      `json:"state"`
+		Enqueued ImportTime  `json:"enqueued"`
+		Kind     string      `json:"kind"`
+		User     string      `json:"user"`
+		Signer   string      `json:"signer,omitempty"`
+		Summary  interface{} `json:"summary,omitempty"`
 	}
 
 	ImportLogEntry struct {
--- a/schema/gemma.sql	Wed Nov 28 08:11:23 2018 +0100
+++ b/schema/gemma.sql	Wed Nov 28 09:52:34 2018 +0100
@@ -547,7 +547,8 @@
         REFERENCES internal.user_profiles(username)
             ON DELETE SET NULL
             ON UPDATE CASCADE,
-    data TEXT
+    data TEXT,
+    summary TEXT
 );
 
 CREATE INDEX enqueued_idx ON waterway.imports(enqueued, state);