# HG changeset patch # User Fadi Abbud # Date 1565961313 -7200 # Node ID 77b28832bb642261d52959e0760fa6e395f8f4f1 # Parent 36ee7e6211336b1d7748a6903ab5433437cc55c4# Parent cbc75527916f9c6877d817ee5ecd71045f1aec2b merge default into stylesconfig diff -r 36ee7e621133 -r 77b28832bb64 pkg/controllers/system.go --- a/pkg/controllers/system.go Fri Aug 16 10:29:17 2019 +0200 +++ b/pkg/controllers/system.go Fri Aug 16 15:15:13 2019 +0200 @@ -23,12 +23,15 @@ "net/http" "strings" "sync" + "time" "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/auth" + "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/config" "gemma.intevation.de/gemma/pkg/geoserver" + "gemma.intevation.de/gemma/pkg/imports" "gemma.intevation.de/gemma/pkg/models" ) @@ -152,7 +155,7 @@ return } -type reconfFunc func(sql.NullString, string) (func(), error) +type reconfFunc func(sql.NullString, string) (func(*http.Request), error) var ( reconfigureFuncsMu sync.Mutex @@ -171,7 +174,11 @@ return reconfigureFuncs[key] } -func reconfigureClassBreaks(old sql.NullString, curr, which string, recalc func()) (func(), error) { +func reconfigureClassBreaks( + old sql.NullString, curr, + which string, + recalc func(*http.Request), +) (func(*http.Request), error) { // If new values are broken, don't proceed. currCVs, err := models.ParseColorValues(curr) @@ -179,10 +186,10 @@ return nil, err } - doBoth := func() { + doBoth := func(req *http.Request) { log.Printf("info: Trigger re-calculation of %s.", which) geoserver.ReconfigureStyle(which) - recalc() + recalc(req) } if !old.Valid { @@ -213,7 +220,7 @@ // Only the color changed -> no expensive recalc needed. if colorChanged { log.Println("info: Only colors changed.") - return func() { geoserver.ReconfigureStyle(which) }, nil + return func(*http.Request) { geoserver.ReconfigureStyle(which) }, nil } return nil, nil @@ -221,24 +228,52 @@ func init() { registerReconfigureFunc("morphology_classbreaks", - func(old sql.NullString, curr string) (func(), error) { + func(old sql.NullString, curr string) (func(*http.Request), error) { return reconfigureClassBreaks( old, curr, "sounding_results_contour_lines_geoserver", - func() { - log.Println( - "todo: Trigger expensive recalculation of sounding result contours.") + func(req *http.Request) { + if s, ok := auth.GetSession(req); ok { + triggerSoundingResultsContoursRecalc(s.User, curr) + } }) }) registerReconfigureFunc("morphology_classbreaks_compare", - func(old sql.NullString, curr string) (func(), error) { + func(old sql.NullString, curr string) (func(*http.Request), error) { return reconfigureClassBreaks( old, curr, "sounding_differences", - func() { go deleteSoundingDiffs() }) + func(*http.Request) { go deleteSoundingDiffs() }) }) } +func triggerSoundingResultsContoursRecalc(who, breaks string) { + var serialized string + job := &imports.IsoRefresh{ClassBreaks: breaks} + serialized, err := common.ToJSONString(job) + if err != nil { + log.Printf("error: %v\n", err) + return + } + var jobID int64 + if jobID, err = imports.AddJob( + imports.ISRJobKind, + time.Time{}, + nil, + nil, + who, + false, + serialized, + ); err != nil { + log.Printf("error: %v\n", err) + return + } + log.Printf( + "info: Recalculate sounding results contours in job %d.\n", + jobID) + +} + func deleteSoundingDiffs() { // TODO: Better do that in import queue? ctx := context.Background() @@ -279,7 +314,7 @@ } defer getStmt.Close() - reconfigure := map[string]func(){} + reconfigure := map[string]func(*http.Request){} for key, value := range *settings { var old sql.NullString @@ -292,7 +327,7 @@ } if cmp := reconfigureFunc(key); cmp != nil { - var fn func() + var fn func(*http.Request) if fn, err = cmp(old, value); err != nil { return } @@ -311,7 +346,7 @@ } for _, fn := range reconfigure { - fn() + fn(req) } jr = JSONResult{ diff -r 36ee7e621133 -r 77b28832bb64 pkg/imports/isr.go --- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/imports/isr.go Fri Aug 16 15:15:13 2019 +0200 @@ -0,0 +1,199 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann + +package imports + +import ( + "context" + "database/sql" + "time" + + "gemma.intevation.de/gemma/pkg/octree" +) + +type IsoRefresh struct { + ClassBreaks string `json:"class-breaks"` +} + +// ISRJobKind is the unique name of this import job type. +const ISRJobKind JobKind = "isr" + +func init() { RegisterJobCreator(ISRJobKind, isrJobCreator{}) } + +type isrJobCreator struct{} + +func (isrJobCreator) Description() string { return "refresh iso lines" } + +func (isrJobCreator) AutoAccept() bool { return true } + +func (isrJobCreator) Create() Job { return new(IsoRefresh) } + +func (isrJobCreator) StageDone(context.Context, *sql.Tx, int64) error { + return nil +} + +func (isrJobCreator) Depends() [2][]string { + return [2][]string{ + {"sounding_results", "sounding_results_contour_lines"}, + {}, + } +} + +const ( + fetchSoundingResultsIDsSQL = ` +SELECT bottleneck_id, id +FROM waterway.sounding_results +ORDER BY bottleneck_id +` + deleteContourLinesSQL = ` +DELETE FROM waterway.sounding_results_contour_lines +WHERE sounding_result_id = $1 +` +) + +func (isr *IsoRefresh) CleanUp() error { return nil } + +type bottleneckSoundingResults struct { + bn string + srs []int64 +} + +func fetchBottleneckResults( + ctx context.Context, + conn *sql.Conn, +) ([]bottleneckSoundingResults, error) { + + rows, err := conn.QueryContext(ctx, fetchSoundingResultsIDsSQL) + if err != nil { + return nil, err + } + defer rows.Close() + + var ids []bottleneckSoundingResults + + for rows.Next() { + var bn string + var sr int64 + if err := rows.Scan(&bn, &sr); err != nil { + return nil, err + } + if len(ids) > 0 { + if ids[len(ids)-1].bn != bn { + ids = append(ids, bottleneckSoundingResults{ + bn: bn, + srs: []int64{sr}, + }) + } else { + ids[len(ids)-1].srs = append(ids[len(ids)-1].srs, sr) + } + } else { + ids = []bottleneckSoundingResults{ + {bn: bn, srs: []int64{sr}}, + } + } + } + if err := rows.Err(); err != nil { + return nil, err + } + return ids, nil +} + +// Do executes the actual refreshing of the iso lines. +func (isr *IsoRefresh) Do( + ctx context.Context, + importID int64, + conn *sql.Conn, + feedback Feedback, +) (interface{}, error) { + + start := time.Now() + defer func() { + feedback.Info( + "Processing all sounding results took %v.", + time.Since(start)) + }() + + heights, err := octree.ParseClassBreaks(isr.ClassBreaks) + if err != nil { + return nil, err + } + + bns, err := fetchBottleneckResults(ctx, conn) + if err != nil { + return nil, err + } + + for i := range bns { + start := time.Now() + feedback.Info("Processing bottleneck '%s' ...", bns[i].bn) + err := isr.processBottleneck( + ctx, conn, + heights, + &bns[i], + ) + feedback.Info("Processing took %v.", time.Since(start)) + if err != nil { + return nil, err + } + } + + return nil, nil +} + +func (isr *IsoRefresh) processBottleneck( + ctx context.Context, + conn *sql.Conn, + heights []float64, + bn *bottleneckSoundingResults, +) error { + // Do one transaction per bottleneck. + tx, err := conn.BeginTx(ctx, nil) + if err != nil { + return nil + } + defer tx.Rollback() + + insertStmt, err := tx.Prepare(insertContourSQL) + if err != nil { + return err + } + + // For all sounding results in bottleneck. + for _, sr := range bn.srs { + tree, err := octree.FetchOctreeDirectly(ctx, tx, sr) + if err != nil { + return err + } + hs := octree.ExtrapolateClassBreaks(heights, tree.Min.Z, tree.Max.Z) + + // Delete the old contour lines. + if _, err := tx.ExecContext(ctx, deleteContourLinesSQL, sr); err != nil { + return err + } + + octree.DoContours(tree, hs, func(res *octree.ContourResult) { + if err == nil && len(res.Lines) > 0 { + _, err = insertStmt.ExecContext( + ctx, + sr, res.Height, tree.EPSG, + res.Lines.AsWKB2D(), + contourTolerance) + } + }) + if err != nil { + return err + } + } + + return nil +} diff -r 36ee7e621133 -r 77b28832bb64 pkg/imports/sr.go --- a/pkg/imports/sr.go Fri Aug 16 10:29:17 2019 +0200 +++ b/pkg/imports/sr.go Fri Aug 16 15:15:13 2019 +0200 @@ -78,9 +78,7 @@ type srJobCreator struct{} -func init() { - RegisterJobCreator(SRJobKind, srJobCreator{}) -} +func init() { RegisterJobCreator(SRJobKind, srJobCreator{}) } func (srJobCreator) Description() string { return "sounding results" } diff -r 36ee7e621133 -r 77b28832bb64 pkg/octree/cache.go --- a/pkg/octree/cache.go Fri Aug 16 10:29:17 2019 +0200 +++ b/pkg/octree/cache.go Fri Aug 16 15:15:13 2019 +0200 @@ -47,6 +47,10 @@ ) const ( + directFetchOctreeSQL = ` +SELECT octree_index FROM waterway.sounding_results +WHERE id = $1 +` fetchOctreeSQL = ` SELECT octree_checksum, octree_index FROM waterway.sounding_results @@ -99,6 +103,20 @@ return cache.get(ctx, conn, bottleneck, date) } +// FetchOctreeDirectly loads an octree directly from the database. +func FetchOctreeDirectly( + ctx context.Context, + tx *sql.Tx, + id int64, +) (*Tree, error) { + var data []byte + err := tx.QueryRowContext(ctx, directFetchOctreeSQL, id).Scan(&data) + if err != nil { + return nil, err + } + return Deserialize(data) +} + func (c *Cache) get( ctx context.Context, conn *sql.Conn, diff -r 36ee7e621133 -r 77b28832bb64 pkg/octree/classbreaks.go --- a/pkg/octree/classbreaks.go Fri Aug 16 10:29:17 2019 +0200 +++ b/pkg/octree/classbreaks.go Fri Aug 16 15:15:13 2019 +0200 @@ -58,22 +58,9 @@ return heights } -func LoadClassBreaks(ctx context.Context, tx *sql.Tx, key string) ([]float64, error) { - - var config sql.NullString - - err := tx.QueryRowContext(ctx, selectClassBreaksSQL, key).Scan(&config) +func ParseClassBreaks(config string) ([]float64, error) { - switch { - case err == sql.ErrNoRows: - return nil, nil - case err != nil: - return nil, err - case !config.Valid: - return nil, errors.New("Invalid config string") - } - - parts := strings.Split(config.String, ",") + parts := strings.Split(config, ",") classes := make([]float64, 0, len(parts)) for _, part := range parts { if idx := strings.IndexRune(part, ':'); idx >= 0 { @@ -93,6 +80,24 @@ return classes, nil } +func LoadClassBreaks(ctx context.Context, tx *sql.Tx, key string) ([]float64, error) { + + var config sql.NullString + + err := tx.QueryRowContext(ctx, selectClassBreaksSQL, key).Scan(&config) + + switch { + case err == sql.ErrNoRows: + return nil, nil + case err != nil: + return nil, err + case !config.Valid: + return nil, errors.New("Invalid config string") + } + + return ParseClassBreaks(config.String) +} + func ExtrapolateClassBreaks(cbs []float64, min, max float64) []float64 { if min > max { min, max = max, min