Mercurial > gemma
view pkg/controllers/diff.go @ 4940:b3b2ba09a450 fairway-marks-import
Add missing fairway mark types
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Mon, 17 Feb 2020 18:38:45 +0100 |
parents | 4847ac70103a |
children | 34bc6041e61e |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "errors" "fmt" "log" "net/http" "runtime" "time" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/mesh" "gemma.intevation.de/gemma/pkg/models" mw "gemma.intevation.de/gemma/pkg/middleware" ) const ( // maxNumberWorkers are processing diffs at a time. maxNumberWorkers = 2 // maxQueueLength are the number of diffs to // be wait for when idle. maxQueueLength = 5 ) const ( contourTolerance = 0.1 contourStep = 0.1 ) const ( // isoCellSize is the side length of a raster cell when tracing // iso areas. isoCellSize = 0.5 ) const ( diffIDSQL = ` SELECT sd.id FROM caching.sounding_differences sd JOIN waterway.sounding_results srm ON sd.minuend = srm.id JOIN waterway.sounding_results srs ON sd.subtrahend = srs.id WHERE srm.bottleneck_id = srs.bottleneck_id AND srm.bottleneck_id = $1 AND srm.date_info = $2::date AND srs.date_info = $3::date ` insertDiffSQL = ` WITH soundings AS ( SELECT sr.id AS id, sr.date_info AS date_info FROM waterway.sounding_results sr WHERE sr.bottleneck_id = $1 ) INSERT INTO caching.sounding_differences (minuend, subtrahend) SELECT m.id, s.id FROM soundings m, soundings s WHERE m.date_info = $2::date AND s.date_info = $3::date RETURNING id ` insertDiffIsoAreasQL = ` INSERT INTO caching.sounding_differences_iso_areas ( sounding_differences_id, height, areas ) SELECT $1, $2, ST_Transform( ST_Multi( ST_Collectionextract( ST_SimplifyPreserveTopology(ST_GeomFromWKB($4, $3::integer), $5), 3 ) ), 4326 ) ` commonDiffBBoxSQL = ` WITH joined AS ( SELECT sr.area AS area, sr.date_info AS date_info FROM waterway.sounding_results sr WHERE sr.bottleneck_id = $1 ), bbox AS ( SELECT ST_Extent(ST_intersection( (SELECT ST_Transform(area::geometry, $2::int) FROM joined WHERE date_info = $3::date), (SELECT ST_Transform(area::geometry, $2::int) FROM joined WHERE date_info = $4::date) )) AS area ) SELECT ST_XMin(area), ST_YMin(area), ST_XMax(area), ST_YMax(area) FROM bbox ` ) type ( diffResult struct { id int64 err error } diffCalculationManager struct { waiting map[string][]chan diffResult cmds chan func(*diffCalculationManager) jobs chan *models.DiffCalculationInput } ) var errBufferFull = errors.New("buffer full") var diffCalcMng = startDiffCalculationManager() func diffCalcHash(dci *models.DiffCalculationInput) string { return dci.Bottleneck + "/" + dci.Minuend.Format(common.DateFormat) + "/" + dci.Subtrahend.Format(common.DateFormat) } func startDiffCalculationManager() *diffCalculationManager { dcm := diffCalculationManager{ waiting: make(map[string][]chan diffResult), cmds: make(chan func(*diffCalculationManager)), jobs: make(chan *models.DiffCalculationInput, maxQueueLength), } go dcm.run() return &dcm } func (dcm *diffCalculationManager) run() { n := runtime.NumCPU() if n > maxNumberWorkers { n = maxNumberWorkers } for i := 0; i < n; i++ { go dcm.calculate() } for cmd := range dcm.cmds { cmd(dcm) } } func doDiff(ctx context.Context, conn *sql.Conn, dci *models.DiffCalculationInput) (int64, error) { start := time.Now() begin := start minuendTree, err := mesh.FromCache( ctx, conn, dci.Bottleneck, dci.Minuend.Time) log.Printf("info: loading minuend mesh took %s\n", time.Since(start)) if err != nil { return 0, err } if minuendTree == nil { return 0, fmt.Errorf("cannot find survey for %s/%s", dci.Bottleneck, dci.Minuend.Format(common.DateFormat)) } epsg := minuendTree.EPSG() var box mesh.Box2D switch err := conn.QueryRowContext( ctx, commonDiffBBoxSQL, dci.Bottleneck, epsg, dci.Minuend.Time, dci.Subtrahend.Time, ).Scan(&box.X1, &box.Y1, &box.X2, &box.Y2); { case err == sql.ErrNoRows: return 0, errors.New("no such intersection") case err != nil: return 0, err } if box.Empty() { return 0, errors.New("intersection is empty") } log.Printf("info: bbox of intersection: (%.2f, %.2f) - (%.2f, %.2f)\n", box.X1, box.Y1, box.X2, box.Y2) start = time.Now() raster := mesh.NewRaster(box, isoCellSize) raster.Rasterize(minuendTree.Value) log.Printf("info: rasterizing minuend took %v\n", time.Since(start)) minuendTree = nil start = time.Now() subtrahendTree, err := mesh.FromCache( ctx, conn, dci.Bottleneck, dci.Subtrahend.Time) log.Printf("info: loading subtrahend mesh took %s\n", time.Since(start)) if err != nil { return 0, err } if subtrahendTree == nil { return 0, fmt.Errorf("cannot find survey for %s/%s", dci.Bottleneck, dci.Subtrahend.Format(common.DateFormat)) } // We need a slow path implementation for this. if epsg != subtrahendTree.EPSG() { return 0, errors.New("calculating differences between two different " + "EPSG code meshes are not supported, yet") } start = time.Now() raster.Diff(subtrahendTree.Value) log.Printf("info: A - B took %v\n", time.Since(start)) subtrahendTree = nil // XXX: Maybe we should start this transaction earlier!? var tx *sql.Tx if tx, err = conn.BeginTx(ctx, nil); err != nil { return 0, err } defer tx.Rollback() zMin, zMax, ok := raster.ZExtent() if !ok { return 0, errors.New("scans do not have common points") } log.Printf("info: z range: %.3f - %.3f\n", zMin, zMax) var heights []float64 heights, err = mesh.LoadClassBreaks( ctx, tx, "morphology_classbreaks_compare") if err != nil { log.Printf("warn: Loading class breaks failed: %v\n", err) err = nil heights = mesh.SampleDiffHeights(zMin, zMax, contourStep) } else { heights = mesh.ExtrapolateClassBreaks(heights, zMin, zMax) } heights = common.DedupFloat64s(heights) log.Printf("info: num heights: %d\n", len(heights)) var isoStmt *sql.Stmt if isoStmt, err = tx.PrepareContext(ctx, insertDiffIsoAreasQL); err != nil { return 0, err } defer isoStmt.Close() var id int64 if err = tx.QueryRowContext( ctx, insertDiffSQL, dci.Bottleneck, dci.Minuend.Time, dci.Subtrahend.Time, ).Scan(&id); err != nil { return 0, err } areas := raster.Trace(heights) raster = nil var size int for i, a := range areas { if len(a) == 0 { continue } wkb := a.AsWKB() size += len(wkb) if _, err = isoStmt.ExecContext( ctx, id, heights[i], epsg, wkb, contourTolerance, ); err != nil { return 0, err } } log.Printf("info: Transferred WKB size: %.2fMB.\n", float64(size)/(1024*1024)) log.Printf("info: calculating and storing iso areas took %v\n", time.Since(start)) if err = tx.Commit(); err != nil { log.Printf("info: difference calculation failed after %v\n", time.Since(begin)) return 0, err } log.Printf("info: difference calculation succeed after %v\n", time.Since(begin)) return id, nil } func (dcm *diffCalculationManager) calculate() { for dci := range dcm.jobs { ctx := context.Background() var id int64 var err error err = auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error { id, err = doDiff(ctx, conn, dci) return err }) dcm.cmds <- func(dcm *diffCalculationManager) { msg := diffResult{id: id, err: err} key := diffCalcHash(dci) list := dcm.waiting[key] delete(dcm.waiting, key) for _, ch := range list { ch <- msg } } } } func (dcm *diffCalculationManager) enque(dci *models.DiffCalculationInput) chan diffResult { // buffer to prevent blocking even if receiver is dead. ch := make(chan diffResult, 1) dcm.cmds <- func(dcm *diffCalculationManager) { // When already running add to list of waitings key := diffCalcHash(dci) if list := dcm.waiting[key]; list != nil { dcm.waiting[key] = append(list, ch) return } // look if it is in database var id int64 ctx := context.Background() err := auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error { return conn.QueryRowContext( ctx, diffIDSQL, dci.Bottleneck, dci.Minuend.Time, dci.Subtrahend.Time, ).Scan(&id) }) switch { case err == sql.ErrNoRows: // We need to calculate it. case err != nil: // error ch <- diffResult{err: err} return default: // we have it ch <- diffResult{id: id} return } // add to waiting dcm.waiting[key] = []chan diffResult{ch} // feed to workers select { case dcm.jobs <- dci: default: // buffer is full delete(dcm.waiting, key) ch <- diffResult{err: errBufferFull} } } return ch } func (dcm *diffCalculationManager) unregister(dci *models.DiffCalculationInput, ch chan diffResult) { dcm.cmds <- func(dcm *diffCalculationManager) { key := diffCalcHash(dci) list := dcm.waiting[key] for i, x := range list { if x == ch { if i != len(list)-1 { copy(list[i:], list[i+1:]) } list[len(list)-1] = nil list = list[:len(list)-1] if len(list) == 0 { delete(dcm.waiting, key) } else { dcm.waiting[key] = list } return } } } } func diffCalculation(req *http.Request) (jr mw.JSONResult, err error) { dci := mw.JSONInput(req).(*models.DiffCalculationInput) ctx := req.Context() resultCh := diffCalcMng.enque(dci) defer diffCalcMng.unregister(dci, resultCh) select { case res := <-resultCh: switch { case res.err == errBufferFull: err = mw.JSONError{ Code: http.StatusTooManyRequests, Message: "Too many differences are waiting to be processed", } case res.err != nil: err = res.err default: jr = mw.JSONResult{ Result: map[string]int64{"id": res.id}, } } case <-ctx.Done(): log.Println("request canceled") } return }