Mercurial > gemma
view pkg/controllers/diff.go @ 4686:1d6f28e45696
morphology: Doing ST_SimplifyPreserveTopology before ST_MakeValid when storing geometries to database speeds things up ... a lot.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 16 Oct 2019 11:51:53 +0200 |
parents | 7a9388943840 |
children | c91e759007da |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "errors" "fmt" "log" "net/http" "runtime" "sync" "time" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/octree" mw "gemma.intevation.de/gemma/pkg/middleware" ) const ( // maxNumberWorkers are processing diffs at a time. maxNumberWorkers = 2 // maxQueueLength are the number of diffs to // be wait for when idle. maxQueueLength = 5 ) const ( contourTolerance = 0.1 contourStep = 0.1 ) const ( // isoCellSize is the side length of a raster cell when tracing // iso areas. isoCellSize = 0.5 ) const ( diffIDSQL = ` SELECT sd.id FROM caching.sounding_differences sd JOIN waterway.sounding_results srm ON sd.minuend = srm.id JOIN waterway.sounding_results srs ON sd.subtrahend = srs.id WHERE srm.bottleneck_id = srs.bottleneck_id AND srm.bottleneck_id = $1 AND srm.date_info = $2::date AND srs.date_info = $3::date ` insertDiffSQL = ` WITH soundings AS ( SELECT sr.id AS id, sr.date_info AS date_info FROM waterway.sounding_results sr WHERE sr.bottleneck_id = $1 ) INSERT INTO caching.sounding_differences (minuend, subtrahend) SELECT m.id, s.id FROM soundings m, soundings s WHERE m.date_info = $2::date AND s.date_info = $3::date RETURNING id ` insertDiffIsoAreasQL = ` INSERT INTO caching.sounding_differences_iso_areas ( sounding_differences_id, height, areas ) SELECT $1, $2, ST_Transform( ST_Multi( ST_CollectionExtract( ST_MakeValid( ST_Multi( ST_Collectionextract( ST_SimplifyPreserveTopology(ST_GeomFromWKB($4, $3::integer), $5), 3 ) ) ), 3 ) ), 4326 ) ` ) type ( diffResult struct { id int64 err error } diffCalculationManager struct { waiting map[string][]chan diffResult cmds chan func(*diffCalculationManager) jobs chan *models.DiffCalculationInput } ) var errBufferFull = errors.New("buffer full") var diffCalcMng = startDiffCalculationManager() func diffCalcHash(dci *models.DiffCalculationInput) string { return dci.Bottleneck + "/" + dci.Minuend.Format(common.DateFormat) + "/" + dci.Subtrahend.Format(common.DateFormat) } func startDiffCalculationManager() *diffCalculationManager { dcm := diffCalculationManager{ waiting: make(map[string][]chan diffResult), cmds: make(chan func(*diffCalculationManager)), jobs: make(chan *models.DiffCalculationInput, maxQueueLength), } go dcm.run() return &dcm } func (dcm *diffCalculationManager) run() { n := runtime.NumCPU() if n > maxNumberWorkers { n = maxNumberWorkers } for i := 0; i < n; i++ { go dcm.calculate() } for cmd := range dcm.cmds { cmd(dcm) } } func doDiff(ctx context.Context, conn *sql.Conn, dci *models.DiffCalculationInput) (int64, error) { start := time.Now() begin := start minuendTree, err := octree.FromCache( ctx, conn, dci.Bottleneck, dci.Minuend.Time) log.Printf("info: loading minuend mesh took %s\n", time.Since(start)) if err != nil { return 0, err } if minuendTree == nil { return 0, fmt.Errorf("Cannot find survey for %s/%s.", dci.Bottleneck, dci.Minuend.Format(common.DateFormat)) } start = time.Now() subtrahendTree, err := octree.FromCache( ctx, conn, dci.Bottleneck, dci.Subtrahend.Time) log.Printf("info: loading subtrahend mesh took %s\n", time.Since(start)) if err != nil { return 0, err } if subtrahendTree == nil { return 0, fmt.Errorf("Cannot find survey for %s/%s.", dci.Bottleneck, dci.Subtrahend.Format(common.DateFormat)) } // We need a slow path implementation for this. epsg := minuendTree.EPSG() if epsg != subtrahendTree.EPSG() { return 0, errors.New("Calculating differences between two different " + "EPSG code meshes are not supported, yet.") } start = time.Now() points := minuendTree.Diff(subtrahendTree) log.Printf("info: A - B took %v\n", time.Since(start)) minuendTree, subtrahendTree = nil, nil // The Triangulation and the loading of the clipping // polygon can be done concurrently. jobs := make(chan func()) wg := new(sync.WaitGroup) for i := 0; i < 2; i++ { wg.Add(1) go func() { defer wg.Done() for job := range jobs { job() } }() } var ( tri *octree.Triangulation triErr error clip *octree.Polygon clipErr error ) jobs <- func() { start := time.Now() tri, triErr = points.Triangulate() log.Printf("info: triangulation took %v\n", time.Since(start)) } jobs <- func() { start := time.Now() clip, clipErr = octree.LoadClippingPolygon( ctx, conn, epsg, dci.Bottleneck, dci.Minuend.Time, dci.Subtrahend.Time) log.Printf("info: loading clipping polygon took %v\n", time.Since(start)) } close(jobs) wg.Wait() switch { case triErr != nil && clipErr != nil: return 0, fmt.Errorf("%v %v", triErr, clipErr) case triErr != nil: return 0, triErr case clipErr != nil: return 0, clipErr } start = time.Now() tin := tri.Tin() removed := tin.Clip(clip) clip = nil log.Printf("info: clipping TIN took %v\n", time.Since(start)) log.Printf("info: Number of triangles to clip: %d\n", len(removed)) start = time.Now() var tree octree.STRTree tree.BuildWithout(tin, removed) log.Printf("info: Building final mesh took: %v\n", time.Since(start)) start = time.Now() // XXX: Maybe we should start this transaction earlier!? var tx *sql.Tx if tx, err = conn.BeginTx(ctx, nil); err != nil { return 0, err } defer tx.Rollback() var heights []float64 heights, err = octree.LoadClassBreaks( ctx, tx, "morphology_classbreaks_compare") if err != nil { log.Printf("warn: Loading class breaks failed: %v\n", err) err = nil heights = octree.SampleDiffHeights(tin.Min.Z, tin.Max.Z, contourStep) } else { heights = octree.ExtrapolateClassBreaks(heights, tin.Min.Z, tin.Max.Z) // heights = octree.InBetweenClassBreaks(heights, 0.05, 2) } log.Printf("info: z range: %.3f - %.3f\n", tin.Min.Z, tin.Max.Z) log.Printf("info: num heights: %d\n", len(heights)) var isoStmt *sql.Stmt if isoStmt, err = tx.PrepareContext(ctx, insertDiffIsoAreasQL); err != nil { return 0, err } defer isoStmt.Close() var id int64 if err = tx.QueryRowContext( ctx, insertDiffSQL, dci.Bottleneck, dci.Minuend.Time, dci.Subtrahend.Time, ).Scan(&id); err != nil { return 0, err } heights = common.DedupFloat64s(heights) areas := octree.TraceAreas(heights, isoCellSize, tin.Min, tin.Max, tree.Value) var size int for i, a := range areas { if len(a) == 0 { continue } wkb := a.AsWKB() size += len(wkb) if _, err = isoStmt.ExecContext( ctx, id, heights[i], epsg, wkb, contourTolerance, ); err != nil { return 0, err } } log.Printf("info: Transferred WKB size: %.2fMB.\n", float64(size)/(1024*1024)) log.Printf("info: calculating and storing iso lines took %v\n", time.Since(start)) if err = tx.Commit(); err != nil { log.Printf("info: difference calculation failed after %v\n", time.Since(begin)) return 0, err } log.Printf("info: difference calculation succeed after %v\n", time.Since(begin)) return id, nil } func (dcm *diffCalculationManager) calculate() { for dci := range dcm.jobs { ctx := context.Background() var id int64 var err error err = auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error { id, err = doDiff(ctx, conn, dci) return err }) dcm.cmds <- func(dcm *diffCalculationManager) { msg := diffResult{id: id, err: err} key := diffCalcHash(dci) list := dcm.waiting[key] delete(dcm.waiting, key) for _, ch := range list { ch <- msg } } } } func (dcm *diffCalculationManager) enque(dci *models.DiffCalculationInput) chan diffResult { // buffer to prevent blocking even if receiver is dead. ch := make(chan diffResult, 1) dcm.cmds <- func(dcm *diffCalculationManager) { // When already running add to list of waitings key := diffCalcHash(dci) if list := dcm.waiting[key]; list != nil { dcm.waiting[key] = append(list, ch) return } // look if it is in database var id int64 ctx := context.Background() err := auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error { return conn.QueryRowContext( ctx, diffIDSQL, dci.Bottleneck, dci.Minuend.Time, dci.Subtrahend.Time, ).Scan(&id) }) switch { case err == sql.ErrNoRows: // We need to calculate it. case err != nil: // error ch <- diffResult{err: err} return default: // we have it ch <- diffResult{id: id} return } // add to waiting dcm.waiting[key] = []chan diffResult{ch} // feed to workers select { case dcm.jobs <- dci: default: // buffer is full delete(dcm.waiting, key) ch <- diffResult{err: errBufferFull} } } return ch } func (dcm *diffCalculationManager) unregister(dci *models.DiffCalculationInput, ch chan diffResult) { dcm.cmds <- func(dcm *diffCalculationManager) { key := diffCalcHash(dci) list := dcm.waiting[key] for i, x := range list { if x == ch { if i != len(list)-1 { copy(list[i:], list[i+1:]) } list[len(list)-1] = nil list = list[:len(list)-1] if len(list) == 0 { delete(dcm.waiting, key) } else { dcm.waiting[key] = list } return } } } } func diffCalculation(req *http.Request) (jr mw.JSONResult, err error) { dci := mw.JSONInput(req).(*models.DiffCalculationInput) ctx := req.Context() resultCh := diffCalcMng.enque(dci) defer diffCalcMng.unregister(dci, resultCh) select { case res := <-resultCh: switch { case res.err == errBufferFull: err = mw.JSONError{ Code: http.StatusTooManyRequests, Message: "Too many differences are waiting to be processed", } case res.err != nil: err = res.err default: jr = mw.JSONResult{ Result: map[string]int64{"id": res.id}, } } case <-ctx.Done(): log.Println("request canceled") } return }