view cmd/soundingresults/main.go @ 612:d127856e689d

Sounding results: Sort the data sets by name and date to make the output deterministic.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 10 Sep 2018 17:48:56 +0200
parents effd22c0ae5a
children 99cd44f19e86
line wrap: on
line source

package main

import (
	"bufio"
	"database/sql"
	"encoding/hex"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx"
	"github.com/jackc/pgx/stdlib"
)

// Command line flags. The db* flags describe the database connection
// that run sets up.
var (
	dump       = flag.Bool("dump", true, "dump SQL insert statements")
	insecure   = flag.Bool("insecure", false, "skip SSL verification")
	dbhost     = flag.String("dbhost", "localhost", "database host")
	dbport     = flag.Uint("dbport", 5432, "database port")
	dbname     = flag.String("dbname", "gemma", "database name")
	dbuser     = flag.String("dbuser", "scott", "database user")
	dbpassword = flag.String("dbpw", "tiger", "database password")
	dbssl      = flag.String("dbssl", "prefer", "database SSL mode")
)

func run(fn func(*sql.DB) error) error {

	// To ease SSL config ride a bit on parsing.
	cc, err := pgx.ParseConnectionString("sslmode=" + *dbssl)
	if err != nil {
		return err
	}

	// Do the rest manually to allow whitespace in user/password.
	cc.Host = *dbhost
	cc.Port = uint16(*dbport)
	cc.User = *dbuser
	cc.Password = *dbpassword
	cc.Database = *dbname

	db := stdlib.OpenDB(cc)
	defer db.Close()

	return fn(db)
}

// meta holds the information about a data set that parseFilename
// derives from the name of its file.
type meta struct {
	// date is parsed from the leading YYYYMMDD part of the file name.
	date           time.Time
	// name is the name part of the file name, possibly replaced by
	// an entry from INFO.txt (see substituteName).
	name           string
	// depthReference is the last underscore separated part of the
	// file name in front of the _WGS84 suffix.
	depthReference string
}

// substituteName looks up a replacement for name in the file INFO.txt
// that lives in the same directory as fname.
//
// INFO.txt is read line by line. Blank lines and lines starting with '#'
// are skipped. The first line of the form key=value whose key equals name
// (ignoring case and surrounding whitespace of the key) yields the trimmed
// value. If INFO.txt cannot be opened, a warning is logged. In that case,
// and whenever no entry matches, name is returned unchanged.
func substituteName(fname, name string) string {
	infoPath := filepath.Join(filepath.Dir(fname), "INFO.txt")

	file, err := os.Open(infoPath)
	if err != nil {
		log.Printf("warn: %v\n", err)
		return name
	}
	defer file.Close()

	wanted := strings.ToLower(name)

	scanner := bufio.NewScanner(file)
	for scanner.Scan() {
		entry := strings.TrimSpace(scanner.Text())

		// Skip blank lines and comments.
		if len(entry) == 0 || entry[0] == '#' {
			continue
		}

		// Only lines with a key and a value are of interest. The value
		// is everything behind the first '='.
		sep := strings.IndexByte(entry, '=')
		if sep < 0 {
			continue
		}

		key := strings.TrimSpace(strings.ToLower(entry[:sep]))
		if key == wanted {
			return strings.TrimSpace(entry[sep+1:])
		}
	}

	// A read error is only reported; the caller still gets the
	// original name.
	if err := scanner.Err(); err != nil {
		log.Printf("error: %v\n", err)
	}

	return name
}

// parseFilename extracts the meta data of a data set from the name of its
// file. The base name of fname is expected to look like
//
//	<YYYYMMDD>_<name>_MB_<depth reference>_WGS84[.<ext>][.gz|.bz2]
//
// where the _MB and _WGS84 markers and the compression extension are
// matched case-insensitively. The name part is passed through
// substituteName before it is stored.
//
// An error is returned if a part is missing or the date cannot be parsed.
func parseFilename(fname string) (*meta, error) {

	base := filepath.Base(fname)

	// Strip an optional compression extension.
	compressed := strings.ToLower(filepath.Ext(base))
	for _, ext := range []string{".gz", ".bz2"} {
		if ext == compressed {
			base = base[:len(base)-len(ext)]
			break
		}
	}

	// Cut .txt (or whatever extension is left).
	base = base[:len(base)-len(filepath.Ext(base))]

	if !strings.HasSuffix(strings.ToUpper(base), "_WGS84") {
		return nil, fmt.Errorf("%s is not in WGS84", base)
	}

	base = base[:len(base)-len("_WGS84")]

	// Everything up to the first underscore is the date.
	idx := strings.IndexRune(base, '_')
	if idx == -1 {
		return nil, fmt.Errorf("%s has no date", base)
	}

	datePart := base[:idx]

	date, err := time.Parse("20060102", datePart)
	if err != nil {
		return nil, fmt.Errorf("%s has invalid date %q: %v", base, datePart, err)
	}

	rest := base[idx+1:]

	// The depth reference is the last underscore separated part.
	if idx = strings.LastIndex(rest, "_"); idx == -1 {
		return nil, fmt.Errorf("%s has no depth reference", base)
	}

	depthReference := rest[idx+1:]

	rest = rest[:idx]

	// What remains has to be the name followed by the _MB marker.
	if !strings.HasSuffix(strings.ToUpper(rest), "_MB") {
		return nil, fmt.Errorf("%s is missing the _MB marker", base)
	}

	name := rest[:len(rest)-len("_MB")]

	name = substituteName(fname, name)

	return &meta{
		name:           name,
		depthReference: depthReference,
		date:           date,
	}, nil
}

// result is what a processor sends for one successfully processed
// input file.
type result struct {
	// m is the meta data parsed from the file name.
	m      *meta
	// points are the points returned by parseXYZ for the file.
	points points3d
	// wkb is the return value of points.asWKB(). main writes it
	// hex encoded into the SQL dump.
	wkb    string
}

// processor is the body of a worker goroutine. It receives file names
// from fnames until that channel is closed and calls wg.Done when it
// returns.
//
// For each file it parses the meta data from the file name, reads the
// points with parseXYZ, converts them with asWKB and sends the outcome
// to results. Files that fail in one of the parsing steps are logged
// and skipped.
func processor(fnames <-chan string, results chan<- result, wg *sync.WaitGroup) {
	defer wg.Done()

	const bytesPerMB = 1024 * 1024

	for path := range fnames {
		log.Printf("Processing %s\n", path)

		info, err := parseFilename(path)
		if err != nil {
			log.Printf("error: %v\n", err)
			continue
		}

		cloud, err := parseXYZ(path)
		if err != nil {
			log.Printf("error: %v\n", err)
			continue
		}
		log.Printf("Number of points: %d\n", len(cloud))

		encoded := cloud.asWKB()
		log.Printf("WKB size %.2f MB\n", float64(len(encoded))/bytesPerMB)

		results <- result{m: info, points: cloud, wkb: encoded}
	}
}

// quote returns s as a single quoted SQL string literal. Single quotes
// inside of s are escaped by doubling them.
func quote(s string) string {
	return "'" + strings.Replace(s, "'", "''", -1) + "'"
}

// writeSQLDump writes results to w as one SQL transaction consisting of an
// INSERT INTO waterway.sounding_results statement per result. The point
// cloud is written as the hex encoding of the WKB data.
//
// The results are sorted in place by name and then by date to make the
// output deterministic. The first write error is returned.
func writeSQLDump(w io.Writer, results []result) error {
	sort.Slice(results, func(i, j int) bool {
		if a, b := results[i].m.name, results[j].m.name; a != b {
			return a < b
		}
		return results[i].m.date.Before(results[j].m.date)
	})

	// A bufio.Writer keeps the first write error and returns it from all
	// later writes and from Flush, so the Fprint calls need no checks of
	// their own.
	out := bufio.NewWriter(w)
	fmt.Fprintln(out, "BEGIN;")
	for i := range results {
		r := &results[i]
		fmt.Fprintln(out, "INSERT INTO waterway.sounding_results (")
		fmt.Fprintln(out, "  bottleneck_id,")
		fmt.Fprintln(out, "  date_info,")
		fmt.Fprintln(out, "  depth_reference,")
		fmt.Fprintln(out, "  point_cloud")
		fmt.Fprintln(out, ") VALUES (")
		fmt.Fprintf(out, "  %s,\n", quote(r.m.name))
		fmt.Fprintf(out, "  '%s'::date,\n", r.m.date.Format("2006-01-02"))
		fmt.Fprintf(out, "  %s,\n", quote(r.m.depthReference))
		fmt.Fprintf(out, "  '")
		if _, err := io.Copy(hex.NewEncoder(out), strings.NewReader(r.wkb)); err != nil {
			return err
		}
		fmt.Fprintln(out, "'")
		fmt.Fprintln(out, ");")
	}
	fmt.Fprintln(out, "COMMIT;")
	return out.Flush()
}

// main processes the files given as command line arguments with one
// processor goroutine per CPU. With -dump (the default) the results with
// depth reference "ADR" are collected and written to stdout as SQL once
// all files are processed. The program exits with a non-zero status if
// writing the dump fails.
func main() {
	flag.Parse()

	fnames := make(chan string)
	results := make(chan result)

	// Without -dump the results are discarded.
	// TODO: Implement database stuff.
	handler := func(result) {}
	flush := func() error { return nil }

	if *dump {
		var collected []result
		handler = func(r result) {
			if r.m.depthReference != "ADR" {
				return
			}
			collected = append(collected, r)
		}
		flush = func() error {
			return writeSQLDump(os.Stdout, collected)
		}
	}

	// The collector is the only goroutine that calls handler and flush,
	// so the collected results need no locking.
	fin := make(chan error, 1)

	go func() {
		for r := range results {
			handler(r)
		}
		fin <- flush()
	}()

	var wg sync.WaitGroup

	for i, n := 0, runtime.NumCPU(); i < n; i++ {
		wg.Add(1)
		go processor(fnames, results, &wg)
	}

	for _, fname := range flag.Args() {
		fnames <- fname
	}

	close(fnames)

	// Once all processors have returned nobody sends on results any
	// more, so closing it is safe and lets the collector finish.
	wg.Wait()
	close(results)

	if err := <-fin; err != nil {
		log.Fatalf("error: %v\n", err)
	}
}