Mercurial > gemma
changeset 1392:0e1d89241cda
Imports: An Import (e.g. a sounding result import) can now write a 'summary' of a successful import. This is done if the import switches to to state 'pending'.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 28 Nov 2018 09:52:34 +0100 |
parents | 801ae5f4bc5b |
children | efd77496de75 |
files | pkg/controllers/importqueue.go pkg/imports/queue.go pkg/imports/sr.go pkg/models/import.go schema/gemma.sql |
diffstat | 5 files changed, 96 insertions(+), 43 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/controllers/importqueue.go Wed Nov 28 08:11:23 2018 +0100 +++ b/pkg/controllers/importqueue.go Wed Nov 28 09:52:34 2018 +0100 @@ -15,17 +15,19 @@ import ( "database/sql" + "encoding/json" "fmt" "log" "net/http" "strconv" "strings" + "github.com/gorilla/mux" + "github.com/jackc/pgx/pgtype" + "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/imports" "gemma.intevation.de/gemma/pkg/models" - "github.com/gorilla/mux" - "github.com/jackc/pgx/pgtype" ) const ( @@ -36,7 +38,8 @@ enqueued, kind, username, - signer + signer, + summary FROM waterway.imports ` @@ -166,7 +169,7 @@ imports := make([]*models.Import, 0, 20) - var signer sql.NullString + var signer, summary sql.NullString for rows.Next() { var it models.Import @@ -177,12 +180,19 @@ &it.Kind, &it.User, &signer, + &summary, ); err != nil { return } if signer.Valid { it.Signer = signer.String } + if summary.Valid { + if err = json.NewDecoder( + strings.NewReader(summary.String)).Decode(&it.Summary); err != nil { + return + } + } imports = append(imports, &it) }
--- a/pkg/imports/queue.go Wed Nov 28 08:11:23 2018 +0100 +++ b/pkg/imports/queue.go Wed Nov 28 09:52:34 2018 +0100 @@ -16,9 +16,11 @@ import ( "context" "database/sql" + "encoding/json" "fmt" "log" "runtime/debug" + "strings" "sync" "time" @@ -36,7 +38,7 @@ } Job interface { - Do(context.Context, int64, *sql.Conn, Feedback) error + Do(context.Context, int64, *sql.Conn, Feedback) (interface{}, error) CleanUp() error } @@ -114,12 +116,18 @@ WHERE state = 'queued'::waterway.import_state AND kind = ANY($1) ) -LIMIT 1 -` +LIMIT 1` + updateStateSQL = ` UPDATE waterway.imports SET state = $1::waterway.import_state -WHERE id = $2 -` +WHERE id = $2` + + updateStateSummarySQL = ` +UPDATE waterway.imports SET + state = $1::waterway.import_state, + summary = $2 +WHERE id = $3` + logMessageSQL = ` INSERT INTO waterway.import_logs ( import_id, @@ -273,10 +281,22 @@ return &ji, nil } -func updateState(id int64, state string) error { - ctx := context.Background() +func updateStateSummary( + ctx context.Context, + id int64, + state string, + summary interface{}, +) error { + var s sql.NullString + if summary != nil { + var b strings.Builder + if err := json.NewEncoder(&b).Encode(summary); err != nil { + return err + } + s = sql.NullString{String: b.String(), Valid: true} + } return auth.RunAs(ctx, queueUser, func(conn *sql.Conn) error { - _, err := conn.ExecContext(ctx, updateStateSQL, state, id) + _, err := conn.ExecContext(ctx, updateStateSummarySQL, state, s, id) return err }) } @@ -370,11 +390,15 @@ feedback.Info("import #%d started", idj.id) + ctx := context.Background() + var summary interface{} + errDo := survive(func() error { - ctx := context.Background() return auth.RunAs(ctx, idj.user, func(conn *sql.Conn) error { - return job.Do(ctx, idj.id, conn, feedback) + var err error + summary, err = job.Do(ctx, idj.id, conn, feedback) + return err }) })() if errDo != nil { @@ -391,7 +415,7 @@ } else { state = "pending" } - if err := updateState(idj.id, state); err != nil { + if err := updateStateSummary(ctx, idj.id, state, summary); err != nil { log.Printf("setting state of job %d failed: %v\n", idj.id, err) } log.Printf("import #%d finished: %s\n", idj.id, state)
--- a/pkg/imports/sr.go Wed Nov 28 08:11:23 2018 +0100 +++ b/pkg/imports/sr.go Wed Nov 28 09:52:34 2018 +0100 @@ -115,6 +115,8 @@ ) RETURNING id, + ST_X(ST_Centroid(point_cloud::geometry)), + ST_Y(ST_Centroid(point_cloud::geometry)), CASE WHEN ST_Y(ST_Centroid(point_cloud::geometry)) > 0 THEN 32600 ELSE @@ -169,58 +171,61 @@ importID int64, conn *sql.Conn, feedback Feedback, -) error { +) (interface{}, error) { z, err := zip.OpenReader(filepath.Join(sr.Dir, "sr.zip")) if err != nil { - return err + return nil, err } defer z.Close() feedback.Info("Looking for 'meta.json'") mf := common.FindInZIP(z, "meta.json") if mf == nil && !sr.completeOverride() { - return errors.New("Cannot find 'meta.json'") + return nil, errors.New("Cannot find 'meta.json'") } m, err := sr.loadMeta(mf) if err != nil { - return err + return nil, err } if err := m.Validate(ctx, conn); err != nil { - return common.ToError(err) + return nil, common.ToError(err) } feedback.Info("Looking for '*.xyz'") xyzf := common.FindInZIP(z, ".xyz") if xyzf == nil { - return errors.New("Cannot find any *.xyz file") + return nil, errors.New("Cannot find any *.xyz file") } xyz, err := loadXYZ(xyzf, feedback) if err != nil { - return err + return nil, err } if len(xyz) == 0 { - return errors.New("XYZ does not contain any vertices") + return nil, errors.New("XYZ does not contain any vertices") } // Is there a boundary shapefile in the ZIP archive? polygon, err := loadBoundary(z) if err != nil { - return err + return nil, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { - return err + return nil, err } defer tx.Rollback() - var id int64 - var epsg uint32 + var ( + id int64 + epsg uint32 + lat, lon float64 + ) start := time.Now() err = tx.QueryRow(insertPointsSQL, @@ -230,11 +235,11 @@ xyz.AsWKB(), polygon.AsWBK(), m.EPSG, - ).Scan(&id, &epsg) + ).Scan(&id, &lat, &lon, &epsg) xyz, polygon = nil, nil // not need from now on. feedback.Info("storing points took %s", time.Since(start)) if err != nil { - return err + return nil, err } feedback.Info("Best suited UTM EPSG: %d", epsg) @@ -244,11 +249,11 @@ tin, err := octree.GenerateTinByID(ctx, conn, id, epsg) feedback.Info("triangulation took %s", time.Since(start)) if err != nil { - return err + return nil, err } if tin == nil { - return errors.New("cannot load TIN from database") + return nil, errors.New("cannot load TIN from database") } feedback.Info("Building octree...") @@ -259,7 +264,7 @@ tin = nil // not needed from now on feedback.Info("building octree took %s", time.Since(start)) if err != nil { - return err + return nil, err } feedback.Info("Store octree...") @@ -270,7 +275,7 @@ _, err = tx.Exec(insertOctreeSQL, id, checksum, octreeIndex) feedback.Info("storing octree index took %s", time.Since(start)) if err != nil { - return err + return nil, err } tree := builder.Tree() @@ -281,18 +286,30 @@ err = generateContours(tree, tx, id) feedback.Info("generating and storing contour lines took %s", time.Since(start)) if err != nil { - return err + return nil, err } // Store for potential later removal. if err = track(ctx, tx, importID, "waterway.sounding_results", id); err != nil { - return err + return nil, err } if err = tx.Commit(); err == nil { feedback.Info("Storing sounding result was successful.") } - return err + + summary := struct { + Bottleneck string `json:"bottleneck"` + Date models.SoundingResultDate `json:"date"` + Lat float64 `json:"lat"` + Lon float64 `json:"lon"` + }{ + Bottleneck: m.Bottleneck, + Date: m.Date, + Lat: lat, + Lon: lon, + } + return &summary, err } func (sr *SoundingResult) CleanUp() error {
--- a/pkg/models/import.go Wed Nov 28 08:11:23 2018 +0100 +++ b/pkg/models/import.go Wed Nov 28 09:52:34 2018 +0100 @@ -22,12 +22,13 @@ ImportTime struct{ time.Time } Import struct { - ID int64 `json:"id"` - State string `json:"state"` - Enqueued ImportTime `json:"enqueued"` - Kind string `json:"kind"` - User string `json:"user"` - Signer string `json:"signer,omitempty"` + ID int64 `json:"id"` + State string `json:"state"` + Enqueued ImportTime `json:"enqueued"` + Kind string `json:"kind"` + User string `json:"user"` + Signer string `json:"signer,omitempty"` + Summary interface{} `json:"summary,omitempty"` } ImportLogEntry struct {
--- a/schema/gemma.sql Wed Nov 28 08:11:23 2018 +0100 +++ b/schema/gemma.sql Wed Nov 28 09:52:34 2018 +0100 @@ -547,7 +547,8 @@ REFERENCES internal.user_profiles(username) ON DELETE SET NULL ON UPDATE CASCADE, - data TEXT + data TEXT, + summary TEXT ); CREATE INDEX enqueued_idx ON waterway.imports(enqueued, state);