Mercurial > gemma
view pkg/controllers/diff.go @ 5560:f2204f91d286
Join the log lines of imports to the log exports to recover data from them.
Used in SR export to extract information that was in the meta JSON
but is now only found in the log.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 09 Feb 2022 18:34:40 +0100 |
parents | 5f47eeea988d |
children |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018, 2019 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "errors" "fmt" "net/http" "runtime" "time" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/log" "gemma.intevation.de/gemma/pkg/mesh" "gemma.intevation.de/gemma/pkg/models" mw "gemma.intevation.de/gemma/pkg/middleware" ) const ( // maxNumberWorkers are processing diffs at a time. maxNumberWorkers = 2 // maxQueueLength are the number of diffs to // be wait for when idle. maxQueueLength = 5 ) const ( contourTolerance = 0.1 contourStep = 0.1 ) const ( // isoCellSize is the side length of a raster cell when tracing // iso areas. 
isoCellSize = 0.5 ) const ( diffIDSQL = ` SELECT sd.id FROM caching.sounding_differences sd JOIN waterway.sounding_results srm ON sd.minuend = srm.id JOIN waterway.sounding_results srs ON sd.subtrahend = srs.id WHERE srm.bottleneck_id = srs.bottleneck_id AND srm.bottleneck_id = $1 AND srm.date_info = $2::date AND srs.date_info = $3::date ` insertDiffSQL = ` WITH soundings AS ( SELECT sr.id AS id, sr.date_info AS date_info FROM waterway.sounding_results sr WHERE sr.bottleneck_id = $1 ) INSERT INTO caching.sounding_differences (minuend, subtrahend) SELECT m.id, s.id FROM soundings m, soundings s WHERE m.date_info = $2::date AND s.date_info = $3::date RETURNING id ` insertDiffIsoAreasQL = ` INSERT INTO caching.sounding_differences_iso_areas ( sounding_differences_id, height, areas ) SELECT $1, $2, ST_Transform( ST_Multi( ST_Collectionextract( ST_SimplifyPreserveTopology(ST_GeomFromWKB($4, $3::integer), $5), 3 ) ), 4326 ) ` commonDiffBBoxSQL = ` WITH joined AS ( SELECT sr.area AS area, sr.date_info AS date_info FROM waterway.sounding_results sr WHERE sr.bottleneck_id = $1 ), bbox AS ( SELECT ST_Extent(ST_intersection( (SELECT ST_Transform(area::geometry, $2::int) FROM joined WHERE date_info = $3::date), (SELECT ST_Transform(area::geometry, $2::int) FROM joined WHERE date_info = $4::date) )) AS area ) SELECT ST_XMin(area), ST_YMin(area), ST_XMax(area), ST_YMax(area) FROM bbox ` ) type ( diffResult struct { id int64 err error } diffCalculationManager struct { waiting map[string][]chan diffResult cmds chan func(*diffCalculationManager) jobs chan *models.DiffCalculationInput } ) var errBufferFull = errors.New("buffer full") var diffCalcMng = startDiffCalculationManager() func diffCalcHash(dci *models.DiffCalculationInput) string { return dci.Bottleneck + "/" + dci.Minuend.Format(common.DateFormat) + "/" + dci.Subtrahend.Format(common.DateFormat) } func startDiffCalculationManager() *diffCalculationManager { dcm := diffCalculationManager{ waiting: make(map[string][]chan 
diffResult), cmds: make(chan func(*diffCalculationManager)), jobs: make(chan *models.DiffCalculationInput, maxQueueLength), } go dcm.run() return &dcm } func (dcm *diffCalculationManager) run() { n := runtime.NumCPU() if n > maxNumberWorkers { n = maxNumberWorkers } for i := 0; i < n; i++ { go dcm.calculate() } for cmd := range dcm.cmds { cmd(dcm) } } func doDiff(ctx context.Context, conn *sql.Conn, dci *models.DiffCalculationInput) (int64, error) { start := time.Now() begin := start minuendTree, err := mesh.FromCache( ctx, conn, dci.Bottleneck, dci.Minuend.Time) log.Infof("loading minuend mesh took %s\n", time.Since(start)) if err != nil { return 0, err } if minuendTree == nil { return 0, fmt.Errorf("cannot find survey for %s/%s", dci.Bottleneck, dci.Minuend.Format(common.DateFormat)) } epsg := minuendTree.EPSG() var box mesh.Box2D switch err := conn.QueryRowContext( ctx, commonDiffBBoxSQL, dci.Bottleneck, epsg, dci.Minuend.Time, dci.Subtrahend.Time, ).Scan(&box.X1, &box.Y1, &box.X2, &box.Y2); { case err == sql.ErrNoRows: return 0, errors.New("no such intersection") case err != nil: return 0, err } if box.Empty() { return 0, errors.New("intersection is empty") } log.Infof("bbox of intersection: (%.2f, %.2f) - (%.2f, %.2f)\n", box.X1, box.Y1, box.X2, box.Y2) start = time.Now() raster := mesh.NewRaster(box, isoCellSize) raster.Rasterize(minuendTree.Value) log.Infof("rasterizing minuend took %v\n", time.Since(start)) minuendTree = nil start = time.Now() subtrahendTree, err := mesh.FromCache( ctx, conn, dci.Bottleneck, dci.Subtrahend.Time) log.Infof("loading subtrahend mesh took %s\n", time.Since(start)) if err != nil { return 0, err } if subtrahendTree == nil { return 0, fmt.Errorf("cannot find survey for %s/%s", dci.Bottleneck, dci.Subtrahend.Format(common.DateFormat)) } // We need a slow path implementation for this. 
if epsg != subtrahendTree.EPSG() { return 0, errors.New("calculating differences between two different " + "EPSG code meshes are not supported, yet") } start = time.Now() raster.Diff(subtrahendTree.Value) log.Infof("A - B took %v\n", time.Since(start)) subtrahendTree = nil // XXX: Maybe we should start this transaction earlier!? var tx *sql.Tx if tx, err = conn.BeginTx(ctx, nil); err != nil { return 0, err } defer tx.Rollback() zMin, zMax, ok := raster.ZExtent() if !ok { return 0, errors.New("scans do not have common points") } log.Infof("z range: %.3f - %.3f\n", zMin, zMax) var heights mesh.ClassBreaks heights, err = mesh.LoadClassBreaks( ctx, tx, "morphology_classbreaks_compare") if err != nil { log.Warnf("Loading class breaks failed: %v\n", err) err = nil heights = mesh.SampleDiffHeights(zMin, zMax, contourStep) } else { heights = heights.ExtrapolateClassBreaks(zMin, zMax) } heights = common.DedupFloat64s(heights) log.Infof("num heights: %d\n", len(heights)) var isoStmt *sql.Stmt if isoStmt, err = tx.PrepareContext(ctx, insertDiffIsoAreasQL); err != nil { return 0, err } defer isoStmt.Close() var id int64 if err = tx.QueryRowContext( ctx, insertDiffSQL, dci.Bottleneck, dci.Minuend.Time, dci.Subtrahend.Time, ).Scan(&id); err != nil { return 0, err } areas := raster.Trace(heights) raster = nil var size int for i, a := range areas { if len(a) == 0 { continue } wkb := a.AsWKB() size += len(wkb) if _, err = isoStmt.ExecContext( ctx, id, heights[i], epsg, wkb, contourTolerance, ); err != nil { return 0, err } } log.Infof("Transferred WKB size: %.2fMB.\n", float64(size)/(1024*1024)) log.Infof("calculating and storing iso areas took %v\n", time.Since(start)) if err = tx.Commit(); err != nil { log.Infof("difference calculation failed after %v\n", time.Since(begin)) return 0, err } log.Infof("difference calculation succeed after %v\n", time.Since(begin)) return id, nil } func (dcm *diffCalculationManager) calculate() { for dci := range dcm.jobs { ctx := 
context.Background() var id int64 var err error err = auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error { id, err = doDiff(ctx, conn, dci) return err }) dcm.cmds <- func(dcm *diffCalculationManager) { msg := diffResult{id: id, err: err} key := diffCalcHash(dci) list := dcm.waiting[key] delete(dcm.waiting, key) for _, ch := range list { ch <- msg } } } } func (dcm *diffCalculationManager) enque(dci *models.DiffCalculationInput) chan diffResult { // buffer to prevent blocking even if receiver is dead. ch := make(chan diffResult, 1) dcm.cmds <- func(dcm *diffCalculationManager) { // When already running add to list of waitings key := diffCalcHash(dci) if list := dcm.waiting[key]; list != nil { dcm.waiting[key] = append(list, ch) return } // look if it is in database var id int64 ctx := context.Background() err := auth.RunAs(ctx, "sys_admin", func(conn *sql.Conn) error { return conn.QueryRowContext( ctx, diffIDSQL, dci.Bottleneck, dci.Minuend.Time, dci.Subtrahend.Time, ).Scan(&id) }) switch { case err == sql.ErrNoRows: // We need to calculate it. 
case err != nil: // error ch <- diffResult{err: err} return default: // we have it ch <- diffResult{id: id} return } // add to waiting dcm.waiting[key] = []chan diffResult{ch} // feed to workers select { case dcm.jobs <- dci: default: // buffer is full delete(dcm.waiting, key) ch <- diffResult{err: errBufferFull} } } return ch } func (dcm *diffCalculationManager) unregister(dci *models.DiffCalculationInput, ch chan diffResult) { dcm.cmds <- func(dcm *diffCalculationManager) { key := diffCalcHash(dci) list := dcm.waiting[key] for i, x := range list { if x == ch { if i != len(list)-1 { copy(list[i:], list[i+1:]) } list[len(list)-1] = nil list = list[:len(list)-1] if len(list) == 0 { delete(dcm.waiting, key) } else { dcm.waiting[key] = list } return } } } } func diffCalculation(req *http.Request) (jr mw.JSONResult, err error) { dci := mw.JSONInput(req).(*models.DiffCalculationInput) ctx := req.Context() resultCh := diffCalcMng.enque(dci) defer diffCalcMng.unregister(dci, resultCh) select { case res := <-resultCh: switch { case res.err == errBufferFull: err = mw.JSONError{ Code: http.StatusTooManyRequests, Message: "Too many differences are waiting to be processed", } case res.err != nil: err = res.err default: jr = mw.JSONResult{ Result: map[string]int64{"id": res.id}, } } case <-ctx.Done(): log.Infoln("request canceled") } return }