changeset 603:3d33c53db1e3

Sounding results: Read point data from xyz files.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 10 Sep 2018 12:10:10 +0200
parents 10f898bbe50f
children 4d97066c311c
files cmd/soundingresults/main.go
diffstat 1 files changed, 119 insertions(+), 8 deletions(-) [+]
line wrap: on
line diff
--- a/cmd/soundingresults/main.go	Mon Sep 10 11:04:28 2018 +0200
+++ b/cmd/soundingresults/main.go	Mon Sep 10 12:10:10 2018 +0200
@@ -2,13 +2,19 @@
 
 import (
 	"bufio"
+	"compress/bzip2"
+	"compress/gzip"
 	"database/sql"
 	"flag"
 	"fmt"
+	"io"
 	"log"
 	"os"
 	"path/filepath"
+	"runtime"
+	"strconv"
 	"strings"
+	"sync"
 	"time"
 
 	"github.com/jackc/pgx"
@@ -52,6 +58,85 @@
 	depthReference string
 }
 
+func wrap(fname string, f io.Reader) (io.Reader, error) {
+
+	switch strings.ToLower(filepath.Ext(fname)) {
+	case ".gz":
+		return gzip.NewReader(f)
+	case ".bz2":
+		return bzip2.NewReader(f), nil
+	}
+
+	return bufio.NewReader(f), nil
+}
+
+type point3d struct {
+	x float64
+	y float64
+	z float64
+}
+
+func parseXYZ(fname string) ([]*point3d, error) {
+	f, err := os.Open(fname)
+	if err != nil {
+		return nil, err
+	}
+	defer f.Close()
+
+	r, err := wrap(fname, f)
+	if err != nil {
+		return nil, err
+	}
+
+	// Alloc in larger chunks to reduce pressure on memory management.
+	var chunk []point3d
+	alloc := func() *point3d {
+		if len(chunk) == 0 {
+			chunk = make([]point3d, 8*1024)
+		}
+		p := &chunk[0]
+		chunk = chunk[1:]
+		return p
+	}
+
+	var points []*point3d
+
+	s := bufio.NewScanner(r)
+	if s.Scan() { // Skip header line.
+		for line := 2; s.Scan(); line++ {
+			p := alloc()
+			text := s.Text()
+			// fmt.Sscanf(text, "%f,%f,%f") is 4 times slower.
+			idx := strings.IndexByte(text, ',')
+			if idx == -1 {
+				log.Printf("format error in line %d\n", line)
+				continue
+			}
+			if p.x, err = strconv.ParseFloat(text[:idx], 64); err != nil {
+				log.Printf("format error in line %d: %v\n", line, err)
+				continue
+			}
+			text = text[idx+1:]
+			if idx = strings.IndexByte(text, ','); idx == -1 {
+				log.Printf("format error in line %d\n", line)
+				continue
+			}
+			if p.y, err = strconv.ParseFloat(text[:idx], 64); err != nil {
+				log.Printf("format error in line %d: %v\n", line, err)
+				continue
+			}
+			text = text[idx+1:]
+			if p.z, err = strconv.ParseFloat(text, 64); err != nil {
+				log.Printf("format error in line %d: %v\n", line, err)
+				continue
+			}
+			points = append(points, p)
+		}
+	}
+
+	return points, s.Err()
+}
+
 func substituteName(fname, name string) string {
 	dir := filepath.Dir(fname)
 	info := filepath.Join(dir, "INFO.txt")
@@ -104,8 +189,6 @@
 
 	base = base[:len(base)-len("_WGS84")]
 
-	log.Printf("base: %s\n", base)
-
 	idx := strings.IndexRune(base, '_')
 	if idx == -1 {
 		return meta{}, fmt.Errorf("%s has no date", base)
@@ -143,15 +226,43 @@
 	}, nil
 }
 
+func processor(fnames <-chan string, wg *sync.WaitGroup) {
+	defer wg.Done()
+
+	for fname := range fnames {
+		log.Printf("Processing %s\n", fname)
+		m, err := parseFilename(fname)
+		if err != nil {
+			log.Printf("error: %v\n", err)
+			continue
+		}
+		_ = m
+		points, err := parseXYZ(fname)
+		if err != nil {
+			log.Printf("error: %v\n", err)
+			continue
+		}
+		fmt.Printf("Number of points: %d\n", len(points))
+	}
+}
+
 func main() {
 	flag.Parse()
 
+	var wg sync.WaitGroup
+
+	fnames := make(chan string)
+
+	for i, n := 0, runtime.NumCPU(); i < n; i++ {
+		wg.Add(1)
+		go processor(fnames, &wg)
+	}
+
 	for _, fname := range flag.Args() {
-		m, err := parseFilename(fname)
-		if err != nil {
-			log.Printf("error: %s\n", err)
-			continue
-		}
-		fmt.Printf("%q\n", m)
+		fnames <- fname
 	}
+
+	close(fnames)
+
+	wg.Wait()
 }