gemma: pkg/imports/sr.go @ 4751:dbf07d0c364e
SR import: If the point density is greater than 0.2 values per meter² the interpolating step is omitted.
author:   Sascha L. Teichmann <sascha.teichmann@intevation.de>
date:     Fri, 18 Oct 2019 17:07:24 +0200
parents:  56bd9ba0354c
children: 64979fec89a7
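The gate described by the commit message is a plain density check. The sketch below is a hypothetical stand-alone helper (skipInterpolation is not part of the file); it only mirrors the constant and the comparison that appear in (*SoundingResult).processScan further down.

package main

import "fmt"

// multiBeamThreshold mirrors the constant defined in sr.go:
// 1.0 / 5.0 = 0.2 points per m².
const multiBeamThreshold = 1.0 / 5.0

// skipInterpolation reports whether the artificial single beam
// interpolation step should be omitted: point clouds denser than
// the threshold are assumed to be interpolated already.
func skipInterpolation(numPoints int, polygonArea float64) bool {
	origDensity := float64(numPoints) / polygonArea
	return origDensity > multiBeamThreshold
}

func main() {
	// 30,000 points over 100,000 m² -> 0.3 points/m² -> step is skipped.
	fmt.Println(skipInterpolation(30000, 100000)) // true
	// 10,000 points over 100,000 m² -> 0.1 points/m² -> interpolation runs.
	fmt.Println(skipInterpolation(10000, 100000)) // false
}

In the file itself the same comparison sits in the single beam branch of processScan and jumps to the multibeam label when the threshold is exceeded.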
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom@intevation.de>

package imports

import (
	"archive/zip"
	"bufio"
	"context"
	"crypto/sha1"
	"database/sql"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	shp "github.com/jonas-p/go-shp"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/octree"
	"gemma.intevation.de/gemma/pkg/wkb"
)

// SoundingResult is a Job to import sounding results
// from a ZIP file into the database.
type SoundingResult struct {
	// Dir is the folder in the file system where the
	// 'sr.zip' is expected to be.
	Dir string `json:"dir"`

	// Override data

	// Date if given overrides the date value from the meta.json.
	Date *models.Date `json:"date,omitempty"`
	// Bottleneck if given overrides the name of the bottleneck from the meta.json.
	Bottleneck *string `json:"bottleneck,omitempty"`
	// EPSG if given overrides the EPSG code from the meta.json.
	// Defaults to WGS84.
	EPSG *uint `json:"epsg,omitempty"`
	// DepthReference if given overrides the DepthReference value
	// from the meta.json.
	DepthReference *string `json:"depth-reference,omitempty"`
	// SingleBeam indicates that the sounding is a single beam scan.
	SingleBeam *bool `json:"single-beam,omitempty"`
	// NegateZ indicates that the Z values of the XYZ input should be
	// multiplied by -1.
	NegateZ *bool `json:"negate-z,omitempty"`
}

const (
	contourStepWidth = 0.1
	contourTolerance = 0.1
)

const (
	// multiBeamThreshold is the number of points per m² above which
	// the point cloud is assumed to be already interpolated.
	multiBeamThreshold = 1.0 / 5.0
)

const (
	// pointsPerSquareMeter is the average number of points
	// when generating an artificial height model for single beam scans.
	pointsPerSquareMeter = 2
)

const (
	// isoCellSize is the side length of a raster cell when tracing
	// iso areas.
	isoCellSize = 0.5
)

// SRJobKind is the unique name of this import job type.
const SRJobKind JobKind = "sr"

type srJobCreator struct{}

func init() {
	RegisterJobCreator(SRJobKind, srJobCreator{})
}

func (srJobCreator) Description() string { return "sounding results" }

func (srJobCreator) AutoAccept() bool { return false }

func (srJobCreator) Create() Job { return new(SoundingResult) }

func (srJobCreator) Depends() [2][]string {
	return [2][]string{
		{"sounding_results", "sounding_results_iso_areas"},
		{"bottlenecks"},
	}
}

func (srJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	_, err := tx.ExecContext(ctx, srStageDoneSQL, id)
	return err
}

const (
	srStageDoneSQL = `
UPDATE waterway.sounding_results SET staging_done = true
WHERE id = (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.sounding_results'::regclass)`

	insertHullSQL = `
INSERT INTO waterway.sounding_results (
  bottleneck_id,
  date_info,
  depth_reference,
  area,
  surtyp
) SELECT
  bottleneck_id,
  $2::date,
  $3,
  (SELECT CASE WHEN $5::bytea IS NULL THEN
     ST_Transform(ST_ConcaveHull(ST_Force2D(ST_GeomFromWKB($4, $6::integer)), 0.7), 4326)::geography
   ELSE
     ST_MakeValid(ST_Transform(ST_GeomFromWKB($5, $6::integer), 4326))::geography
   END),
  $7
FROM waterway.bottlenecks
WHERE objnam = $1 AND validity @> CAST($2 AS timestamptz)
RETURNING
  id,
  ST_X(ST_Centroid(area::geometry)),
  ST_Y(ST_Centroid(area::geometry)),
  best_utm(area),
  ST_AsBinary(ST_Transform(area::geometry, best_utm(area)))
`
	reprojectPointsBufferedSQL = `
SELECT
  ST_AsBinary(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer)),
  ST_AsBinary(ST_Buffer(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer), 0.1)),
  ST_Area(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer))`

	insertMeshSQL = `
UPDATE waterway.sounding_results SET
  mesh_checksum = $2, mesh_index = $3
WHERE id = $1`

	repairBoundarySQL = `
SELECT
  ST_AsBinary(ST_Buffer(ST_MakeValid(ST_GeomFromWKB($1, $2::integer)), 0.0)),
  ST_AsBinary(ST_Buffer(ST_MakeValid(ST_GeomFromWKB($1, $2::integer)), 0.1))`

	insertIsoAreasSQL = `
INSERT INTO waterway.sounding_results_iso_areas (
  sounding_result_id,
  height,
  areas
)
SELECT
  $1,
  $2,
  ST_Transform(
    ST_Multi(
      ST_Collectionextract(
        ST_SimplifyPreserveTopology(ST_GeomFromWKB($4, $3::integer), $5),
        3
      )
    ),
    4326
  )
FROM waterway.sounding_results sr
WHERE id = $1
`
	selectGaugeLDCSQL = `
SELECT
  grwl.value,
  grwl.depth_reference
FROM waterway.gauges_reference_water_levels grwl
  JOIN waterway.bottlenecks bns
    ON grwl.location = bns.gauge_location
       AND grwl.validity @> CAST($2 AS timestamptz)
WHERE bns.objnam = $1
  AND bns.validity @> CAST($2 AS timestamptz)
  AND grwl.depth_reference like 'LDC%'
`
	reprojectPointsSingleBeamSQL = `
SELECT
  ST_AsBinary(
    ST_Transform(
      ST_GeomFromWKB($1, $2::integer),
      best_utm(CAST(ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS geography)))),
  best_utm(CAST(ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS geography))
`
)

func (sr *SoundingResult) singleBeam() bool {
	return sr.SingleBeam != nil && *sr.SingleBeam
}

func (sr *SoundingResult) surtype() string {
	if sr.singleBeam() {
		return "single"
	}
	return "multi"
}

func (sr *SoundingResult) negateZ() bool {
	return sr.NegateZ != nil && *sr.NegateZ
}

// Do executes the actual sounding result import.
func (sr *SoundingResult) Do(
	ctx context.Context,
	importID int64,
	conn *sql.Conn,
	feedback Feedback,
) (interface{}, error) {

	start := time.Now()

	zpath := filepath.Join(sr.Dir, "sr.zip")

	z, err := zip.OpenReader(zpath)
	if err != nil {
		feedback.Info("%v", err)
		feedback.Info("Falling back to TXT file mode.")
		z = nil
	}
	if z != nil {
		defer z.Close()
	}

	feedback.Info("Looking for 'meta.json'")
	mf := common.FindInZIP(z, "meta.json")
	if mf == nil && !sr.completeOverride() {
		return nil, errors.New("cannot find 'meta.json'")
	}
	m, err := sr.loadMeta(mf)
	if err != nil {
		return nil, err
	}

	feedback.Info("Bottleneck: %s", m.Bottleneck)
	feedback.Info("Survey date: %s", m.Date.Format(common.DateFormat))

	var xform vertexTransform

	if sr.negateZ() {
		xform = negateZTransform
	} else {
		xform = identityTransform
	}

	if m.DepthReference == "ZPG" {
		feedback.Info("Found ZPG as reference system -> translating Z values to LDC")
		var ldc float64
		var depthReference string
		err := conn.QueryRowContext(
			ctx,
			selectGaugeLDCSQL,
			m.Bottleneck,
			m.Date.Time,
		).Scan(
			&ldc,
			&depthReference,
		)
		switch {
		case err == sql.ErrNoRows:
			return nil, errors.New("cannot load LDC value")
		case err != nil:
			return nil, err
		}
		// LDC is in cm. The data is in m.
		ldc /= 100
		xform = chainTransforms(
			xform,
			func(v octree.Vertex) octree.Vertex {
				return octree.Vertex{X: v.X, Y: v.Y, Z: v.Z + ldc}
			})
		m.DepthReference = depthReference
	}

	if err := m.Validate(ctx, conn); err != nil {
		return nil, common.ToError(err)
	}

	var xyz octree.MultiPointZ

	if z != nil { // Scanning ZIP file for *.xyz file.
		var xyzf *zip.File
		for _, ext := range []string{".xyz", ".txt"} {
			feedback.Info("Looking for '*%s'", ext)
			if xyzf = common.FindInZIP(z, ext); xyzf != nil {
				break
			}
		}
		if xyzf == nil {
			return nil, errors.New("cannot find any *.xyz or *.txt file")
		}
		xyz, err = loadXYZ(xyzf, feedback, xform)
	} else { // TXT file mode
		xyz, err = loadXYZFile(zpath, feedback, xform)
	}
	if err != nil {
		return nil, err
	}

	if len(xyz) == 0 {
		return nil, errors.New("XYZ does not contain any vertices")
	}

	// Is there a boundary shapefile in the ZIP archive?
	boundary, err := loadBoundary(z)
	if err != nil {
		return nil, err
	}

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	defer tx.Rollback()

	summary, err := sr.processScan(
		ctx,
		tx,
		feedback,
		importID,
		m,
		xyz,
		boundary,
	)
	if err != nil {
		return nil, err
	}

	if err = tx.Commit(); err != nil {
		feedback.Error(
			"Storing sounding result failed after %v.",
			time.Since(start))
		return nil, err
	}

	feedback.Info(
		"Storing sounding result was successful after %v.",
		time.Since(start))

	return summary, nil
}

// processScan reprojects and triangulates the point cloud, clips it
// against the boundary polygon, stores the resulting mesh and
// derives the iso areas.
func (sr *SoundingResult) processScan(
	ctx context.Context,
	tx *sql.Tx,
	feedback Feedback,
	importID int64,
	m *models.SoundingResultMeta,
	xyz octree.MultiPointZ,
	boundary polygonSlice,
) (interface{}, error) {

	if sr.singleBeam() {
		feedback.Info("Processing as single beam scan.")
	} else {
		feedback.Info("Processing as multi beam scan.")
	}

	feedback.Info("Reproject XYZ data.")
	start := time.Now()

	xyzWKB := xyz.AsWKB()

	var reproj []byte
	var epsg uint32

	if err := tx.QueryRowContext(
		ctx,
		reprojectPointsSingleBeamSQL,
		xyzWKB,
		m.EPSG,
	).Scan(&reproj, &epsg); err != nil {
		return nil, err
	}

	if err := xyz.FromWKB(reproj); err != nil {
		return nil, err
	}

	feedback.Info("Reprojecting points to EPSG %d took %v.",
		epsg, time.Since(start))
	feedback.Info("Number of reprojected points: %d", len(xyz))

	feedback.Info("Triangulate XYZ data.")
	start = time.Now()
	tri, err := octree.Triangulate(xyz)
	if err != nil {
		return nil, err
	}
	feedback.Info("Triangulation took %v.", time.Since(start))
	feedback.Info("Number of triangles: %d.", len(tri.Triangles)/3)

	var (
		clippingPolygon         octree.Polygon
		clippingPolygonBuffered octree.Polygon
		removed                 map[int32]struct{}
		polygonArea             float64
		clippingPolygonWKB      []byte
		tin                     *octree.Tin
	)

	if boundary == nil {
		feedback.Info("No boundary given. Calculate from XYZ data.")
		tooLongEdge := tri.EstimateTooLong()
		feedback.Info("Eliminate triangles with edges longer than %.2f meters.", tooLongEdge)
		var polygon octree.LineStringZ
		start = time.Now()
		polygon, removed = tri.ConcaveHull(tooLongEdge)
		polygonArea = polygon.Area()
		if polygonArea < 0.0 { // counter clockwise
			polygonArea = -polygonArea
			polygon.Reverse()
		}
		clippingPolygon.FromLineStringZ(polygon)

		var buffered []byte
		if err := tx.QueryRowContext(
			ctx, repairBoundarySQL,
			clippingPolygon.AsWKB(),
			epsg,
		).Scan(&clippingPolygonWKB, &buffered); err != nil {
			return nil, err
		}
		if err := clippingPolygon.FromWKB(clippingPolygonWKB); err != nil {
			return nil, err
		}
		if err := clippingPolygonBuffered.FromWKB(buffered); err != nil {
			return nil, err
		}
		feedback.Info("Calculating took %v.", time.Since(start))

	} else { // has boundary
		feedback.Info("Using uploaded boundary polygon.")
		var buffered []byte
		if err = tx.QueryRowContext(
			ctx, reprojectPointsBufferedSQL,
			boundary.asWKB(),
			m.EPSG,
			epsg,
		).Scan(&clippingPolygonWKB, &buffered, &polygonArea); err != nil {
			return nil, err
		}
		if err := clippingPolygon.FromWKB(clippingPolygonWKB); err != nil {
			return nil, err
		}
		if err := clippingPolygonBuffered.FromWKB(buffered); err != nil {
			return nil, err
		}

		tin := tri.Tin()
		tin.EPSG = epsg

		var str octree.STRTree
		str.Build(tin)

		removed = str.Clip(&clippingPolygon)
	}

	if sr.singleBeam() {

		origDensity := float64(len(xyz)) / polygonArea

		feedback.Info("Boundary area: %.2fm²", polygonArea)
		feedback.Info("Original point density: %.2f points/m²", origDensity)

		if origDensity > multiBeamThreshold {
			feedback.Warn("The density is greater than %.2f points/m².", multiBeamThreshold)
			feedback.Warn("It is assumed that the data is already interpolated.")
			goto multibeam
		}

		// Build the first mesh to generate random points on.
		feedback.Info("Build virtual DEM based on original XYZ data.")
		start = time.Now()
		tin := tri.Tin()
		var virtual octree.STRTree
		virtual.BuildWithout(tin, removed)
		feedback.Info("Building took %v", time.Since(start))

		numPoints := int(math.Ceil(polygonArea * pointsPerSquareMeter))

		feedback.Info("Generate %d random points for an average density of ~%d points/m².",
			numPoints, pointsPerSquareMeter)

		start = time.Now()

		generated := make(octree.LineStringZ, 0, numPoints+clippingPolygon.NumVertices(0))

		octree.GenerateRandomVertices(
			numPoints,
			tin.Min, tin.Max,
			virtual.Value,
			func(vertices []octree.Vertex) {
				generated = append(generated, vertices...)
			})

		feedback.Info("Generating %d points took %v.", len(generated), time.Since(start))

		// Add the boundary to new point cloud.
		dupes := map[[2]float64]struct{}{}
		clippingPolygon.Vertices(0, func(x, y float64) {
			key := [2]float64{x, y}
			if _, found := dupes[key]; found {
				return
			}
			dupes[key] = struct{}{}
			if z, ok := virtual.Value(x, y); ok {
				generated = append(generated, octree.Vertex{X: x, Y: y, Z: z})
			}
		})

		feedback.Info("Triangulate new point cloud.")
		xyz = octree.MultiPointZ(generated)

		start = time.Now()
		tri, err = octree.Triangulate(xyz)
		if err != nil {
			return nil, err
		}
		feedback.Info("Second triangulation took %v.", time.Since(start))
		feedback.Info("Number of triangles: %d.", len(tri.Triangles)/3)
		feedback.Info("Clipping triangles from new mesh.")
	}

multibeam:

	start = time.Now()
	tin = tri.Tin()
	tin.EPSG = epsg

	var str octree.STRTree
	str.Build(tin)
	feedback.Info("Building clipping index took %v", time.Since(start))

	start = time.Now()
	clippingPolygonBuffered.Indexify()
	removed = str.Clip(&clippingPolygonBuffered)
	feedback.Info("Clipping took %v.", time.Since(start))
	feedback.Info("Number of triangles to clip %d.", len(removed))

	start = time.Now()
	final := octree.STRTree{Entries: 16}
	final.BuildWithout(tin, removed)
	feedback.Info("Building final mesh took %v.", time.Since(start))

	feedback.Info("Store mesh.")
	start = time.Now()

	var (
		id       int64
		dummy    uint32
		lat, lon float64
	)

	var hull []byte

	err = tx.QueryRowContext(
		ctx,
		insertHullSQL,
		m.Bottleneck,
		m.Date.Time,
		m.DepthReference,
		nil,
		clippingPolygonWKB,
		epsg,
		sr.surtype(),
	).Scan(
		&id,
		&lat,
		&lon,
		&dummy,
		&hull,
	)
	switch {
	case err == sql.ErrNoRows:
		return nil, fmt.Errorf(
			"no matching bottleneck of given name or time available: %v", err)
	case err != nil:
		return nil, err
	}

	index, err := final.Bytes()
	if err != nil {
		return nil, err
	}

	h := sha1.New()
	h.Write(index)
	checksum := hex.EncodeToString(h.Sum(nil))
	_, err = tx.ExecContext(ctx, insertMeshSQL, id, checksum, index)
	if err != nil {
		return nil, err
	}
	feedback.Info("Storing mesh index took %s.", time.Since(start))

	err = generateIsos(ctx, tx, feedback, &final, id)
	if err != nil {
		return nil, err
	}

	// Store for potential later removal.
	if err = track(ctx, tx, importID, "waterway.sounding_results", id); err != nil {
		return nil, err
	}

	summary := struct {
		Bottleneck string      `json:"bottleneck"`
		Date       models.Date `json:"date"`
		Lat        float64     `json:"lat"`
		Lon        float64     `json:"lon"`
	}{
		Bottleneck: m.Bottleneck,
		Date:       m.Date,
		Lat:        lat,
		Lon:        lon,
	}

	return &summary, nil
}

// CleanUp removes the folder containing the ZIP file with
// the sounding result import.
func (sr *SoundingResult) CleanUp() error {
	return os.RemoveAll(sr.Dir)
}

// completeOverride reports whether all values of the meta.json are
// overridden so the file itself is not required.
func (sr *SoundingResult) completeOverride() bool {
	// sr.EPSG == nil -> WGS84
	return sr.Bottleneck != nil &&
		sr.Date != nil &&
		sr.DepthReference != nil &&
		sr.SingleBeam != nil &&
		sr.NegateZ != nil
}

// loadMeta reads the meta.json from the ZIP file (or builds it from
// the overrides if no file is given) and applies the overrides.
func (sr *SoundingResult) loadMeta(f *zip.File) (*models.SoundingResultMeta, error) {
	if f == nil {
		var epsg uint
		if sr.EPSG != nil {
			epsg = *sr.EPSG
		} else {
			epsg = models.WGS84
		}
		return &models.SoundingResultMeta{
			Date:           *sr.Date,
			Bottleneck:     *sr.Bottleneck,
			EPSG:           epsg,
			DepthReference: *sr.DepthReference,
			SingleBeam:     sr.singleBeam(),
			NegateZ:        sr.negateZ(),
		}, nil
	}
	r, err := f.Open()
	if err != nil {
		return nil, err
	}
	defer r.Close()

	var m models.SoundingResultMeta
	if err := m.Decode(r); err != nil {
		return nil, err
	}

	// Apply overrides
	if sr.Date != nil {
		m.Date = *sr.Date
	}
	if sr.Bottleneck != nil {
		m.Bottleneck = *sr.Bottleneck
	}
	if sr.EPSG != nil {
		m.EPSG = *sr.EPSG
	}
	if sr.DepthReference != nil {
		m.DepthReference = *sr.DepthReference
	}
	if sr.SingleBeam != nil {
		m.SingleBeam = *sr.SingleBeam
	}
	if sr.NegateZ != nil {
		m.NegateZ = *sr.NegateZ
	}

	return &m, nil
}

type vertexTransform func(octree.Vertex) octree.Vertex

func identityTransform(v octree.Vertex) octree.Vertex { return v }

func negateZTransform(v octree.Vertex) octree.Vertex {
	return octree.Vertex{X: v.X, Y: v.Y, Z: -v.Z}
}

func chainTransforms(a, b vertexTransform) vertexTransform {
	return func(v octree.Vertex) octree.Vertex { return b(a(v)) }
}

// loadXYZReader parses comma separated X,Y,Z lines from r,
// applying xform to every vertex.
func loadXYZReader(r io.Reader, feedback Feedback, xform vertexTransform) (octree.MultiPointZ, error) {
	mpz := make(octree.MultiPointZ, 0, 250000)
	s := bufio.NewScanner(r)

	warnLimiter := common.WarningLimiter{Log: feedback.Warn, MaxWarnings: 100}
	warn := warnLimiter.Warn
	defer warnLimiter.Close()

	for line := 1; s.Scan(); line++ {
		text := s.Text()
		var p octree.Vertex
		// fmt.Sscanf(text, "%f,%f,%f") is 4 times slower.
		idx := strings.IndexByte(text, ',')
		if idx == -1 {
			warn("format error in line %d", line)
			continue
		}
		var err error
		if p.X, err = strconv.ParseFloat(text[:idx], 64); err != nil {
			warn("format error in line %d: %v", line, err)
			continue
		}
		text = text[idx+1:]
		if idx = strings.IndexByte(text, ','); idx == -1 {
			warn("format error in line %d", line)
			continue
		}
		if p.Y, err = strconv.ParseFloat(text[:idx], 64); err != nil {
			warn("format error in line %d: %v", line, err)
			continue
		}
		text = text[idx+1:]
		if p.Z, err = strconv.ParseFloat(text, 64); err != nil {
			warn("format error in line %d: %v", line, err)
			continue
		}
		p = xform(p)
		mpz = append(mpz, p)
	}

	if err := s.Err(); err != nil {
		return nil, err
	}

	return mpz, nil
}

func loadXYZ(f *zip.File, feedback Feedback, xform vertexTransform) (octree.MultiPointZ, error) {
	r, err := f.Open()
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return loadXYZReader(r, feedback, xform)
}

func loadXYZFile(f string, feedback Feedback, xform vertexTransform) (octree.MultiPointZ, error) {
	r, err := os.Open(f)
	if err != nil {
		return nil, err
	}
	defer r.Close()
	return loadXYZReader(r, feedback, xform)
}

// loadBoundary extracts the first shape from a shapefile found in
// the ZIP archive, if there is one.
func loadBoundary(z *zip.ReadCloser) (polygonSlice, error) {
	shpF := common.FindInZIP(z, ".shp")
	if shpF == nil {
		return nil, nil
	}

	prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name))
	dbfF := common.FindInZIP(z, prefix+".dbf")
	if dbfF == nil {
		return nil, fmt.Errorf("no DBF file found for %s", shpF.Name)
	}

	shpR, err := shpF.Open()
	if err != nil {
		return nil, err
	}

	dbfR, err := dbfF.Open()
	if err != nil {
		shpR.Close()
		return nil, err
	}
	sr := shp.SequentialReaderFromExt(shpR, dbfR)
	defer sr.Close()

	if !sr.Next() {
		return nil, sr.Err()
	}

	_, s := sr.Shape()
	if s == nil {
		return nil, sr.Err()
	}

	return shapeToPolygon(s)
}

// generateIsos derives the iso heights from the configured class
// breaks (or a default step width as fallback) and traces the
// corresponding areas on the mesh.
func generateIsos(
	ctx context.Context,
	tx *sql.Tx,
	feedback Feedback,
	tree *octree.STRTree,
	id int64,
) error {
	heights, err := octree.LoadClassBreaks(
		ctx, tx,
		"morphology_classbreaks",
	)

	minZ, maxZ := tree.Min().Z, tree.Max().Z

	if err != nil {
		feedback.Warn("Loading class breaks failed: %v", err)
		feedback.Info("Using default class breaks")
		heights = nil
		h := contourStepWidth * math.Ceil(minZ/contourStepWidth)
		for ; h <= maxZ; h += contourStepWidth {
			heights = append(heights, h)
		}
	} else {
		heights = octree.ExtrapolateClassBreaks(heights, minZ, maxZ)
		// We set steps for InBetweenClassBreaks to 1, so it
		// becomes a null operation. The extra class breaks
		// were considered unexpected and confusing by the
		// users. Once we get filled polygons the visual will
		// be considerably different anyway. -- sw
		// heights = octree.InBetweenClassBreaks(heights, 0.05, 1)
	}

	/*
		for i, v := range heights {
			fmt.Printf("%d %.2f\n", i, v)
		}
		log.Printf("%.2f - %.2f\n", tree.Min.Z, tree.Max.Z)
	*/

	heights = common.DedupFloat64s(heights)

	return generateIsoAreas(ctx, tx, feedback, tree, heights, id)
}

func generateIsoAreas(
	ctx context.Context,
	tx *sql.Tx,
	feedback Feedback,
	tree *octree.STRTree,
	heights []float64,
	id int64,
) error {
	feedback.Info("Generate iso areas")
	total := time.Now()
	defer func() {
		feedback.Info("Generating iso areas took %s.",
			time.Since(total))
	}()

	areas := octree.TraceAreas(
		heights,
		isoCellSize,
		tree.Min(), tree.Max(),
		tree.Value)

	return storeAreas(
		ctx, tx, feedback,
		areas, tree.EPSG(), heights, id)
}

// storeAreas writes the traced iso areas to
// waterway.sounding_results_iso_areas.
func storeAreas(
	ctx context.Context,
	tx *sql.Tx,
	feedback Feedback,
	areas []wkb.MultiPolygonGeom,
	epsg uint32,
	heights []float64,
	id int64,
) error {
	feedback.Info("Store iso areas.")
	total := time.Now()
	defer func() {
		feedback.Info("Storing iso areas took %v.",
			time.Since(total))
	}()

	stmt, err := tx.PrepareContext(ctx, insertIsoAreasSQL)
	if err != nil {
		return err
	}
	defer stmt.Close()

	var size int

	for i, a := range areas {
		if len(a) == 0 {
			continue
		}
		wkb := a.AsWKB()
		size += len(wkb)
		if _, err := stmt.ExecContext(
			ctx,
			id, heights[i],
			epsg,
			wkb,
			contourTolerance,
		); err != nil {
			return err
		}
	}

	feedback.Info("Transferred WKB size: %.2fMB.", float64(size)/(1024*1024))

	return nil
}