# HG changeset patch # User Sascha L. Teichmann # Date 1536574210 -7200 # Node ID 3d33c53db1e35b294ea6f48db1e6bc153cccf467 # Parent 10f898bbe50f1eed28edff1609e7fb7c420c4f27 Sounding results: Read point data from xyz files. diff -r 10f898bbe50f -r 3d33c53db1e3 cmd/soundingresults/main.go --- a/cmd/soundingresults/main.go Mon Sep 10 11:04:28 2018 +0200 +++ b/cmd/soundingresults/main.go Mon Sep 10 12:10:10 2018 +0200 @@ -2,13 +2,19 @@ import ( "bufio" + "compress/bzip2" + "compress/gzip" "database/sql" "flag" "fmt" + "io" "log" "os" "path/filepath" + "runtime" + "strconv" "strings" + "sync" "time" "github.com/jackc/pgx" @@ -52,6 +58,85 @@ depthReference string } +func wrap(fname string, f io.Reader) (io.Reader, error) { + + switch strings.ToLower(filepath.Ext(fname)) { + case ".gz": + return gzip.NewReader(f) + case ".bz2": + return bzip2.NewReader(f), nil + } + + return bufio.NewReader(f), nil +} + +type point3d struct { + x float64 + y float64 + z float64 +} + +func parseXYZ(fname string) ([]*point3d, error) { + f, err := os.Open(fname) + if err != nil { + return nil, err + } + defer f.Close() + + r, err := wrap(fname, f) + if err != nil { + return nil, err + } + + // Alloc in larger chunks to reduce pressure on memory management. + var chunk []point3d + alloc := func() *point3d { + if len(chunk) == 0 { + chunk = make([]point3d, 8*1024) + } + p := &chunk[0] + chunk = chunk[1:] + return p + } + + var points []*point3d + + s := bufio.NewScanner(r) + if s.Scan() { // Skip header line. + for line := 2; s.Scan(); line++ { + p := alloc() + text := s.Text() + // fmt.Sscanf(text, "%f,%f,%f") is 4 times slower. + idx := strings.IndexByte(text, ',') + if idx == -1 { + log.Printf("format error in line %d\n", line) + continue + } + if p.x, err = strconv.ParseFloat(text[:idx], 64); err != nil { + log.Printf("format error in line %d: %v\n", line, err) + continue + } + text = text[idx+1:] + if idx = strings.IndexByte(text, ','); idx == -1 { + log.Printf("format error in line %d\n", line) + continue + } + if p.y, err = strconv.ParseFloat(text[:idx], 64); err != nil { + log.Printf("format error in line %d: %v\n", line, err) + continue + } + text = text[idx+1:] + if p.z, err = strconv.ParseFloat(text, 64); err != nil { + log.Printf("format error in line %d: %v\n", line, err) + continue + } + points = append(points, p) + } + } + + return points, s.Err() +} + func substituteName(fname, name string) string { dir := filepath.Dir(fname) info := filepath.Join(dir, "INFO.txt") @@ -104,8 +189,6 @@ base = base[:len(base)-len("_WGS84")] - log.Printf("base: %s\n", base) - idx := strings.IndexRune(base, '_') if idx == -1 { return meta{}, fmt.Errorf("%s has no date", base) @@ -143,15 +226,43 @@ }, nil } +func processor(fnames <-chan string, wg *sync.WaitGroup) { + defer wg.Done() + + for fname := range fnames { + log.Printf("Processing %s\n", fname) + m, err := parseFilename(fname) + if err != nil { + log.Printf("error: %v\n", err) + continue + } + _ = m + points, err := parseXYZ(fname) + if err != nil { + log.Printf("error: %v\n", err) + continue + } + fmt.Printf("Number of points: %d\n", len(points)) + } +} + func main() { flag.Parse() + var wg sync.WaitGroup + + fnames := make(chan string) + + for i, n := 0, runtime.NumCPU(); i < n; i++ { + wg.Add(1) + go processor(fnames, &wg) + } + for _, fname := range flag.Args() { - m, err := parseFilename(fname) - if err != nil { - log.Printf("error: %s\n", err) - continue - } - fmt.Printf("%q\n", m) + fnames <- fname } + + close(fnames) + + wg.Wait() }