Mercurial > gemma
view pkg/imports/sr.go @ 3705:7006b92c0334
Handle updates (vs. historized and new versions) separately.
We need this distinction because updated data currently cannot be
reviewed. More precisely: it cannot be declined after review, as the
old data is updated in place.
The current exclusion from the review is a workaround and not meant to
be the final solution. Note that there are additional minor problems,
like the fact that the updated data is not counted as changed data for
the import.
author | Sascha Wilde <wilde@intevation.de> |
---|---|
date | Wed, 19 Jun 2019 17:00:08 +0200 |
parents | 58508f50d192 |
children | ec86a7155377 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>
//  * Tom Gottfried <tom@intevation.de>

package imports

import (
	"archive/zip"
	"bufio"
	"context"
	"crypto/sha1"
	"database/sql"
	"encoding/hex"
	"errors"
	"fmt"
	"io"
	"math"
	"os"
	"path"
	"path/filepath"
	"strconv"
	"strings"
	"time"

	shp "github.com/jonas-p/go-shp"

	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/misc"
	"gemma.intevation.de/gemma/pkg/models"
	"gemma.intevation.de/gemma/pkg/octree"
)

// SoundingResult is a Job to import sounding results
// from a ZIP file into the database.
//
// All pointer fields are optional overrides: when non-nil they take
// precedence over the corresponding values found in the archive's
// meta.json (see loadMeta).
type SoundingResult struct {
	// Dir is the folder in the file system where the
	// 'sr.zip' is expected to be.
	Dir string `json:"dir"`

	// Override data

	// Date if given overrides the date value from the meta.json.
	Date *models.Date `json:"date,omitempty"`
	// Bottleneck if given overrides the name of the bottleneck from the meta.json.
	Bottleneck *string `json:"bottleneck,omitempty"`
	// EPSG if given overrides the EPSG code from the meta.json.
	// Defaults to WGS84.
	EPSG *uint `json:"epsg,omitempty"`
	// DepthReference if given overrides the DepthReference value
	// from the meta.json.
	DepthReference *string `json:"depth-reference,omitempty"`
	// SingleBeam indicates that the sounding is a single beam scan.
	SingleBeam *bool `json:"single-beam,omitempty"`
}

// Parameters of the contour line generation (see generateContours).
const (
	// contourStepWidth is the height difference between two
	// consecutive contour lines.
	contourStepWidth = 0.1
	// contourTolerance is the simplification tolerance passed to
	// ST_SimplifyPreserveTopology when storing the lines.
	contourTolerance = 0.1
)

// Parameters of the single beam processing (see singleBeamScan).
const (
	// tooLongEdge is the edge length threshold used by the concave
	// hull calculation to eliminate overly long triangles.
	tooLongEdge = 50.0
	// pointsPerSquareMeter is the target density of the randomly
	// generated point cloud on the virtual DEM.
	pointsPerSquareMeter = 2
)

// SRJobKind is the unique name of a SoundingResult import job.
const SRJobKind JobKind = "sr" type srJobCreator struct{} func init() { RegisterJobCreator(SRJobKind, srJobCreator{}) } func (srJobCreator) Description() string { return "sounding results" } func (srJobCreator) AutoAccept() bool { return false } func (srJobCreator) Create() Job { return new(SoundingResult) } func (srJobCreator) Depends() [2][]string { return [2][]string{ {"sounding_results", "sounding_results_contour_lines"}, {"bottlenecks"}, } } func (srJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, srStageDoneSQL, id) return err } const ( srStageDoneSQL = ` UPDATE waterway.sounding_results SET staging_done = true WHERE id = ( SELECT key from import.track_imports WHERE import_id = $1 AND relation = 'waterway.sounding_results'::regclass)` insertHullSQL = ` INSERT INTO waterway.sounding_results ( bottleneck_id, bottleneck_validity, date_info, depth_reference, area ) SELECT bottleneck_id, validity, $2::date, $3, (SELECT CASE WHEN $5::bytea IS NULL THEN ST_Transform(ST_ConcaveHull(ST_Force2D(ST_GeomFromWKB($4, $6::integer)), 0.7), 4326)::geography ELSE ST_MakeValid(ST_Transform(ST_GeomFromWKB($5, $6::integer), 4326))::geography END) FROM waterway.bottlenecks WHERE objnam = $1 AND validity @> CAST($2 AS timestamptz) RETURNING id, ST_X(ST_Centroid(area::geometry)), ST_Y(ST_Centroid(area::geometry)), best_utm(area), ST_AsBinary(ST_Transform(area::geometry, best_utm(area))) ` reprojectPointsSQL = ` SELECT ST_AsBinary(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer))` reprojectPointsBufferedSQL = ` SELECT ST_AsBinary(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer)), ST_AsBinary(ST_Buffer(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer), 0.1)), ST_Area(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer))` insertOctreeSQL = ` UPDATE waterway.sounding_results SET octree_checksum = $2, octree_index = $3 WHERE id = $1` repairBoundarySQL = ` SELECT 
ST_AsBinary(ST_Buffer(ST_MakeValid(ST_GeomFromWKB($1, $2::integer)), 0.0)), ST_AsBinary(ST_Buffer(ST_MakeValid(ST_GeomFromWKB($1, $2::integer)), 0.1))` insertContourSQL = ` INSERT INTO waterway.sounding_results_contour_lines ( sounding_result_id, height, lines ) SELECT $1, $2, ST_Transform( ST_Multi( ST_CollectionExtract( ST_SimplifyPreserveTopology( ST_Multi(ST_Collectionextract( ST_MakeValid(ST_GeomFromWKB($4, $3::integer)), 2)), $5 ), 2 ) ), 4326 ) FROM waterway.sounding_results sr WHERE id = $1 ` selectGaugeLDCSQL = ` SELECT grwl.value, grwl.depth_reference FROM waterway.gauges_reference_water_levels grwl JOIN waterway.bottlenecks bns ON grwl.location = bns.gauge_location AND grwl.validity = bns.gauge_validity WHERE bns.objnam = $1 AND bns.validity @> CAST($2 AS timestamptz) AND grwl.depth_reference like 'LDC%' ` reprojectPointsSingleBeamSQL = ` SELECT ST_AsBinary( ST_Transform( ST_GeomFromWKB($1, $2::integer), best_utm(CAST(ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS geography)))), best_utm(CAST(ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS geography)) ` ) func (sr *SoundingResult) isSingleBeam() bool { return sr.SingleBeam != nil && *sr.SingleBeam } // Do executes the actual sounding result import. 
func (sr *SoundingResult) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() z, err := zip.OpenReader(filepath.Join(sr.Dir, "sr.zip")) if err != nil { return nil, err } defer z.Close() feedback.Info("Looking for 'meta.json'") mf := common.FindInZIP(z, "meta.json") if mf == nil && !sr.completeOverride() { return nil, errors.New("Cannot find 'meta.json'") } m, err := sr.loadMeta(mf) if err != nil { return nil, err } feedback.Info("Bottleneck: %s", m.Bottleneck) feedback.Info("Survey date: %s", m.Date.Format(common.DateFormat)) var xform vertexTransform if m.DepthReference == "ZPG" { feedback.Info("Found ZPG as reference system -> translating Z values to LDC") var ldc float64 var depthReference string err := conn.QueryRowContext(ctx, selectGaugeLDCSQL, m.Bottleneck, m.Date.Time, ).Scan( &ldc, &depthReference, ) switch { case err == sql.ErrNoRows: return nil, errors.New("Cannot load LDC value") case err != nil: return nil, err } xform = func(v octree.Vertex) octree.Vertex { return octree.Vertex{X: v.X, Y: v.Y, Z: ldc - v.Z} } m.DepthReference = depthReference } if err := m.Validate(ctx, conn); err != nil { return nil, common.ToError(err) } var xyzf *zip.File for _, ext := range []string{".xyz", ".txt"} { feedback.Info("Looking for '*%s'", ext) if xyzf = common.FindInZIP(z, ext); xyzf != nil { break } } if xyzf == nil { return nil, errors.New("Cannot find any *.xyz or *.txt file") } xyz, err := loadXYZ(xyzf, feedback, xform) if err != nil { return nil, err } if len(xyz) == 0 { return nil, errors.New("XYZ does not contain any vertices") } // Is there a boundary shapefile in the ZIP archive? 
boundary, err := loadBoundary(z) if err != nil { return nil, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() var summary interface{} if sr.isSingleBeam() { summary, err = sr.singleBeamScan( ctx, tx, feedback, importID, m, xyz, boundary, ) } else { summary, err = sr.multiBeamScan( ctx, tx, feedback, importID, m, xyz, boundary, ) } if err != nil { return nil, err } if err = tx.Commit(); err != nil { feedback.Error( "Storing sounding result failed after %v.", time.Since(start)) return nil, err } feedback.Info( "Storing sounding result was successful after %v.", time.Since(start)) return summary, nil } func (sr *SoundingResult) singleBeamScan( ctx context.Context, tx *sql.Tx, feedback Feedback, importID int64, m *models.SoundingResultMeta, xyz octree.MultiPointZ, boundary polygonSlice, ) (interface{}, error) { feedback.Info("Processing as single beam scan.") feedback.Info("Reproject XYZ data.") start := time.Now() xyzWKB := xyz.AsWKB() var reproj []byte var epsg uint if err := tx.QueryRowContext( ctx, reprojectPointsSingleBeamSQL, xyzWKB, m.EPSG, ).Scan(&reproj, &epsg); err != nil { return nil, err } if err := xyz.FromWKB(reproj); err != nil { return nil, err } feedback.Info("Reprojecting points to EPSG %d took %v.", epsg, time.Since(start)) feedback.Info("Number of reprojected points: %d", len(xyz)) feedback.Info("Triangulate XYZ data.") start = time.Now() tri, err := octree.Triangulate(xyz) if err != nil { return nil, err } feedback.Info("Triangulation took %v.", time.Since(start)) feedback.Info("Number triangles: %d.", len(tri.Triangles)/3) var ( clippingPolygon octree.Polygon clippingPolygonBuffered octree.Polygon removed map[int32]struct{} polygonArea float64 clippingPolygonWKB []byte ) if boundary == nil { feedback.Info("No boundary given. 
Calulate from XYZ data.") feedback.Info("Eliminate triangles with edges longer than %.2f meters.", tooLongEdge) var polygon octree.LineStringZ start = time.Now() polygon, removed = tri.ConcaveHull(tooLongEdge) polygonArea = polygon.Area() if polygonArea < 0.0 { // counter clockwise polygonArea = -polygonArea polygon.Reverse() } clippingPolygon.FromLineStringZ(polygon) var buffered []byte if err := tx.QueryRowContext( ctx, repairBoundarySQL, clippingPolygon.AsWKB(), epsg, ).Scan(&clippingPolygonWKB, &buffered); err != nil { return nil, err } if err := clippingPolygon.FromWKB(clippingPolygonWKB); err != nil { return nil, err } if err := clippingPolygonBuffered.FromWKB(buffered); err != nil { return nil, err } feedback.Info("Calculating took %v.", time.Since(start)) } else { // has Boundary feedback.Info("Using uploaded boundary polygon.") var buffered []byte if err = tx.QueryRowContext( ctx, reprojectPointsBufferedSQL, boundary.asWKB(), m.EPSG, epsg, ).Scan(&clippingPolygonWKB, &buffered, &polygonArea); err != nil { return nil, err } if err := clippingPolygon.FromWKB(clippingPolygonWKB); err != nil { return nil, err } if err := clippingPolygonBuffered.FromWKB(buffered); err != nil { return nil, err } tin := tri.Tin() tin.EPSG = uint32(epsg) var str octree.STRTree str.Build(tin) removed = str.Clip(&clippingPolygon) } // Build the first mesh to generate random points on. 
feedback.Info("Build virtual DEM based on original XYZ data.") start = time.Now() var tree *octree.Tree { builder := octree.NewBuilder(tri.Tin()) builder.Build(removed) tree = builder.Tree() } feedback.Info("Building took %v", time.Since(start)) feedback.Info("Boundary area: %.2fm²", polygonArea) numPoints := int(math.Ceil(polygonArea * pointsPerSquareMeter)) feedback.Info("Generate %d random points for an average density of ~%d points/m².", numPoints, pointsPerSquareMeter) start = time.Now() generated := make(octree.LineStringZ, 0, numPoints+clippingPolygon.NumVertices(0)) tree.GenerateRandomVertices(numPoints, func(vertices []octree.Vertex) { generated = append(generated, vertices...) }) feedback.Info("Generating %d points took %v.", len(generated), time.Since(start)) // Add the boundary to new point cloud. dupes := map[[2]float64]struct{}{} clippingPolygon.Vertices(0, func(x, y float64) { key := [2]float64{x, y} if _, found := dupes[key]; found { return } dupes[key] = struct{}{} if z, ok := tree.Value(x, y); ok { generated = append(generated, octree.Vertex{X: x, Y: y, Z: z}) } }) feedback.Info("Triangulate new point cloud.") start = time.Now() xyz = octree.MultiPointZ(generated) tri, err = octree.Triangulate(xyz) if err != nil { return nil, err } feedback.Info("Second triangulation took %v.", time.Since(start)) feedback.Info("Number triangles: %d.", len(tri.Triangles)/3) feedback.Info("Clipping triangles from new mesh.") start = time.Now() tin := tri.Tin() tin.EPSG = uint32(epsg) var str octree.STRTree str.Build(tin) feedback.Info("Building STR tree took %v", time.Since(start)) start = time.Now() clippingPolygonBuffered.Indexify() removed = str.Clip(&clippingPolygonBuffered) feedback.Info("Clipping STR tree took %v.", time.Since(start)) feedback.Info("Number of triangles to clip %d.", len(removed)) start = time.Now() feedback.Info("Build final octree index") builder := octree.NewBuilder(tin) builder.Build(removed) octreeIndex, err := builder.Bytes() if err != 
nil { return nil, err } feedback.Info("Building octree took %v.", time.Since(start)) feedback.Info("Store octree.") start = time.Now() var ( id int64 dummy uint32 lat, lon float64 ) var hull []byte err = tx.QueryRowContext( ctx, insertHullSQL, m.Bottleneck, m.Date.Time, m.DepthReference, nil, clippingPolygonWKB, epsg, ).Scan( &id, &lat, &lon, &dummy, &hull, ) switch { case err == sql.ErrNoRows: return nil, fmt.Errorf( "No matching bottleneck of given name or time available: %v", err) case err != nil: return nil, err } h := sha1.New() h.Write(octreeIndex) checksum := hex.EncodeToString(h.Sum(nil)) _, err = tx.ExecContext(ctx, insertOctreeSQL, id, checksum, octreeIndex) if err != nil { return nil, err } feedback.Info("Storing octree index took %s.", time.Since(start)) feedback.Info("Generate contour lines") start = time.Now() err = generateContours(ctx, tx, builder.Tree(), id) if err != nil { return nil, err } feedback.Info("Generating contour lines took %s.", time.Since(start)) // Store for potential later removal. if err = track(ctx, tx, importID, "waterway.sounding_results", id); err != nil { return nil, err } summary := struct { Bottleneck string `json:"bottleneck"` Date models.Date `json:"date"` Lat float64 `json:"lat"` Lon float64 `json:"lon"` }{ Bottleneck: m.Bottleneck, Date: m.Date, Lat: lat, Lon: lon, } return &summary, nil } func (sr *SoundingResult) multiBeamScan( ctx context.Context, tx *sql.Tx, feedback Feedback, importID int64, m *models.SoundingResultMeta, xyz octree.MultiPointZ, boundary polygonSlice, ) (interface{}, error) { feedback.Info("Processing as multi beam scan.") var ( id int64 epsg uint32 lat, lon float64 ) start := time.Now() var hull []byte xyzWKB := xyz.AsWKB() err := tx.QueryRowContext( ctx, insertHullSQL, m.Bottleneck, m.Date.Time, m.DepthReference, xyzWKB, boundary.asWKB(), m.EPSG, ).Scan( &id, &lat, &lon, &epsg, &hull, ) xyz, boundary = nil, nil // not need from now on. 
feedback.Info("Calculating hull took %s.", time.Since(start)) switch { case err == sql.ErrNoRows: return nil, fmt.Errorf( "No matching bottleneck of given name or time available: %v", err) case err != nil: return nil, err } feedback.Info("Best suited UTM EPSG: %d", epsg) start = time.Now() var clippingPolygon octree.Polygon if err := clippingPolygon.FromWKB(hull); err != nil { return nil, err } clippingPolygon.Indexify() feedback.Info("Building clipping polygon took %v.", time.Since(start)) start = time.Now() var reproj []byte if err = tx.QueryRowContext( ctx, reprojectPointsSQL, xyzWKB, m.EPSG, epsg, ).Scan(&reproj); err != nil { return nil, err } if err := xyz.FromWKB(reproj); err != nil { return nil, err } feedback.Info("Reprojecting points took %v.", time.Since(start)) feedback.Info("Number of reprojected points: %d", len(xyz)) start = time.Now() tri, err := octree.Triangulate(xyz) if err != nil { return nil, err } feedback.Info("Triangulation took %v.", time.Since(start)) start = time.Now() tin := tri.Tin() var str octree.STRTree str.Build(tin) feedback.Info("Building STR tree took %v", time.Since(start)) start = time.Now() removed := str.Clip(&clippingPolygon) feedback.Info("Clipping STR tree took %v.", time.Since(start)) feedback.Info("Number of triangles to clip %d.", len(removed)) start = time.Now() tin.EPSG = epsg builder := octree.NewBuilder(tin) builder.Build(removed) octreeIndex, err := builder.Bytes() if err != nil { return nil, err } feedback.Info("Building octree took %v.", time.Since(start)) start = time.Now() h := sha1.New() h.Write(octreeIndex) checksum := hex.EncodeToString(h.Sum(nil)) _, err = tx.ExecContext(ctx, insertOctreeSQL, id, checksum, octreeIndex) if err != nil { return nil, err } feedback.Info("Storing octree index took %s.", time.Since(start)) tree := builder.Tree() start = time.Now() err = generateContours(ctx, tx, tree, id) if err != nil { return nil, err } feedback.Info("Generating and storing contour lines took %s.", 
time.Since(start)) // Store for potential later removal. if err = track(ctx, tx, importID, "waterway.sounding_results", id); err != nil { return nil, err } summary := struct { Bottleneck string `json:"bottleneck"` Date models.Date `json:"date"` Lat float64 `json:"lat"` Lon float64 `json:"lon"` }{ Bottleneck: m.Bottleneck, Date: m.Date, Lat: lat, Lon: lon, } return &summary, nil } // CleanUp removes the folder containing the ZIP file with the // the sounding result import. func (sr *SoundingResult) CleanUp() error { return os.RemoveAll(sr.Dir) } func (sr *SoundingResult) completeOverride() bool { // sr.EPSG == nil -> WGS84 return sr.Bottleneck != nil && sr.Date != nil && sr.DepthReference != nil && sr.SingleBeam != nil } func (sr *SoundingResult) loadMeta(f *zip.File) (*models.SoundingResultMeta, error) { if f == nil { var epsg uint if sr.EPSG != nil { epsg = *sr.EPSG } else { epsg = models.WGS84 } return &models.SoundingResultMeta{ Date: *sr.Date, Bottleneck: *sr.Bottleneck, EPSG: epsg, DepthReference: *sr.DepthReference, }, nil } r, err := f.Open() if err != nil { return nil, err } defer r.Close() var m models.SoundingResultMeta if err := m.Decode(r); err != nil { return nil, err } // Apply overrides if sr.Date != nil { m.Date = *sr.Date } if sr.Bottleneck != nil { m.Bottleneck = *sr.Bottleneck } if sr.EPSG != nil { m.EPSG = *sr.EPSG } if sr.DepthReference != nil { m.DepthReference = *sr.DepthReference } if sr.SingleBeam != nil { m.SingleBeam = *sr.SingleBeam } return &m, nil } type vertexTransform func(octree.Vertex) octree.Vertex func loadXYZReader(r io.Reader, feedback Feedback, xform vertexTransform) (octree.MultiPointZ, error) { mpz := make(octree.MultiPointZ, 0, 250000) s := bufio.NewScanner(r) var hasNegZ bool warnLimiter := misc.WarningLimiter{Log: feedback.Warn, MaxWarnings: 100} warn := warnLimiter.Warn defer warnLimiter.Close() for line := 1; s.Scan(); line++ { text := s.Text() var p octree.Vertex // fmt.Sscanf(text, "%f,%f,%f") is 4 times slower. 
idx := strings.IndexByte(text, ',') if idx == -1 { warn("format error in line %d", line) continue } var err error if p.X, err = strconv.ParseFloat(text[:idx], 64); err != nil { warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if idx = strings.IndexByte(text, ','); idx == -1 { feedback.Warn("format error in line %d", line) continue } if p.Y, err = strconv.ParseFloat(text[:idx], 64); err != nil { warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if p.Z, err = strconv.ParseFloat(text, 64); err != nil { warn("format error in line %d: %v", line, err) continue } if p.Z < 0 { p.Z = -p.Z if !hasNegZ { hasNegZ = true warn("Negative Z value found: Using -Z") } } if xform != nil { p = xform(p) } mpz = append(mpz, p) } if err := s.Err(); err != nil { return nil, err } return mpz, nil } func loadXYZ(f *zip.File, feedback Feedback, xform vertexTransform) (octree.MultiPointZ, error) { r, err := f.Open() if err != nil { return nil, err } defer r.Close() return loadXYZReader(r, feedback, xform) } func loadBoundary(z *zip.ReadCloser) (polygonSlice, error) { shpF := common.FindInZIP(z, ".shp") if shpF == nil { return nil, nil } prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name)) dbfF := common.FindInZIP(z, prefix+".dbf") if dbfF == nil { return nil, fmt.Errorf("No DBF file found for %s", shpF.Name) } shpR, err := shpF.Open() if err != nil { return nil, err } dbfR, err := dbfF.Open() if err != nil { shpR.Close() return nil, err } sr := shp.SequentialReaderFromExt(shpR, dbfR) defer sr.Close() if !sr.Next() { return nil, sr.Err() } _, s := sr.Shape() if s == nil { return nil, sr.Err() } return shapeToPolygon(s) } func generateContours( ctx context.Context, tx *sql.Tx, tree *octree.Tree, id int64, ) error { stmt, err := tx.PrepareContext(ctx, insertContourSQL) if err != nil { return err } defer stmt.Close() // Adjust contour lines heights to multiples of contourStepWidth var heights []float64 h := contourStepWidth * 
math.Ceil(tree.Min.Z/contourStepWidth) for ; h <= tree.Max.Z; h += contourStepWidth { heights = append(heights, h) } octree.DoContours(tree, heights, func(res *octree.ContourResult) { if err == nil && len(res.Lines) > 0 { _, err = stmt.ExecContext( ctx, id, res.Height, tree.EPSG, res.Lines.AsWKB2D(), contourTolerance) } }) return err }