Mercurial > gemma
view pkg/imports/sr.go @ 3656:2a079d0a71c1
Ensure sounding results are associated to matching bottleneck version
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Thu, 13 Jun 2019 19:13:42 +0200 |
parents | 02951a62e8c6 |
children | 66f2cb789905 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom@intevation.de> package imports import ( "archive/zip" "bufio" "context" "crypto/sha1" "database/sql" "encoding/hex" "errors" "fmt" "io" "math" "os" "path" "path/filepath" "strconv" "strings" "time" shp "github.com/jonas-p/go-shp" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/misc" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/octree" ) // SoundingResult is a Job to import sounding reults // from a ZIP file into the database. type SoundingResult struct { // Dir is the folder in the file system where the // 'sr.zip' is expect to be. Dir string `json:"dir"` // Override data // Date if given overrides the date value from the meta.json. Date *models.Date `json:"date,omitempty"` // Date if given overrides the name of the bottleneck from the meta.json. Bottleneck *string `json:"bottleneck,omitempty"` // EPSG if given overrides the EPSG code from the meta.json. // Defaults to WGS84. EPSG *uint `json:"epsg,omitempty"` // DepthReference if given overides the DepthReference value // from the meta.json. DepthReference *string `json:"depth-reference,omitempty"` // SingleBeam indicates that the sounding is a single beam scan. SingleBeam *bool `json:"single-beam,omitempty"` } const ( contourStepWidth = 0.1 contourTolerance = 0.1 ) // SRJobKind is the unique name of a SoundingResult import job. const SRJobKind JobKind = "sr" type srJobCreator struct{} func init() { RegisterJobCreator(SRJobKind, srJobCreator{}) } func (srJobCreator) Description() string { return "sounding results" } func (srJobCreator) AutoAccept() bool { return false } func (srJobCreator) Create() Job { return new(SoundingResult) } func (srJobCreator) Depends() [2][]string { return [2][]string{ {"sounding_results", "sounding_results_contour_lines"}, {"bottlenecks"}, } } func (srJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, srStageDoneSQL, id) return err } const ( srStageDoneSQL = ` UPDATE waterway.sounding_results SET staging_done = true WHERE id = ( SELECT key from import.track_imports WHERE import_id = $1 AND relation = 'waterway.sounding_results'::regclass)` insertHullSQL = ` INSERT INTO waterway.sounding_results ( bottleneck_id, bottleneck_validity, date_info, depth_reference, area ) SELECT bottleneck_id, validity, $2::date, $3, (SELECT CASE WHEN $5::bytea IS NULL THEN ST_Transform(ST_ConcaveHull(ST_Force2D(ST_GeomFromWKB($4, $6::integer)), 0.7), 4326)::geography ELSE ST_Transform(ST_GeomFromWKB($5, $6::integer), 4326)::geography END) FROM waterway.bottlenecks WHERE objnam = $1 AND validity @> CAST($2 AS timestamptz) RETURNING id, ST_X(ST_Centroid(area::geometry)), ST_Y(ST_Centroid(area::geometry)), best_utm(area), ST_AsBinary(ST_Transform(area::geometry, best_utm(area))) ` reprojectPointsSQL = ` SELECT ST_AsBinary(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer)) ` insertOctreeSQL = ` UPDATE waterway.sounding_results SET octree_checksum = $2, octree_index = $3 WHERE id = $1` insertContourSQL = ` INSERT INTO waterway.sounding_results_contour_lines ( sounding_result_id, height, lines ) SELECT $1, $2, ST_Transform( ST_Multi( ST_CollectionExtract( ST_SimplifyPreserveTopology( ST_Multi(ST_Collectionextract( ST_MakeValid(ST_GeomFromWKB($4, $3::integer)), 2)), $5 ), 2 ) ), 4326 ) FROM waterway.sounding_results sr WHERE id = $1 ` selectGaugeLDCSQL = ` SELECT grwl.value, grwl.depth_reference FROM waterway.gauges_reference_water_levels grwl JOIN waterway.bottlenecks bns ON grwl.location = bns.gauge_location AND grwl.validity = bns.gauge_validity WHERE bns.objnam = $1 AND bns.validity @> CAST($2 AS timestamptz) AND grwl.depth_reference like 'LDC%' ` reprojectPointsSingleBeamSQL = ` SELECT ST_AsBinary( ST_Transform( ST_GeomFromWKB($1, $2::integer), best_utm(CAST(ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS geography)))), best_utm(CAST(ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326) AS geography)) ` ) func (sr *SoundingResult) isSingleBeam() bool { return sr.SingleBeam != nil && *sr.SingleBeam } // Do executes the actual sounding result import. func (sr *SoundingResult) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { start := time.Now() z, err := zip.OpenReader(filepath.Join(sr.Dir, "sr.zip")) if err != nil { return nil, err } defer z.Close() feedback.Info("Looking for 'meta.json'") mf := common.FindInZIP(z, "meta.json") if mf == nil && !sr.completeOverride() { return nil, errors.New("Cannot find 'meta.json'") } m, err := sr.loadMeta(mf) if err != nil { return nil, err } feedback.Info("Bottleneck: %s", m.Bottleneck) feedback.Info("Survey date: %s", m.Date.Format(common.DateFormat)) var xform vertexTransform if m.DepthReference == "ZPG" { feedback.Info("Found ZPG as reference system -> translating Z values to LDC") var ldc float64 var depthReference string err := conn.QueryRowContext(ctx, selectGaugeLDCSQL, m.Bottleneck, m.Date.Time, ).Scan( &ldc, &depthReference, ) switch { case err == sql.ErrNoRows: return nil, errors.New("Cannot load LDC value") case err != nil: return nil, err } xform = func(v octree.Vertex) octree.Vertex { return octree.Vertex{X: v.X, Y: v.Y, Z: ldc - v.Z} } m.DepthReference = depthReference } if err := m.Validate(ctx, conn); err != nil { return nil, common.ToError(err) } var xyzf *zip.File for _, ext := range []string{".xyz", ".txt"} { feedback.Info("Looking for '*%s'", ext) if xyzf = common.FindInZIP(z, ext); xyzf != nil { break } } if xyzf == nil { return nil, errors.New("Cannot find any *.xyz or *.txt file") } xyz, err := loadXYZ(xyzf, feedback, xform) if err != nil { return nil, err } if len(xyz) == 0 { return nil, errors.New("XYZ does not contain any vertices") } // Is there a boundary shapefile in the ZIP archive? boundary, err := loadBoundary(z) if err != nil { return nil, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() var summary interface{} if sr.isSingleBeam() { summary, err = sr.singleBeamScan( ctx, tx, feedback, importID, m, xyz, boundary, ) } else { summary, err = sr.multiBeamScan( ctx, tx, feedback, importID, m, xyz, boundary, ) } if err != nil { return nil, err } if err = tx.Commit(); err != nil { feedback.Error( "Storing sounding result failed after %v.", time.Since(start)) return nil, err } feedback.Info( "Storing sounding result was successful after %v.", time.Since(start)) return summary, nil } func (sr *SoundingResult) singleBeamScan( ctx context.Context, tx *sql.Tx, feedback Feedback, importID int64, m *models.SoundingResultMeta, xyz octree.MultiPointZ, boundary polygonSlice, ) (interface{}, error) { feedback.Info("Processing as single beam scan.") start := time.Now() xyzWKB := xyz.AsWKB() var reproj []byte var epsg uint if err := tx.QueryRowContext( ctx, reprojectPointsSingleBeamSQL, xyzWKB, m.EPSG, ).Scan(&reproj, &epsg); err != nil { return nil, err } if err := xyz.FromWKB(reproj); err != nil { return nil, err } feedback.Info("Reprojecting points to EPSG %d took %v.", epsg, time.Since(start)) feedback.Info("Number of reprojected points: %d", len(xyz)) start = time.Now() tri, err := octree.Triangulate(xyz) if err != nil { return nil, err } feedback.Info("Triangulation took %v.", time.Since(start)) feedback.Info("Number triangles: %d.", len(tri.Triangles)/3) if boundary == nil { feedback.Info("No boundary given.") feedback.Info("Eliminate triangles with long edges.") tri.ConcaveHull(50.0) } // TODO: Implement me! return nil, errors.New("Not implemented, yet!") } func (sr *SoundingResult) multiBeamScan( ctx context.Context, tx *sql.Tx, feedback Feedback, importID int64, m *models.SoundingResultMeta, xyz octree.MultiPointZ, boundary polygonSlice, ) (interface{}, error) { feedback.Info("Processing as multi beam scan.") var ( id int64 epsg uint32 lat, lon float64 ) start := time.Now() var hull []byte xyzWKB := xyz.AsWKB() err := tx.QueryRowContext( ctx, insertHullSQL, m.Bottleneck, m.Date.Time, m.DepthReference, xyzWKB, boundary.asWKB(), m.EPSG, ).Scan( &id, &lat, &lon, &epsg, &hull, ) xyz, boundary = nil, nil // not need from now on. feedback.Info("Calculating hull took %s.", time.Since(start)) switch { case err == sql.ErrNoRows: return nil, fmt.Errorf( "No bottleneck matching given name and time available") case err != nil: return nil, err } feedback.Info("Best suited UTM EPSG: %d", epsg) start = time.Now() var clippingPolygon octree.Polygon if err := clippingPolygon.FromWKB(hull); err != nil { return nil, err } clippingPolygon.Indexify() feedback.Info("Building clipping polygon took %v.", time.Since(start)) start = time.Now() var reproj []byte if err = tx.QueryRowContext( ctx, reprojectPointsSQL, xyzWKB, m.EPSG, epsg, ).Scan(&reproj); err != nil { return nil, err } if err := xyz.FromWKB(reproj); err != nil { return nil, err } feedback.Info("Reprojecting points took %v.", time.Since(start)) feedback.Info("Number of reprojected points: %d", len(xyz)) start = time.Now() tri, err := octree.Triangulate(xyz) if err != nil { return nil, err } feedback.Info("Triangulation took %v.", time.Since(start)) start = time.Now() tin := tri.Tin() var str octree.STRTree str.Build(tin) feedback.Info("Building STR tree took %v", time.Since(start)) start = time.Now() removed := str.Clip(&clippingPolygon) feedback.Info("Clipping STR tree took %v.", time.Since(start)) feedback.Info("Number of triangles to clip %d.", len(removed)) start = time.Now() tin.EPSG = epsg builder := octree.NewBuilder(tin) builder.Build(removed) octreeIndex, err := builder.Bytes() if err != nil { return nil, err } feedback.Info("Building octree took %v.", time.Since(start)) start = time.Now() h := sha1.New() h.Write(octreeIndex) checksum := hex.EncodeToString(h.Sum(nil)) _, err = tx.ExecContext(ctx, insertOctreeSQL, id, checksum, octreeIndex) if err != nil { return nil, err } feedback.Info("Storing octree index took %s.", time.Since(start)) tree := builder.Tree() start = time.Now() err = generateContours(ctx, tx, tree, id) if err != nil { return nil, err } feedback.Info("Generating and storing contour lines took %s.", time.Since(start)) // Store for potential later removal. if err = track(ctx, tx, importID, "waterway.sounding_results", id); err != nil { return nil, err } summary := struct { Bottleneck string `json:"bottleneck"` Date models.Date `json:"date"` Lat float64 `json:"lat"` Lon float64 `json:"lon"` }{ Bottleneck: m.Bottleneck, Date: m.Date, Lat: lat, Lon: lon, } return &summary, nil } // CleanUp removes the folder containing the ZIP file with the // the sounding result import. func (sr *SoundingResult) CleanUp() error { return os.RemoveAll(sr.Dir) } func (sr *SoundingResult) completeOverride() bool { // sr.EPSG == nil -> WGS84 return sr.Bottleneck != nil && sr.Date != nil && sr.DepthReference != nil && sr.SingleBeam != nil } func (sr *SoundingResult) loadMeta(f *zip.File) (*models.SoundingResultMeta, error) { if f == nil { var epsg uint if sr.EPSG != nil { epsg = *sr.EPSG } else { epsg = models.WGS84 } return &models.SoundingResultMeta{ Date: *sr.Date, Bottleneck: *sr.Bottleneck, EPSG: epsg, DepthReference: *sr.DepthReference, }, nil } r, err := f.Open() if err != nil { return nil, err } defer r.Close() var m models.SoundingResultMeta if err := m.Decode(r); err != nil { return nil, err } // Apply overrides if sr.Date != nil { m.Date = *sr.Date } if sr.Bottleneck != nil { m.Bottleneck = *sr.Bottleneck } if sr.EPSG != nil { m.EPSG = *sr.EPSG } if sr.DepthReference != nil { m.DepthReference = *sr.DepthReference } if sr.SingleBeam != nil { m.SingleBeam = *sr.SingleBeam } return &m, nil } type vertexTransform func(octree.Vertex) octree.Vertex func loadXYZReader(r io.Reader, feedback Feedback, xform vertexTransform) (octree.MultiPointZ, error) { mpz := make(octree.MultiPointZ, 0, 250000) s := bufio.NewScanner(r) var hasNegZ bool warnLimiter := misc.WarningLimiter{Log: feedback.Warn, MaxWarnings: 100} warn := warnLimiter.Warn defer warnLimiter.Close() for line := 1; s.Scan(); line++ { text := s.Text() var p octree.Vertex // fmt.Sscanf(text, "%f,%f,%f") is 4 times slower. idx := strings.IndexByte(text, ',') if idx == -1 { warn("format error in line %d", line) continue } var err error if p.X, err = strconv.ParseFloat(text[:idx], 64); err != nil { warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if idx = strings.IndexByte(text, ','); idx == -1 { feedback.Warn("format error in line %d", line) continue } if p.Y, err = strconv.ParseFloat(text[:idx], 64); err != nil { warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if p.Z, err = strconv.ParseFloat(text, 64); err != nil { warn("format error in line %d: %v", line, err) continue } if p.Z < 0 { p.Z = -p.Z if !hasNegZ { hasNegZ = true warn("Negative Z value found: Using -Z") } } if xform != nil { p = xform(p) } mpz = append(mpz, p) } if err := s.Err(); err != nil { return nil, err } return mpz, nil } func loadXYZ(f *zip.File, feedback Feedback, xform vertexTransform) (octree.MultiPointZ, error) { r, err := f.Open() if err != nil { return nil, err } defer r.Close() return loadXYZReader(r, feedback, xform) } func loadBoundary(z *zip.ReadCloser) (polygonSlice, error) { shpF := common.FindInZIP(z, ".shp") if shpF == nil { return nil, nil } prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name)) dbfF := common.FindInZIP(z, prefix+".dbf") if dbfF == nil { return nil, fmt.Errorf("No DBF file found for %s", shpF.Name) } shpR, err := shpF.Open() if err != nil { return nil, err } dbfR, err := dbfF.Open() if err != nil { shpR.Close() return nil, err } sr := shp.SequentialReaderFromExt(shpR, dbfR) defer sr.Close() if !sr.Next() { return nil, sr.Err() } _, s := sr.Shape() if s == nil { return nil, sr.Err() } return shapeToPolygon(s) } func generateContours( ctx context.Context, tx *sql.Tx, tree *octree.Tree, id int64, ) error { stmt, err := tx.PrepareContext(ctx, insertContourSQL) if err != nil { return err } defer stmt.Close() // Adjust contour lines heights to multiples of contourStepWidth var heights []float64 h := contourStepWidth * math.Ceil(tree.Min.Z/contourStepWidth) for ; h <= tree.Max.Z; h += contourStepWidth { heights = append(heights, h) } octree.DoContours(tree, heights, func(res *octree.ContourResult) { if err == nil && len(res.Lines) > 0 { _, err = stmt.ExecContext( ctx, id, res.Height, tree.EPSG, res.Lines.AsWKB2D(), contourTolerance) } }) return err }