Mercurial > gemma
view pkg/imports/sr.go @ 3748:4bb5dfa0b7e3
SR import: Accept TXT file for uploads.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 24 Jun 2019 16:28:12 +0200 |
parents | 879c297c47e9 |
children | 98d5dd2f0ca1 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom@intevation.de>

package imports

import (
	"archive/zip"
	"bufio"
	"context"
	"crypto/sha1"
	"database/sql"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	shp "github.com/jonas-p/go-shp"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/octree"
)

// SoundingResult is a Job to import sounding results
// from a ZIP file into the database.
type SoundingResult struct {
	// Dir is the folder in the file system where the
	// 'sr.zip' is expected to be.
	Dir string `json:"dir"`

	// Override data

	// Date if given overrides the date value from the meta.json.
	Date *models.Date `json:"date,omitempty"`
	// Bottleneck if given overrides the name of the bottleneck from the meta.json.
	Bottleneck *string `json:"bottleneck,omitempty"`
	// EPSG if given overrides the EPSG code from the meta.json.
	// Defaults to WGS84.
	EPSG *uint `json:"epsg,omitempty"`
	// DepthReference if given overrides the DepthReference value
	// from the meta.json.
	DepthReference *string `json:"depth-reference,omitempty"`
	// SingleBeam indicates that the sounding is a single beam scan.
	SingleBeam *bool `json:"single-beam,omitempty"`
}

// Parameters for contour line generation.
const (
	contourStepWidth = 0.1
	contourTolerance = 0.1
)

// Point density used when generating random vertices for
// single beam scans.
const (
	pointsPerSquareMeter = 2
)

// SRJobKind is the unique name of a SoundingResult import job.
const SRJobKind JobKind = "sr"

// srJobCreator hooks SoundingResult imports into the
// generic job creation registry.
type srJobCreator struct{}

func init() {
	RegisterJobCreator(SRJobKind, srJobCreator{})
}

// Description returns a human readable name of this import kind.
func (srJobCreator) Description() string { return "sounding results" }

// AutoAccept reports that sounding result imports always need
// manual review before being accepted.
func (srJobCreator) AutoAccept() bool { return false }

// Create returns a fresh, empty SoundingResult job.
func (srJobCreator) Create() Job { return new(SoundingResult) }

// Depends lists the tables written to (first slice) and
// read from (second slice) by this import.
func (srJobCreator) Depends() [2][]string {
	return [2][]string{
		{"sounding_results", "sounding_results_contour_lines"},
		{"bottlenecks"},
	}
}

// StageDone marks the sounding result imported under the given
// import id as no longer being staged.
func (srJobCreator) StageDone(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	_, err := tx.ExecContext(ctx, srStageDoneSQL, id)
	return err
}

const (
	// srStageDoneSQL flips the staging flag of the sounding result
	// tracked for a given import id.
	srStageDoneSQL = `
UPDATE waterway.sounding_results SET staging_done = true
WHERE id = (
  SELECT key from import.track_imports
  WHERE import_id = $1 AND
        relation = 'waterway.sounding_results'::regclass)`

	// insertHullSQL stores the hull of a sounding result.
	// If no boundary ($5) is given a concave hull is derived
	// from the point cloud ($4) instead.
	insertHullSQL = `
INSERT INTO waterway.sounding_results (
  bottleneck_id,
  bottleneck_validity,
  date_info,
  depth_reference,
  area
)
SELECT
  bottleneck_id,
  validity,
  $2::date,
  $3,
  (SELECT
    CASE WHEN $5::bytea IS NULL THEN
      ST_Transform(ST_ConcaveHull(ST_Force2D(ST_GeomFromWKB($4, $6::integer)), 0.7), 4326)::geography
    ELSE
      ST_MakeValid(ST_Transform(ST_GeomFromWKB($5, $6::integer), 4326))::geography
    END)
FROM waterway.bottlenecks
WHERE objnam = $1 AND validity @> CAST($2 AS timestamptz)
RETURNING
  id,
  ST_X(ST_Centroid(area::geometry)),
  ST_Y(ST_Centroid(area::geometry)),
  best_utm(area),
  ST_AsBinary(ST_Transform(area::geometry, best_utm(area)))`

	// reprojectPointsSQL transforms WKB points from one SRS to another.
	reprojectPointsSQL = `
SELECT ST_AsBinary(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer))`

	// reprojectPointsBufferedSQL additionally returns a slightly
	// buffered variant of the geometry and its area.
	reprojectPointsBufferedSQL = `
SELECT
  ST_AsBinary(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer)),
  ST_AsBinary(ST_Buffer(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer), 0.1)),
  ST_Area(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer))`

	// insertOctreeSQL attaches the serialized octree index and its
	// checksum to a stored sounding result.
	insertOctreeSQL = `
UPDATE waterway.sounding_results SET
  octree_checksum = $2, octree_index = $3
WHERE id = $1`

	// repairBoundarySQL returns a repaired (valid) version of the
	// given polygon plus a 0.1 buffered variant.
	repairBoundarySQL = `
SELECT
  ST_AsBinary(ST_Buffer(ST_MakeValid(ST_GeomFromWKB($1, $2::integer)), 0.0)),
  ST_AsBinary(ST_Buffer(ST_MakeValid(ST_GeomFromWKB($1, $2::integer)), 0.1))`

	// insertContourSQL stores one simplified contour line for a
	// sounding result at a given height.
	insertContourSQL = `
INSERT INTO waterway.sounding_results_contour_lines (
  sounding_result_id,
  height,
  lines
)
SELECT
  $1,
  $2,
  ST_Transform(
    ST_Multi(
      ST_CollectionExtract(
        ST_SimplifyPreserveTopology(
          ST_Multi(ST_Collectionextract(
            ST_MakeValid(ST_GeomFromWKB($4, $3::integer)), 2)),
          $5
        ),
        2
      )
    ),
    4326
  )
FROM waterway.sounding_results sr
WHERE id = $1`

	// selectGaugeLDCSQL fetches the LDC reference water level of the
	// gauge associated with a bottleneck.
	selectGaugeLDCSQL = `
SELECT
  grwl.value,
  grwl.depth_reference
FROM waterway.gauges_reference_water_levels grwl
  JOIN waterway.bottlenecks bns
    ON grwl.location = bns.gauge_location
      AND grwl.validity = bns.gauge_validity
WHERE bns.objnam = $1
  AND bns.validity @> CAST($2 AS timestamptz)
  AND grwl.depth_reference like 'LDC%'`

	// reprojectPointsSingleBeamSQL transforms WKB points into the
	// best matching UTM zone and also returns that zone's EPSG code.
	reprojectPointsSingleBeamSQL = `
SELECT
  ST_AsBinary(
    ST_Transform(
      ST_GeomFromWKB($1, $2::integer),
      best_utm(CAST(ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS geography)))),
  best_utm(CAST(ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS geography))
`
)

// isSingleBeam reports whether the job is explicitly flagged
// as a single beam scan.
func (sr *SoundingResult) isSingleBeam() bool {
	return sr.SingleBeam != nil && *sr.SingleBeam
}

// Do executes the actual sounding result import.
func (sr *SoundingResult) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() zpath := filepath.Join(sr.Dir, "sr.zip") z, err := zip.OpenReader(zpath) if err != nil { feedback.Warn("Expected ZIP file: %v", err) feedback.Warn("Falling back to TXT file mode.") z = nil } if z != nil { defer z.Close() } feedback.Info("Looking for 'meta.json'") mf := common.FindInZIP(z, "meta.json") if mf == nil && !sr.completeOverride() { return nil, errors.New("Cannot find 'meta.json'") } m, err := sr.loadMeta(mf) if err != nil { return nil, err } feedback.Info("Bottleneck: %s", m.Bottleneck) feedback.Info("Survey date: %s", m.Date.Format(common.DateFormat)) var xform vertexTransform if m.DepthReference == "ZPG" { feedback.Info("Found ZPG as reference system -> translating Z values to LDC") var ldc float64 var depthReference string err := conn.QueryRowContext(ctx, selectGaugeLDCSQL, m.Bottleneck, m.Date.Time, ).Scan( &ldc, &depthReference, ) switch { case err == sql.ErrNoRows: return nil, errors.New("Cannot load LDC value") case err != nil: return nil, err } xform = func(v octree.Vertex) octree.Vertex { return octree.Vertex{X: v.X, Y: v.Y, Z: ldc - v.Z} } m.DepthReference = depthReference } if err := m.Validate(ctx, conn); err != nil { return nil, common.ToError(err) } var xyz octree.MultiPointZ if z != nil { // Scanning ZIP file for *.xyz file. var xyzf *zip.File for _, ext := range []string{".xyz", ".txt"} { feedback.Info("Looking for '*%s'", ext) if xyzf = common.FindInZIP(z, ext); xyzf != nil { break } } if xyzf == nil { return nil, errors.New("Cannot find any *.xyz or *.txt file") } xyz, err = loadXYZ(xyzf, feedback, xform) } else { // TXT file mode xyz, err = loadXYZFile(zpath, feedback, xform) } if err != nil { return nil, err } if len(xyz) == 0 { return nil, errors.New("XYZ does not contain any vertices") } // Is there a boundary shapefile in the ZIP archive? 
boundary, err := loadBoundary(z) if err != nil { return nil, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() var summary interface{} if sr.isSingleBeam() { summary, err = sr.processScan( ctx, tx, feedback, true, importID, m, xyz, boundary, ) } else { summary, err = sr.processScan( ctx, tx, feedback, false, importID, m, xyz, boundary, ) } if err != nil { return nil, err } if err = tx.Commit(); err != nil { feedback.Error( "Storing sounding result failed after %v.", time.Since(start)) return nil, err } feedback.Info( "Storing sounding result was successful after %v.", time.Since(start)) return summary, nil } func (sr *SoundingResult) processScan( ctx context.Context, tx *sql.Tx, feedback Feedback, singleBeam bool, importID int64, m *models.SoundingResultMeta, xyz octree.MultiPointZ, boundary polygonSlice, ) (interface{}, error) { if singleBeam { feedback.Info("Processing as single beam scan.") } else { feedback.Info("Processing as multi beam scan.") } feedback.Info("Reproject XYZ data.") start := time.Now() xyzWKB := xyz.AsWKB() var reproj []byte var epsg uint32 if err := tx.QueryRowContext( ctx, reprojectPointsSingleBeamSQL, xyzWKB, m.EPSG, ).Scan(&reproj, &epsg); err != nil { return nil, err } if err := xyz.FromWKB(reproj); err != nil { return nil, err } feedback.Info("Reprojecting points to EPSG %d took %v.", epsg, time.Since(start)) feedback.Info("Number of reprojected points: %d", len(xyz)) feedback.Info("Triangulate XYZ data.") start = time.Now() tri, err := octree.Triangulate(xyz) if err != nil { return nil, err } feedback.Info("Triangulation took %v.", time.Since(start)) feedback.Info("Number triangles: %d.", len(tri.Triangles)/3) var ( clippingPolygon octree.Polygon clippingPolygonBuffered octree.Polygon removed map[int32]struct{} polygonArea float64 clippingPolygonWKB []byte tin *octree.Tin ) if boundary == nil { feedback.Info("No boundary given. 
Calulate from XYZ data.") tooLongEdge := tri.EstimateTooLong() feedback.Info("Eliminate triangles with edges longer than %.2f meters.", tooLongEdge) var polygon octree.LineStringZ start = time.Now() polygon, removed = tri.ConcaveHull(tooLongEdge) polygonArea = polygon.Area() if polygonArea < 0.0 { // counter clockwise polygonArea = -polygonArea polygon.Reverse() } clippingPolygon.FromLineStringZ(polygon) var buffered []byte if err := tx.QueryRowContext( ctx, repairBoundarySQL, clippingPolygon.AsWKB(), epsg, ).Scan(&clippingPolygonWKB, &buffered); err != nil { return nil, err } if err := clippingPolygon.FromWKB(clippingPolygonWKB); err != nil { return nil, err } if err := clippingPolygonBuffered.FromWKB(buffered); err != nil { return nil, err } feedback.Info("Calculating took %v.", time.Since(start)) } else { // has Boundary feedback.Info("Using uploaded boundary polygon.") var buffered []byte if err = tx.QueryRowContext( ctx, reprojectPointsBufferedSQL, boundary.asWKB(), m.EPSG, epsg, ).Scan(&clippingPolygonWKB, &buffered, &polygonArea); err != nil { return nil, err } if err := clippingPolygon.FromWKB(clippingPolygonWKB); err != nil { return nil, err } if err := clippingPolygonBuffered.FromWKB(buffered); err != nil { return nil, err } tin := tri.Tin() tin.EPSG = epsg var str octree.STRTree str.Build(tin) removed = str.Clip(&clippingPolygon) } if singleBeam { // Build the first mesh to generate random points on. 
feedback.Info("Build virtual DEM based on original XYZ data.") start = time.Now() var tree *octree.Tree { builder := octree.NewBuilder(tri.Tin()) builder.Build(removed) tree = builder.Tree() } feedback.Info("Building took %v", time.Since(start)) feedback.Info("Boundary area: %.2fm²", polygonArea) numPoints := int(math.Ceil(polygonArea * pointsPerSquareMeter)) feedback.Info("Generate %d random points for an average density of ~%d points/m².", numPoints, pointsPerSquareMeter) start = time.Now() generated := make(octree.LineStringZ, 0, numPoints+clippingPolygon.NumVertices(0)) tree.GenerateRandomVertices(numPoints, func(vertices []octree.Vertex) { generated = append(generated, vertices...) }) feedback.Info("Generating %d points took %v.", len(generated), time.Since(start)) // Add the boundary to new point cloud. dupes := map[[2]float64]struct{}{} clippingPolygon.Vertices(0, func(x, y float64) { key := [2]float64{x, y} if _, found := dupes[key]; found { return } dupes[key] = struct{}{} if z, ok := tree.Value(x, y); ok { generated = append(generated, octree.Vertex{X: x, Y: y, Z: z}) } }) feedback.Info("Triangulate new point cloud.") xyz = octree.MultiPointZ(generated) start = time.Now() tri, err = octree.Triangulate(xyz) if err != nil { return nil, err } feedback.Info("Second triangulation took %v.", time.Since(start)) feedback.Info("Number triangles: %d.", len(tri.Triangles)/3) feedback.Info("Clipping triangles from new mesh.") } else { // multi beam // Nothing special } start = time.Now() tin = tri.Tin() tin.EPSG = epsg var str octree.STRTree str.Build(tin) feedback.Info("Building STR tree took %v", time.Since(start)) start = time.Now() clippingPolygonBuffered.Indexify() removed = str.Clip(&clippingPolygonBuffered) feedback.Info("Clipping STR tree took %v.", time.Since(start)) feedback.Info("Number of triangles to clip %d.", len(removed)) feedback.Info("Build final octree index") builder := octree.NewBuilder(tin) builder.Build(removed) octreeIndex, err := 
builder.Bytes() if err != nil { return nil, err } feedback.Info("Building octree took %v.", time.Since(start)) feedback.Info("Store octree.") start = time.Now() var ( id int64 dummy uint32 lat, lon float64 ) var hull []byte err = tx.QueryRowContext( ctx, insertHullSQL, m.Bottleneck, m.Date.Time, m.DepthReference, nil, clippingPolygonWKB, epsg, ).Scan( &id, &lat, &lon, &dummy, &hull, ) switch { case err == sql.ErrNoRows: return nil, fmt.Errorf( "No matching bottleneck of given name or time available: %v", err) case err != nil: return nil, err } h := sha1.New() h.Write(octreeIndex) checksum := hex.EncodeToString(h.Sum(nil)) _, err = tx.ExecContext(ctx, insertOctreeSQL, id, checksum, octreeIndex) if err != nil { return nil, err } feedback.Info("Storing octree index took %s.", time.Since(start)) feedback.Info("Generate contour lines") start = time.Now() err = generateContours(ctx, tx, builder.Tree(), id) if err != nil { return nil, err } feedback.Info("Generating contour lines took %s.", time.Since(start)) // Store for potential later removal. if err = track(ctx, tx, importID, "waterway.sounding_results", id); err != nil { return nil, err } summary := struct { Bottleneck string `json:"bottleneck"` Date models.Date `json:"date"` Lat float64 `json:"lat"` Lon float64 `json:"lon"` }{ Bottleneck: m.Bottleneck, Date: m.Date, Lat: lat, Lon: lon, } return &summary, nil } // CleanUp removes the folder containing the ZIP file with the // the sounding result import. 
func (sr *SoundingResult) CleanUp() error { return os.RemoveAll(sr.Dir) } func (sr *SoundingResult) completeOverride() bool { // sr.EPSG == nil -> WGS84 return sr.Bottleneck != nil && sr.Date != nil && sr.DepthReference != nil && sr.SingleBeam != nil } func (sr *SoundingResult) loadMeta(f *zip.File) (*models.SoundingResultMeta, error) { if f == nil { var epsg uint if sr.EPSG != nil { epsg = *sr.EPSG } else { epsg = models.WGS84 } return &models.SoundingResultMeta{ Date: *sr.Date, Bottleneck: *sr.Bottleneck, EPSG: epsg, DepthReference: *sr.DepthReference, }, nil } r, err := f.Open() if err != nil { return nil, err } defer r.Close() var m models.SoundingResultMeta if err := m.Decode(r); err != nil { return nil, err } // Apply overrides if sr.Date != nil { m.Date = *sr.Date } if sr.Bottleneck != nil { m.Bottleneck = *sr.Bottleneck } if sr.EPSG != nil { m.EPSG = *sr.EPSG } if sr.DepthReference != nil { m.DepthReference = *sr.DepthReference } if sr.SingleBeam != nil { m.SingleBeam = *sr.SingleBeam } return &m, nil } type vertexTransform func(octree.Vertex) octree.Vertex func loadXYZReader(r io.Reader, feedback Feedback, xform vertexTransform) (octree.MultiPointZ, error) { mpz := make(octree.MultiPointZ, 0, 250000) s := bufio.NewScanner(r) var hasNegZ bool warnLimiter := misc.WarningLimiter{Log: feedback.Warn, MaxWarnings: 100} warn := warnLimiter.Warn defer warnLimiter.Close() for line := 1; s.Scan(); line++ { text := s.Text() var p octree.Vertex // fmt.Sscanf(text, "%f,%f,%f") is 4 times slower. 
idx := strings.IndexByte(text, ',') if idx == -1 { warn("format error in line %d", line) continue } var err error if p.X, err = strconv.ParseFloat(text[:idx], 64); err != nil { warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if idx = strings.IndexByte(text, ','); idx == -1 { feedback.Warn("format error in line %d", line) continue } if p.Y, err = strconv.ParseFloat(text[:idx], 64); err != nil { warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if p.Z, err = strconv.ParseFloat(text, 64); err != nil { warn("format error in line %d: %v", line, err) continue } if p.Z < 0 { p.Z = -p.Z if !hasNegZ { hasNegZ = true warn("Negative Z value found: Using -Z") } } if xform != nil { p = xform(p) } mpz = append(mpz, p) } if err := s.Err(); err != nil { return nil, err } return mpz, nil } func loadXYZ(f *zip.File, feedback Feedback, xform vertexTransform) (octree.MultiPointZ, error) { r, err := f.Open() if err != nil { return nil, err } defer r.Close() return loadXYZReader(r, feedback, xform) } func loadXYZFile(f string, feedback Feedback, xform vertexTransform) (octree.MultiPointZ, error) { r, err := os.Open(f) if err != nil { return nil, err } defer r.Close() return loadXYZReader(r, feedback, xform) } func loadBoundary(z *zip.ReadCloser) (polygonSlice, error) { shpF := common.FindInZIP(z, ".shp") if shpF == nil { return nil, nil } prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name)) dbfF := common.FindInZIP(z, prefix+".dbf") if dbfF == nil { return nil, fmt.Errorf("No DBF file found for %s", shpF.Name) } shpR, err := shpF.Open() if err != nil { return nil, err } dbfR, err := dbfF.Open() if err != nil { shpR.Close() return nil, err } sr := shp.SequentialReaderFromExt(shpR, dbfR) defer sr.Close() if !sr.Next() { return nil, sr.Err() } _, s := sr.Shape() if s == nil { return nil, sr.Err() } return shapeToPolygon(s) } func generateContours( ctx context.Context, tx *sql.Tx, tree *octree.Tree, id int64, ) error { stmt, 
err := tx.PrepareContext(ctx, insertContourSQL) if err != nil { return err } defer stmt.Close() // Adjust contour lines heights to multiples of contourStepWidth var heights []float64 h := contourStepWidth * math.Ceil(tree.Min.Z/contourStepWidth) for ; h <= tree.Max.Z; h += contourStepWidth { heights = append(heights, h) } octree.DoContours(tree, heights, func(res *octree.ContourResult) { if err == nil && len(res.Lines) > 0 { _, err = stmt.ExecContext( ctx, id, res.Height, tree.EPSG, res.Lines.AsWKB2D(), contourTolerance) } }) return err }