Mercurial > gemma
view pkg/imports/sr.go @ 1222:a8b682f3d13d
staging area: zoom to location
author | Thomas Junk <thomas.junk@intevation.de> |
---|---|
date | Mon, 19 Nov 2018 16:23:14 +0100 |
parents | 58acc343b1b6 |
children | bc4b642c8d04 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "archive/zip" "bufio" "context" "crypto/sha1" "database/sql" "encoding/hex" "encoding/json" "errors" "fmt" "io" "math" "os" "path" "path/filepath" "strconv" "strings" "time" shp "github.com/jonas-p/go-shp" "gemma.intevation.de/gemma/pkg/octree" ) type SoundingResult string const SoundingResultDateFormat = "2006-01-02" type ( SoundingResultDate struct{ time.Time } SoundingResultMeta struct { Date SoundingResultDate `json:"date"` Bottleneck string `json:"bottleneck"` EPSG uint `json:"epsg"` DepthReference string `json:"depth-reference"` } ) const wgs84 = 4326 const ( contourStepWidth = 0.1 contourTolerance = 0.1 ) const SRJobKind JobKind = "sr" type srJobCreator struct{} func init() { RegisterJobCreator(SRJobKind, srJobCreator{}) } func (srJobCreator) Create(_ JobKind, data string) (Job, error) { return SoundingResult(data), nil } func (srJobCreator) Depends() []string { return []string{ "waterway.sounding_results", "waterway.sounding_results_contour_lines", "waterway.bottlenecks", } } func (srJobCreator) StageDone( tx *sql.Tx, ctx context.Context, id int64, ) error { _, err := tx.ExecContext(ctx, srStageDoneSQL, id) return err } const ( srStageDoneSQL = ` UPDATE waterway.sounding_results SET staging_done = true WHERE id = $1` checkDepthReferenceSQL = ` SELECT true FROM depth_references WHERE depth_reference = $1` checkBottleneckSQL = ` SELECT true FROM waterway.bottlenecks WHERE objnam = $1` insertPointsSQL = ` INSERT INTO waterway.sounding_results ( bottleneck_id, date_info, depth_reference, point_cloud, area ) VALUES ( (SELECT bottleneck_id from waterway.bottlenecks where objnam = $1), $2::date, $3, ST_Transform(ST_GeomFromWKB($4, $6::integer), 4326)::geography, (SELECT CASE WHEN $5::bytea IS NULL THEN ST_Transform( ST_ConcaveHull( ST_Force2D(ST_GeomFromWKB($4, $6::integer)), 0.7), 4326)::geography ELSE ST_Transform(ST_GeomFromWKB($5, $6::integer), 4326)::geography END) ) RETURNING id, CASE WHEN ST_Y(ST_Centroid(point_cloud::geometry)) > 0 THEN 32600 ELSE 32700 END + floor((ST_X(ST_Centroid(point_cloud::geometry))+180)/6)::int + 1` insertOctreeSQL = ` UPDATE waterway.sounding_results SET octree_checksum = $2, octree_index = $3 WHERE id = $1` insertContourSQL = ` INSERT INTO waterway.sounding_results_contour_lines ( sounding_result_id, height, lines ) SELECT $1, $2, ST_Transform( ST_Multi( ST_CollectionExtract( ST_SimplifyPreserveTopology( ST_GeomFromWKB($4, $3::integer), $5 ), 2 ) ), 4326 ) FROM waterway.sounding_results sr WHERE id = $1 ` ) func (sr SoundingResult) Do( importID int64, ctx context.Context, conn *sql.Conn, feedback Feedback, ) error { z, err := zip.OpenReader(filepath.Join(string(sr), "sr.zip")) if err != nil { return err } defer z.Close() feedback.Info("Looking for 'meta.json'") mf := find("meta.json", z.File) if mf == nil { return errors.New("Cannot find 'meta.json'") } m, err := loadMeta(mf) if err != nil { return err } if err := m.validate(conn); err != nil { return err } feedback.Info("Looking for '*.xyz'") xyzf := find(".xyz", z.File) if xyzf == nil { return errors.New("Cannot find any *.xyz file") } xyz, err := loadXYZ(xyzf, feedback) if err != nil { return err } if len(xyz) == 0 { return errors.New("XYZ does not contain any vertices.") } // Is there a boundary shapefile in the ZIP archive? polygon, err := loadBoundary(z) if err != nil { return err } tx, err := conn.BeginTx(ctx, nil) if err != nil { return err } defer tx.Rollback() var id int64 var epsg uint32 start := time.Now() err = tx.QueryRow(insertPointsSQL, m.Bottleneck, m.Date.Time, m.DepthReference, xyz.AsWKB(), polygon.AsWBK(), m.EPSG, ).Scan(&id, &epsg) xyz, polygon = nil, nil // not need from now on. feedback.Info("storing points took %s", time.Since(start)) if err != nil { return err } feedback.Info("Best suited UTM EPSG: %d", epsg) feedback.Info("Triangulate...") start = time.Now() tin, err := octree.GenerateTinByID(conn, ctx, id, epsg) feedback.Info("triangulation took %s", time.Since(start)) if err != nil { return err } if tin == nil { return errors.New("cannot load TIN from database") } feedback.Info("Building octree...") builder := octree.NewBuilder(tin) start = time.Now() builder.Build() octreeIndex, err := builder.Bytes() tin = nil // not needed from now on feedback.Info("building octree took %s", time.Since(start)) if err != nil { return err } feedback.Info("Store octree...") start = time.Now() h := sha1.New() h.Write(octreeIndex) checksum := hex.EncodeToString(h.Sum(nil)) _, err = tx.Exec(insertOctreeSQL, id, checksum, octreeIndex) feedback.Info("storing octree index took %s", time.Since(start)) if err != nil { return err } tree := builder.Tree() builder = nil // not needed from now on feedback.Info("Generate contour lines...") start = time.Now() err = generateContours(tree, tx, id) feedback.Info("generating and storing contour lines took %s", time.Since(start)) if err != nil { return err } // Store for potential later removal. if err = track(tx, ctx, importID, "waterway.sounding_results", id); err != nil { return err } if err = tx.Commit(); err == nil { feedback.Info("Storing sounding result was successful.") } return err } func (srd *SoundingResultDate) UnmarshalJSON(data []byte) error { var s string if err := json.Unmarshal(data, &s); err != nil { return err } d, err := time.Parse(SoundingResultDateFormat, s) if err == nil { *srd = SoundingResultDate{d} } return err } func (sr SoundingResult) CleanUp() error { return os.RemoveAll(string(sr)) } func find(needle string, haystack []*zip.File) *zip.File { needle = strings.ToLower(needle) for _, straw := range haystack { if strings.HasSuffix(strings.ToLower(straw.Name), needle) { return straw } } return nil } func loadMeta(f *zip.File) (*SoundingResultMeta, error) { r, err := f.Open() if err != nil { return nil, err } defer r.Close() var m SoundingResultMeta err = json.NewDecoder(r).Decode(&m) if err == nil { if m.EPSG == 0 { m.EPSG = wgs84 } } return &m, err } func (m *SoundingResultMeta) validate(conn *sql.Conn) error { var b bool err := conn.QueryRowContext(context.Background(), checkDepthReferenceSQL, m.DepthReference).Scan(&b) switch { case err == sql.ErrNoRows: return fmt.Errorf("Unknown depth reference '%s'\n", m.DepthReference) case err != nil: return err case !b: return errors.New("Unexpected depth reference") } err = conn.QueryRowContext(context.Background(), checkBottleneckSQL, m.Bottleneck).Scan(&b) switch { case err == sql.ErrNoRows: return fmt.Errorf("Unknown bottleneck '%s'\n", m.Bottleneck) case err != nil: return err case !b: return errors.New("Unexpected bottleneck") } return nil } func loadXYZReader(r io.Reader, feedback Feedback) (octree.MultiPointZ, error) { mpz := make(octree.MultiPointZ, 0, 250000) s := bufio.NewScanner(r) for line := 1; s.Scan(); line++ { text := s.Text() var p octree.Vertex // fmt.Sscanf(text, "%f,%f,%f") is 4 times slower. idx := strings.IndexByte(text, ',') if idx == -1 { feedback.Warn("format error in line %d", line) continue } var err error if p.X, err = strconv.ParseFloat(text[:idx], 64); err != nil { feedback.Warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if idx = strings.IndexByte(text, ','); idx == -1 { feedback.Warn("format error in line %d", line) continue } if p.Y, err = strconv.ParseFloat(text[:idx], 64); err != nil { feedback.Warn("format error in line %d: %v", line, err) continue } text = text[idx+1:] if p.Z, err = strconv.ParseFloat(text, 64); err != nil { feedback.Warn("format error in line %d: %v", line, err) continue } mpz = append(mpz, p) } if err := s.Err(); err != nil { return nil, err } return mpz, nil } func loadXYZ(f *zip.File, feedback Feedback) (octree.MultiPointZ, error) { r, err := f.Open() if err != nil { return nil, err } defer r.Close() return loadXYZReader(r, feedback) } func loadBoundary(z *zip.ReadCloser) (Polygon, error) { shpF := find(".shp", z.File) if shpF == nil { return nil, nil } prefix := strings.TrimSuffix(shpF.Name, path.Ext(shpF.Name)) dbfF := find(prefix+".dbf", z.File) if dbfF == nil { return nil, fmt.Errorf("No DBF file found for %s", shpF.Name) } shpR, err := shpF.Open() if err != nil { return nil, err } dbfR, err := dbfF.Open() if err != nil { shpR.Close() return nil, err } sr := shp.SequentialReaderFromExt(shpR, dbfR) defer sr.Close() if !sr.Next() { return nil, sr.Err() } _, s := sr.Shape() if s == nil { return nil, sr.Err() } return shapeToPolygon(s) } func generateContours(tree *octree.Tree, tx *sql.Tx, id int64) error { stmt, err := tx.Prepare(insertContourSQL) if err != nil { return err } defer stmt.Close() // Adjust contour lines heights to multiples of contourStepWidth heights := make([]float64, 0) h := contourStepWidth * math.Ceil(tree.Min.Z/contourStepWidth) for ; h <= tree.Max.Z; h += contourStepWidth { heights = append(heights, h) } octree.DoContours(tree, heights, func(res *octree.ContourResult) { if err == nil { _, err = stmt.Exec( id, res.Height, tree.EPSG, res.Lines.AsWKB2D(), contourTolerance) } }) return err }