Mercurial > gemma
view pkg/imports/sr.go @ 5403:85f19e924a43 marking-single-beam
Adjusted the import model to be able to handle marking single beam scans.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 06 Jul 2021 01:20:44 +0200 |
parents | 12a5e128255f |
children | 850f5847d18a |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom@intevation.de>

package imports

import (
	"archive/zip"
	"bufio"
	"context"
	"crypto/sha1"
	"database/sql"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	shp "github.com/jonas-p/go-shp"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/mesh"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/wkb"
)

// SoundingResult is a Job to import sounding results
// from a ZIP file into the database.
type SoundingResult struct {
	// Dir is the folder in the file system where the
	// 'sr.zip' is expected to be.
	Dir string `json:"dir"`

	// Override data

	// Date if given overrides the date value from the meta.json.
	Date *models.Date `json:"date,omitempty"`
	// Bottleneck if given overrides the name of the bottleneck
	// from the meta.json.
	Bottleneck *string `json:"bottleneck,omitempty"`
	// EPSG if given overrides the EPSG code from the meta.json.
	// Defaults to WGS84.
	EPSG *uint `json:"epsg,omitempty"`
	// DepthReference if given overrides the DepthReference value
	// from the meta.json.
	DepthReference *string `json:"depth-reference,omitempty"`
	// SurveyType indicates that the sounding is a single beam scan.
	SurveyType *models.SurveyType `json:"survey-type,omitempty"`
	// NegateZ indicates that the Z values of the XYZ input should be
	// multiplied by -1.
	NegateZ *bool `json:"negate-z,omitempty"`
}

const (
	// contourStepWidth is the vertical distance between generated contours.
	contourStepWidth = 0.1
	// contourTolerance is the simplification tolerance used when
	// storing iso areas.
	contourTolerance = 0.1
)

const (
	// multiBeamThreshold is the number of points m² which
	// is assumed that greater values are indicators for
	// an already interpolated point cloud.
	multiBeamThreshold = 1.0 / 5.0
)

const (
	// pointsPerSquareMeter is the average number of points
	// when generating an artificial height model for single beam scans.
	pointsPerSquareMeter = 1.0
)

const (
	// isoCellSize is the side length of a raster cell when tracing
	// iso areas.
	isoCellSize = 0.5
)

// SRJobKind is the unique name of this import job type.
const SRJobKind JobKind = "sr"

// srJobCreator creates SoundingResult import jobs.
type srJobCreator struct{}

func init() {
	RegisterJobCreator(SRJobKind, srJobCreator{})
}

// Description returns a human readable name for this job type.
func (srJobCreator) Description() string { return "sounding results" }

// AutoAccept signals that sounding result imports need manual review.
func (srJobCreator) AutoAccept() bool { return false }

// Create returns a new empty SoundingResult job.
func (srJobCreator) Create() Job { return new(SoundingResult) }

// Depends lists the database relations this import writes
// ([0]) and reads ([1]).
func (srJobCreator) Depends() [2][]string {
	return [2][]string{
		{"sounding_results", "sounding_results_iso_areas"},
		{"bottlenecks"},
	}
}

// StageDone moves the imported sounding result out of the staging area.
func (srJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, _ Feedback, ) error { _, err := tx.ExecContext(ctx, srStageDoneSQL, id) return err } const ( srStageDoneSQL = ` UPDATE waterway.sounding_results SET staging_done = true WHERE id = ( SELECT key from import.track_imports WHERE import_id = $1 AND relation = 'waterway.sounding_results'::regclass)` insertHullSQL = ` INSERT INTO waterway.sounding_results ( bottleneck_id, date_info, depth_reference, area, surtyp, zpg_exception ) SELECT bottleneck_id, $2::date, $3, (SELECT CASE WHEN $5::bytea IS NULL THEN ST_Transform(ST_ConcaveHull(ST_Force2D(ST_GeomFromWKB($4, $6::integer)), 0.7), 4326)::geography ELSE ST_MakeValid(ST_Transform(ST_GeomFromWKB($5, $6::integer), 4326))::geography END), $7, $8 FROM waterway.bottlenecks WHERE objnam = $1 AND validity @> CAST($2 AS timestamptz) RETURNING id, ST_X(ST_Centroid(area::geometry)), ST_Y(ST_Centroid(area::geometry)), best_utm(area), ST_AsBinary(ST_Transform(area::geometry, best_utm(area))) ` reprojectPointsBufferedSQL = ` SELECT ST_AsBinary(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer)), ST_AsBinary(ST_Buffer(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer), 0.1)), ST_Area(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer))` insertMeshSQL = ` UPDATE waterway.sounding_results SET mesh_checksum = $2, mesh_index = $3 WHERE id = $1` repairBoundarySQL = ` SELECT ST_AsBinary(ST_Buffer(ST_MakeValid(ST_GeomFromWKB($1, $2::integer)), 0.0)), ST_AsBinary(ST_Buffer(ST_MakeValid(ST_GeomFromWKB($1, $2::integer)), 0.1))` insertIsoAreasSQL = ` INSERT INTO waterway.sounding_results_iso_areas ( sounding_result_id, height, areas ) SELECT $1, $2, ST_Transform( ST_Multi( ST_Collectionextract( ST_SimplifyPreserveTopology(ST_GeomFromWKB($4, $3::integer), $5), 3 ) ), 4326 ) FROM waterway.sounding_results sr WHERE id = $1 ` selectGaugeLDCSQL = ` SELECT grwl.value, grwl.depth_reference FROM waterway.gauges_reference_water_levels grwl JOIN 
waterway.bottlenecks bns ON grwl.location = bns.gauge_location AND grwl.validity @> CAST($2 AS timestamptz) WHERE bns.objnam = $1 AND bns.validity @> CAST($2 AS timestamptz) AND grwl.depth_reference like 'LDC%' ` selectZPGExceptionAllowedSQL = ` SELECT rolname = 'sys_admin' OR country IN ('BG', 'RO') FROM users.list_users WHERE username = current_user` reprojectPointsSingleBeamSQL = ` SELECT ST_AsBinary( ST_Transform( ST_GeomFromWKB($1, $2::integer), best_utm(CAST(ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS geography)))), best_utm(CAST(ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS geography)) ` ) // Description gives a short info about relevant facts of this import. func (sr *SoundingResult) Description() (string, error) { var descs []string if sr.Bottleneck != nil { descs = append(descs, *sr.Bottleneck) } if sr.Date != nil { descs = append(descs, (*sr).Date.Format(common.DateFormat)) } if sr.NegateZ != nil && *sr.NegateZ { descs = append(descs, "negateZ") } return strings.Join(descs, "|"), nil } func (sr *SoundingResult) surveyType() models.SurveyType { if sr.SurveyType != nil { return *sr.SurveyType } return models.SurveyTypeMultiBeam } func (sr *SoundingResult) surtype() string { return string(sr.surveyType()) } func (sr *SoundingResult) negateZ() bool { return sr.NegateZ != nil && *sr.NegateZ } // Do executes the actual sounding result import. 
func (sr *SoundingResult) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() zpath := filepath.Join(sr.Dir, "sr.zip") z, err := zip.OpenReader(zpath) if err != nil { feedback.Info("%v", err) feedback.Info("Falling back to TXT file mode.") z = nil } if z != nil { defer z.Close() } feedback.Info("Looking for 'meta.json'") mf := common.FindInZIP(z, "meta.json") if mf == nil && !sr.completeOverride() { return nil, errors.New("cannot find 'meta.json'") } m, err := sr.loadMeta(mf) if err != nil { return nil, err } feedback.Info("Bottleneck: %s", m.Bottleneck) feedback.Info("Survey date: %s", m.Date.Format(common.DateFormat)) var xform vertexTransform if sr.negateZ() { xform = negateZTransform } else { xform = identityTransform } var zpgException bool if m.DepthReference == "ZPG" { feedback.Info("Found ZPG as reference system -> translating Z values to LDC") var ldc float64 var depthReference string err := conn.QueryRowContext(ctx, selectGaugeLDCSQL, m.Bottleneck, m.Date.Time, ).Scan( &ldc, &depthReference, ) switch { case err == sql.ErrNoRows: if err := conn.QueryRowContext( ctx, selectZPGExceptionAllowedSQL).Scan(&zpgException); err != nil { return nil, err } if !zpgException { return nil, errors.New("cannot load LDC value") } feedback.Warn("Not LCD found, but ZPG exception granted") case err != nil: return nil, err } if !zpgException { // LDC is cm. The data is in m. ldc /= 100 xform = chainTransforms( xform, func(v mesh.Vertex) mesh.Vertex { return mesh.Vertex{X: v.X, Y: v.Y, Z: v.Z + ldc} }) m.DepthReference = depthReference } } if err := m.Validate(ctx, conn); err != nil { return nil, common.ToError(err) } var xyz mesh.MultiPointZ if z != nil { // Scanning ZIP file for *.xyz file. 
var xyzf *zip.File for _, ext := range []string{".xyz", ".txt"} { feedback.Info("Looking for '*%s'", ext) if xyzf = common.FindInZIP(z, ext); xyzf != nil { break } } if xyzf == nil { return nil, errors.New("cannot find any *.xyz or *.txt file") } xyz, err = loadXYZ(xyzf, feedback, xform) } else { // TXT file mode xyz, err = loadXYZFile(zpath, feedback, xform) } if err != nil { return nil, err } if len(xyz) == 0 { return nil, errors.New("XYZ does not contain any vertices") } // Is there a boundary shapefile in the ZIP archive? boundary, err := loadBoundary(z) if err != nil { return nil, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() summary, err := sr.processScan( ctx, tx, feedback, importID, m, xyz, boundary, zpgException, ) if err != nil { return nil, err } if err = tx.Commit(); err != nil { feedback.Error( "Storing sounding result failed after %v.", time.Since(start)) return nil, err } feedback.Info( "Storing sounding result was successful after %v.", time.Since(start)) return summary, nil } func (sr *SoundingResult) processScan( ctx context.Context, tx *sql.Tx, feedback Feedback, importID int64, m *models.SoundingResultMeta, xyz mesh.MultiPointZ, boundary polygonSlice, zpgException bool, ) (interface{}, error) { feedback.Info("Processing as %s beam scan.", sr.surtype()) feedback.Info("Reproject XYZ data.") start := time.Now() xyzWKB := xyz.AsWKB() var reproj []byte var epsg uint32 if err := tx.QueryRowContext( ctx, reprojectPointsSingleBeamSQL, xyzWKB, m.EPSG, ).Scan(&reproj, &epsg); err != nil { return nil, err } if err := xyz.FromWKB(reproj); err != nil { return nil, err } feedback.Info("Reprojecting points to EPSG %d took %v.", epsg, time.Since(start)) feedback.Info("Number of reprojected points: %d", len(xyz)) feedback.Info("Triangulate XYZ data.") start = time.Now() tri, err := mesh.Triangulate(xyz) if err != nil { return nil, err } feedback.Info("Triangulation took %v.", time.Since(start)) 
feedback.Info("Number triangles: %d.", len(tri.Triangles)/3) var ( clippingPolygon mesh.Polygon clippingPolygonBuffered mesh.Polygon removed map[int32]struct{} polygonArea float64 clippingPolygonWKB []byte tin *mesh.Tin ) if boundary == nil { feedback.Info("No boundary given. Calulate from XYZ data.") tooLongEdge := tri.EstimateTooLong() feedback.Info("Eliminate triangles with edges longer than %.2f meters.", tooLongEdge) var polygon mesh.LineStringZ start = time.Now() polygon, removed = tri.ConcaveHull(tooLongEdge) polygonArea = polygon.Area() if polygonArea < 0.0 { // counter clockwise polygonArea = -polygonArea polygon.Reverse() } clippingPolygon.FromLineStringZ(polygon) var buffered []byte if err := tx.QueryRowContext( ctx, repairBoundarySQL, clippingPolygon.AsWKB(), epsg, ).Scan(&clippingPolygonWKB, &buffered); err != nil { return nil, err } if err := clippingPolygon.FromWKB(clippingPolygonWKB); err != nil { return nil, err } if err := clippingPolygonBuffered.FromWKB(buffered); err != nil { return nil, err } feedback.Info("Calculating took %v.", time.Since(start)) } else { // has Boundary feedback.Info("Using uploaded boundary polygon.") var buffered []byte if err = tx.QueryRowContext( ctx, reprojectPointsBufferedSQL, boundary.asWKB(), m.EPSG, epsg, ).Scan(&clippingPolygonWKB, &buffered, &polygonArea); err != nil { return nil, err } if err := clippingPolygon.FromWKB(clippingPolygonWKB); err != nil { return nil, err } if err := clippingPolygonBuffered.FromWKB(buffered); err != nil { return nil, err } tin := tri.Tin() tin.EPSG = epsg var str mesh.STRTree str.Build(tin) removed = str.Clip(&clippingPolygon) } switch sr.surveyType() { case models.SurveyTypeMarking: // TODO: Implement me! 
return nil, errors.New("Not implemented, yet!") case models.SurveyTypeSingleBeam: origDensity := float64(len(xyz)) / polygonArea feedback.Info("Boundary area: %.2fm²", polygonArea) feedback.Info("Original point density: %.2f points/m²", origDensity) if origDensity > multiBeamThreshold { feedback.Warn("The density is greater than %.2f points/m².", multiBeamThreshold) feedback.Warn("It is assumed that the data is already interpolated.") goto multibeam } // Build the first mesh to generate random points on. feedback.Info("Build virtual DEM based on original XYZ data.") start = time.Now() tin := tri.Tin() var virtual mesh.STRTree virtual.BuildWithout(tin, removed) feedback.Info("Building took %v", time.Since(start)) numPoints := int(math.Ceil(polygonArea * pointsPerSquareMeter)) feedback.Info("Generate %d random points for an average density of ~%.2f points/m².", numPoints, pointsPerSquareMeter) start = time.Now() generated := make(mesh.LineStringZ, 0, numPoints+clippingPolygon.NumVertices(0)) mesh.GenerateRandomVertices( numPoints, tin.Min, tin.Max, virtual.Value, func(vertices []mesh.Vertex) { generated = append(generated, vertices...) }) feedback.Info("Generating %d points took %v.", len(generated), time.Since(start)) // Add the boundary to new point cloud. 
dupes := map[[2]float64]struct{}{} clippingPolygon.Vertices(0, func(x, y float64) { key := [2]float64{x, y} if _, found := dupes[key]; found { return } dupes[key] = struct{}{} if z, ok := virtual.Value(x, y); ok { generated = append(generated, mesh.Vertex{X: x, Y: y, Z: z}) } }) feedback.Info("Triangulate new point cloud.") xyz = mesh.MultiPointZ(generated) start = time.Now() tri, err = mesh.Triangulate(xyz) if err != nil { return nil, err } feedback.Info("Second triangulation took %v.", time.Since(start)) feedback.Info("Number triangles: %d.", len(tri.Triangles)/3) feedback.Info("Clipping triangles from new mesh.") } multibeam: start = time.Now() tin = tri.Tin() tin.EPSG = epsg var str mesh.STRTree str.Build(tin) feedback.Info("Building clipping index took %v", time.Since(start)) start = time.Now() clippingPolygonBuffered.Indexify() removed = str.Clip(&clippingPolygonBuffered) feedback.Info("Clipping took %v.", time.Since(start)) feedback.Info("Number of triangles to clip %d.", len(removed)) start = time.Now() final := mesh.STRTree{Entries: 16} final.BuildWithout(tin, removed) feedback.Info("Building final mesh took %v.", time.Since(start)) feedback.Info("Store mesh.") start = time.Now() var ( id int64 dummy uint32 lat, lon float64 ) var hull []byte err = tx.QueryRowContext( ctx, insertHullSQL, m.Bottleneck, m.Date.Time, m.DepthReference, nil, clippingPolygonWKB, epsg, sr.surtype(), zpgException, ).Scan( &id, &lat, &lon, &dummy, &hull, ) switch { case err == sql.ErrNoRows: return nil, fmt.Errorf( "no matching bottleneck of given name or time available: %v", err) case err != nil: return nil, err } index, err := final.Bytes() if err != nil { return nil, err } h := sha1.New() h.Write(index) checksum := hex.EncodeToString(h.Sum(nil)) _, err = tx.ExecContext(ctx, insertMeshSQL, id, checksum, index) if err != nil { return nil, err } feedback.Info("Storing mesh index took %s.", time.Since(start)) err = generateIsos(ctx, tx, feedback, &final, id) if err != nil { return 
nil, err } // Store for potential later removal. if err = track(ctx, tx, importID, "waterway.sounding_results", id); err != nil { return nil, err } summary := struct { Bottleneck string `json:"bottleneck"` Date models.Date `json:"date"` Lat float64 `json:"lat"` Lon float64 `json:"lon"` }{ Bottleneck: m.Bottleneck, Date: m.Date, Lat: lat, Lon: lon, } return &summary, nil } // CleanUp removes the folder containing the ZIP file with the // the sounding result import. func (sr *SoundingResult) CleanUp() error { return os.RemoveAll(sr.Dir) } func (sr *SoundingResult) completeOverride() bool { // sr.EPSG == nil -> WGS84 return sr.Bottleneck != nil && sr.Date != nil && sr.DepthReference != nil && sr.SurveyType != nil && sr.NegateZ != nil } func (sr *SoundingResult) loadMeta(f *zip.File) (*models.SoundingResultMeta, error) { if f == nil { var epsg uint if sr.EPSG != nil { epsg = *sr.EPSG } else { epsg = models.WGS84 } return &models.SoundingResultMeta{ Date: *sr.Date, Bottleneck: *sr.Bottleneck, EPSG: epsg, DepthReference: *sr.DepthReference, SurveyType: sr.surveyType(), NegateZ: sr.negateZ(), }, nil } r, err := f.Open() if err != nil { return nil, err } defer r.Close() var m models.SoundingResultMeta if err := m.Decode(r); err != nil { return nil, err } // Apply overrides if sr.Date != nil { m.Date = *sr.Date } if sr.Bottleneck != nil { m.Bottleneck = *sr.Bottleneck } if sr.EPSG != nil { m.EPSG = *sr.EPSG } if sr.DepthReference != nil { m.DepthReference = *sr.DepthReference } if sr.SurveyType != nil { m.SurveyType = *sr.SurveyType } if sr.NegateZ != nil { m.NegateZ = *sr.NegateZ } return &m, nil } type vertexTransform func(mesh.Vertex) mesh.Vertex func identityTransform(v mesh.Vertex) mesh.Vertex { return v } func negateZTransform(v mesh.Vertex) mesh.Vertex { return mesh.Vertex{X: v.X, Y: v.Y, Z: -v.Z} } func chainTransforms(a, b vertexTransform) vertexTransform { return func(v mesh.Vertex) mesh.Vertex { return b(a(v)) } } func loadXYZReader(r io.Reader, feedback 
Feedback, xform vertexTransform) (mesh.MultiPointZ, error) { mpz := make(mesh.MultiPointZ, 0, 250000) s := bufio.NewScanner(r) warnLimiter := common.WarningLimiter{Log: feedback.Warn, MaxWarnings: 100} warn := warnLimiter.Warn defer warnLimiter.Close() for line := 1; s.Scan(); line++ { text := s.Text() var p mesh.Vertex // fmt.Sscanf(text, "%f,%f,%f") is 4 times slower. idx := strings.IndexByte(text, ',') if idx == -1 { warn("format error in line %d", line) continue } var err error if p.X, err = strconv.ParseFloat(text[:idx], 64); err != nil { warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if idx = strings.IndexByte(text, ','); idx == -1 { feedback.Warn("format error in line %d", line) continue } if p.Y, err = strconv.ParseFloat(text[:idx], 64); err != nil { warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if p.Z, err = strconv.ParseFloat(text, 64); err != nil { warn("format error in line %d: %v", line, err) continue } p = xform(p) mpz = append(mpz, p) } if err := s.Err(); err != nil { return nil, err } return mpz, nil } func loadXYZ(f *zip.File, feedback Feedback, xform vertexTransform) (mesh.MultiPointZ, error) { r, err := f.Open() if err != nil { return nil, err } defer r.Close() return loadXYZReader(r, feedback, xform) } func loadXYZFile(f string, feedback Feedback, xform vertexTransform) (mesh.MultiPointZ, error) { r, err := os.Open(f) if err != nil { return nil, err } defer r.Close() return loadXYZReader(r, feedback, xform) } func loadBoundary(z *zip.ReadCloser) (polygonSlice, error) { shpF := common.FindInZIP(z, ".shp") if shpF == nil { return nil, nil } prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name)) dbfF := common.FindInZIP(z, prefix+".dbf") if dbfF == nil { return nil, fmt.Errorf("no DBF file found for %s", shpF.Name) } shpR, err := shpF.Open() if err != nil { return nil, err } dbfR, err := dbfF.Open() if err != nil { shpR.Close() return nil, err } sr := 
shp.SequentialReaderFromExt(shpR, dbfR) defer sr.Close() if !sr.Next() { return nil, sr.Err() } _, s := sr.Shape() if s == nil { return nil, sr.Err() } return shapeToPolygon(s) } func generateIsos( ctx context.Context, tx *sql.Tx, feedback Feedback, tree *mesh.STRTree, id int64, ) error { heights, err := mesh.LoadClassBreaks( ctx, tx, "morphology_classbreaks", ) minZ, maxZ := tree.Min().Z, tree.Max().Z if err != nil { feedback.Warn("Loading class breaks failed: %v", err) feedback.Info("Using default class breaks") heights = nil h := contourStepWidth * math.Ceil(minZ/contourStepWidth) for ; h <= maxZ; h += contourStepWidth { heights = append(heights, h) } } else { heights = mesh.ExtrapolateClassBreaks(heights, minZ, maxZ) } /* for i, v := range heights { fmt.Printf("%d %.2f\n", i, v) } log.Printf("%.2f - %.2f\n", tree.Min.Z, tree.Max.Z) */ heights = common.DedupFloat64s(heights) return generateIsoAreas(ctx, tx, feedback, tree, heights, id) } func generateIsoAreas( ctx context.Context, tx *sql.Tx, feedback Feedback, tree *mesh.STRTree, heights []float64, id int64, ) error { feedback.Info("Generate iso areas") total := time.Now() defer func() { feedback.Info("Generating iso areas took %s.", time.Since(total)) }() box := mesh.Box2D{ X1: tree.Min().X, Y1: tree.Min().Y, X2: tree.Max().X, Y2: tree.Max().Y, } raster := mesh.NewRaster(box, isoCellSize) raster.Rasterize(tree.Value) areas := raster.Trace(heights) return storeAreas( ctx, tx, feedback, areas, tree.EPSG(), heights, id) } func storeAreas( ctx context.Context, tx *sql.Tx, feedback Feedback, areas []wkb.MultiPolygonGeom, epsg uint32, heights []float64, id int64, ) error { feedback.Info("Store iso areas.") total := time.Now() defer func() { feedback.Info("Storing iso areas took %v.", time.Since(total)) }() stmt, err := tx.PrepareContext(ctx, insertIsoAreasSQL) if err != nil { return err } defer stmt.Close() var size int for i, a := range areas { if len(a) == 0 { continue } wkb := a.AsWKB() size += len(wkb) if _, err 
:= stmt.ExecContext( ctx, id, heights[i], epsg, wkb, contourTolerance, ); err != nil { return err } } feedback.Info("Transferred WKB size: %.2fMB.", float64(size)/(1024*1024)) return nil }