Mercurial > gemma
view pkg/imports/sr.go @ 2722:1c0307207162
Merged.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 19 Mar 2019 12:28:42 +0100 |
parents | c7ce8b011bcb |
children | 1b6840093eac |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> // * Tom Gottfried <tom@intevation.de> package imports import ( "archive/zip" "bufio" "context" "crypto/sha1" "database/sql" "encoding/hex" "errors" "fmt" "io" "math" "os" "path" "path/filepath" "strconv" "strings" "time" shp "github.com/jonas-p/go-shp" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/octree" ) // SoundingResult is a Job to import sounding reults // from a ZIP file into the database. type SoundingResult struct { // Dir is the folder in the file system where the // 'sr.zip' is expect to be. Dir string `json:"dir"` // Override data // Date if given overrides the date value from the meta.json. Date *models.Date `json:"date,omitempty"` // Date if given overrides the name of the bottleneck from the meta.json. Bottleneck *string `json:"bottleneck,omitempty"` // EPSG if given overrides the EPSG code from the meta.json. // Defaults to WGS84. EPSG *uint `json:"epsg,omitempty"` // DepthReference if given overides the DepthReference value // from the meta.json. DepthReference *string `json:"depth-reference,omitempty"` } const ( contourStepWidth = 0.1 contourTolerance = 0.1 ) // SRJobKind is the unique name of a SoundingResult import job. const SRJobKind JobKind = "sr" type srJobCreator struct{} func init() { RegisterJobCreator(SRJobKind, srJobCreator{}) } func (srJobCreator) Description() string { return "sounding results" } func (srJobCreator) AutoAccept() bool { return false } func (srJobCreator) Create() Job { return new(SoundingResult) } func (srJobCreator) Depends() []string { return []string{ "sounding_results", "sounding_results_contour_lines", "bottlenecks", } } func (srJobCreator) StageDone( ctx context.Context, tx *sql.Tx, id int64, ) error { _, err := tx.ExecContext(ctx, srStageDoneSQL, id) return err } const ( srStageDoneSQL = ` UPDATE waterway.sounding_results SET staging_done = true WHERE id = ( SELECT key from import.track_imports WHERE import_id = $1 AND relation = 'waterway.sounding_results'::regclass)` insertHullSQL = ` INSERT INTO waterway.sounding_results ( bottleneck_id, date_info, depth_reference, area ) SELECT (SELECT id from waterway.bottlenecks where objnam = $1), $2::date, $3, (SELECT CASE WHEN $5::bytea IS NULL THEN ST_Transform(ST_ConcaveHull(ST_Force2D(ST_GeomFromWKB($4, $6::integer)), 0.7), 4326)::geography ELSE ST_Transform(ST_GeomFromWKB($5, $6::integer), 4326)::geography END) RETURNING id, ST_X(ST_Centroid(area::geometry)), ST_Y(ST_Centroid(area::geometry)), best_utm(area::geometry), ST_AsBinary(ST_Transform(area::geometry, best_utm(area::geometry))) ` reprojectPointsSQL = ` SELECT ST_AsBinary(ST_Transform(ST_GeomFromWKB($1, $2::integer), $3::integer)) ` insertOctreeSQL = ` UPDATE waterway.sounding_results SET octree_checksum = $2, octree_index = $3 WHERE id = $1` insertContourSQL = ` INSERT INTO waterway.sounding_results_contour_lines ( sounding_result_id, height, lines ) SELECT $1, $2, ST_Transform( ST_Multi( ST_CollectionExtract( ST_SimplifyPreserveTopology( ST_Multi(ST_Collectionextract( ST_MakeValid(ST_GeomFromWKB($4, $3::integer)), 2)), $5 ), 2 ) ), 4326 ) FROM waterway.sounding_results sr WHERE id = $1 ` ) // Do executes the actual sounding result import. func (sr *SoundingResult) Do( ctx context.Context, importID int64, conn *sql.Conn, feedback Feedback, ) (interface{}, error) { begin := time.Now() z, err := zip.OpenReader(filepath.Join(sr.Dir, "sr.zip")) if err != nil { return nil, err } defer z.Close() feedback.Info("Looking for 'meta.json'") mf := common.FindInZIP(z, "meta.json") if mf == nil && !sr.completeOverride() { return nil, errors.New("Cannot find 'meta.json'") } m, err := sr.loadMeta(mf) if err != nil { return nil, err } if err := m.Validate(ctx, conn); err != nil { return nil, common.ToError(err) } var xyzf *zip.File for _, ext := range []string{".xyz", ".txt"} { feedback.Info("Looking for '*%s'", ext) if xyzf = common.FindInZIP(z, ext); xyzf != nil { break } } if xyzf == nil { return nil, errors.New("Cannot find any *.xyz or *.txt file") } xyz, err := loadXYZ(xyzf, feedback) if err != nil { return nil, err } if len(xyz) == 0 { return nil, errors.New("XYZ does not contain any vertices") } // Is there a boundary shapefile in the ZIP archive? polygon, err := loadBoundary(z) if err != nil { return nil, err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() var ( id int64 epsg uint32 lat, lon float64 ) start := time.Now() var hull []byte xyzWKB := xyz.AsWKB() err = tx.QueryRow(insertHullSQL, m.Bottleneck, m.Date.Time, m.DepthReference, xyzWKB, polygon.asWKB(), m.EPSG, ).Scan( &id, &lat, &lon, &epsg, &hull, ) xyz, polygon = nil, nil // not need from now on. feedback.Info("Calculating hull took %s.", time.Since(start)) if err != nil { return nil, err } feedback.Info("Best suited UTM EPSG: %d", epsg) start = time.Now() var clippingPolygon octree.Polygon if err := clippingPolygon.FromWKB(hull); err != nil { return nil, err } clippingPolygon.Indexify() feedback.Info("Building clipping polygon took %v.", time.Since(start)) start = time.Now() var reproj []byte if err = tx.QueryRow(reprojectPointsSQL, xyzWKB, m.EPSG, epsg, ).Scan(&reproj); err != nil { return nil, err } if err := xyz.FromWKB(reproj); err != nil { return nil, err } feedback.Info("Reprojecting points took %v.", time.Since(start)) feedback.Info("Number of reprojected points: %d", len(xyz)) start = time.Now() tri, err := octree.Triangulate(xyz) if err != nil { return nil, err } feedback.Info("Triangulation took %v.", time.Since(start)) start = time.Now() tin := tri.Tin() var str octree.STRTree str.Build(tin) feedback.Info("Building STR tree took %v", time.Since(start)) start = time.Now() removed := str.Clip(&clippingPolygon) feedback.Info("Clipping STR tree took %v.", time.Since(start)) feedback.Info("Number of triangles to clip %d.", len(removed)) start = time.Now() tin.EPSG = epsg builder := octree.NewBuilder(tin) builder.Build(removed) octreeIndex, err := builder.Bytes() if err != nil { return nil, err } feedback.Info("Building octree took %v.", time.Since(start)) start = time.Now() h := sha1.New() h.Write(octreeIndex) checksum := hex.EncodeToString(h.Sum(nil)) _, err = tx.Exec(insertOctreeSQL, id, checksum, octreeIndex) if err != nil { return nil, err } feedback.Info("Storing octree index took %s.", time.Since(start)) tree := builder.Tree() start = time.Now() err = generateContours(tree, tx, id) if err != nil { return nil, err } feedback.Info("Generating and storing contour lines took %s.", time.Since(start)) // Store for potential later removal. if err = track(ctx, tx, importID, "waterway.sounding_results", id); err != nil { return nil, err } if err = tx.Commit(); err != nil { feedback.Error( "Storing sounding result failed after %v.", time.Since(begin)) return nil, err } feedback.Info( "Storing sounding result was successful after %v.", time.Since(begin)) summary := struct { Bottleneck string `json:"bottleneck"` Date models.Date `json:"date"` Lat float64 `json:"lat"` Lon float64 `json:"lon"` }{ Bottleneck: m.Bottleneck, Date: m.Date, Lat: lat, Lon: lon, } return &summary, nil } // CleanUp removes the folder containing the ZIP file with the // the sounding result import. func (sr *SoundingResult) CleanUp() error { return os.RemoveAll(sr.Dir) } func (sr *SoundingResult) completeOverride() bool { // sr.EPSG == nil -> WGS84 return sr.Bottleneck != nil && sr.Date != nil && sr.DepthReference != nil } func (sr *SoundingResult) loadMeta(f *zip.File) (*models.SoundingResultMeta, error) { if f == nil { var epsg uint if sr.EPSG != nil { epsg = *sr.EPSG } else { epsg = models.WGS84 } return &models.SoundingResultMeta{ Date: *sr.Date, Bottleneck: *sr.Bottleneck, EPSG: epsg, DepthReference: *sr.DepthReference, }, nil } r, err := f.Open() if err != nil { return nil, err } defer r.Close() var m models.SoundingResultMeta if err := m.Decode(r); err != nil { return nil, err } // Apply overrides if sr.Date != nil { m.Date = *sr.Date } if sr.Bottleneck != nil { m.Bottleneck = *sr.Bottleneck } if sr.EPSG != nil { m.EPSG = *sr.EPSG } if sr.DepthReference != nil { m.DepthReference = *sr.DepthReference } return &m, nil } func loadXYZReader(r io.Reader, feedback Feedback) (octree.MultiPointZ, error) { mpz := make(octree.MultiPointZ, 0, 250000) s := bufio.NewScanner(r) var hasNegZ bool for line := 1; s.Scan(); line++ { text := s.Text() var p octree.Vertex // fmt.Sscanf(text, "%f,%f,%f") is 4 times slower. idx := strings.IndexByte(text, ',') if idx == -1 { feedback.Warn("format error in line %d", line) continue } var err error if p.X, err = strconv.ParseFloat(text[:idx], 64); err != nil { feedback.Warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if idx = strings.IndexByte(text, ','); idx == -1 { feedback.Warn("format error in line %d", line) continue } if p.Y, err = strconv.ParseFloat(text[:idx], 64); err != nil { feedback.Warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if p.Z, err = strconv.ParseFloat(text, 64); err != nil { feedback.Warn("format error in line %d: %v", line, err) continue } if p.Z < 0 { p.Z = -p.Z if !hasNegZ { hasNegZ = true feedback.Warn("Negative Z value found: Using -Z") } } mpz = append(mpz, p) } if err := s.Err(); err != nil { return nil, err } return mpz, nil } func loadXYZ(f *zip.File, feedback Feedback) (octree.MultiPointZ, error) { r, err := f.Open() if err != nil { return nil, err } defer r.Close() return loadXYZReader(r, feedback) } func loadBoundary(z *zip.ReadCloser) (polygonSlice, error) { shpF := common.FindInZIP(z, ".shp") if shpF == nil { return nil, nil } prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name)) dbfF := common.FindInZIP(z, prefix+".dbf") if dbfF == nil { return nil, fmt.Errorf("No DBF file found for %s", shpF.Name) } shpR, err := shpF.Open() if err != nil { return nil, err } dbfR, err := dbfF.Open() if err != nil { shpR.Close() return nil, err } sr := shp.SequentialReaderFromExt(shpR, dbfR) defer sr.Close() if !sr.Next() { return nil, sr.Err() } _, s := sr.Shape() if s == nil { return nil, sr.Err() } return shapeToPolygon(s) } func generateContours(tree *octree.Tree, tx *sql.Tx, id int64) error { stmt, err := tx.Prepare(insertContourSQL) if err != nil { return err } defer stmt.Close() // Adjust contour lines heights to multiples of contourStepWidth var heights []float64 h := contourStepWidth * math.Ceil(tree.Min.Z/contourStepWidth) for ; h <= tree.Max.Z; h += contourStepWidth { heights = append(heights, h) } octree.DoContours(tree, heights, func(res *octree.ContourResult) { if err == nil && len(res.Lines) > 0 { _, err = stmt.Exec( id, res.Height, tree.EPSG, res.Lines.AsWKB2D(), contourTolerance) } }) return err }