view cmd/soundingresults/main.go @ 904:e4b72a199258

New default bottleneck colors Mainly to make the stroke color one actually selectable in the ui. In addition the pink does better match the collors used on the ECDIS layer.
author Sascha Wilde <wilde@intevation.de>
date Tue, 02 Oct 2018 13:34:59 +0200
parents 4e90daa57086
children
line wrap: on
line source

package main

import (
	"bufio"
	"database/sql"
	"encoding/hex"
	"flag"
	"fmt"
	"io"
	"log"
	"os"
	"path/filepath"
	"runtime"
	"sort"
	"strings"
	"sync"
	"time"

	"github.com/jackc/pgx"
	"github.com/jackc/pgx/stdlib"
)

var (
	dump       = flag.Bool("dump", true, "dump SQL insert statements")
	insecure   = flag.Bool("insecure", false, "skip SSL verification")
	dbhost     = flag.String("dbhost", "localhost", "database host")
	dbport     = flag.Uint("dbport", 5432, "database port")
	dbname     = flag.String("dbname", "gemma", "database user")
	dbuser     = flag.String("dbuser", "scott", "database user")
	dbpassword = flag.String("dbpw", "tiger", "database password")
	dbssl      = flag.String("dbssl", "prefer", "database SSL mode")
)

func run(fn func(*sql.DB) error) error {

	// To ease SSL config ride a bit on parsing.
	cc, err := pgx.ParseConnectionString("sslmode=" + *dbssl)
	if err != nil {
		return err
	}

	// Do the rest manually to allow whitespace in user/password.
	cc.Host = *dbhost
	cc.Port = uint16(*dbport)
	cc.User = *dbuser
	cc.Password = *dbpassword
	cc.Database = *dbname

	db := stdlib.OpenDB(cc)
	defer db.Close()

	return fn(db)
}

// TODO: This should come from the depth_references table.
var knownDepthReferences = []string{
	"ADR", "ETRS", "FZP", "GLW", "HBO",
	"HDC", "HNW", "HSW", "IGN", "KP",
	"LDC", "LNW", "NAP", "NGM", "POT",
	"PUL", "RN", "TAW", "WGS", "ZPG",
}

func isKnownDepthReference(ref string) bool {
	ref = strings.ToUpper(ref)
	for _, r := range knownDepthReferences {
		if r == ref {
			return true
		}
	}
	return false
}

type meta struct {
	date           time.Time
	name           string
	depthReference string
}

func substituteName(fname, name string) string {
	dir := filepath.Dir(fname)
	info := filepath.Join(dir, "INFO.txt")
	f, err := os.Open(info)
	if err != nil {
		log.Printf("warn: %v\n", err)
		return name
	}
	defer f.Close()

	s := bufio.NewScanner(f)

	for search := strings.ToLower(name); s.Scan(); {
		line := strings.TrimSpace(s.Text())
		if line == "" || strings.HasPrefix(line, "#") {
			continue
		}

		if parts := strings.SplitN(line, "=", 2); len(parts) == 2 &&
			strings.TrimSpace(strings.ToLower(parts[0])) == search {
			return strings.TrimSpace(parts[1])
		}
	}

	if err := s.Err(); err != nil {
		log.Printf("error: %v\n", err)
	}

	return name
}

func parseFilename(fname string) (*meta, error) {

	base := filepath.Base(fname)

	compressed := strings.ToLower(filepath.Ext(base))
	for _, ext := range []string{".gz", ".bz2"} {
		if ext == compressed {
			base = base[:len(base)-len(ext)]
			break
		}
	}

	// Cut .txt
	base = base[:len(base)-len(filepath.Ext(base))]

	if !strings.HasSuffix(strings.ToUpper(base), "_WGS84") {
		return nil, fmt.Errorf("%s is not in WGS84", base)
	}

	base = base[:len(base)-len("_WGS84")]

	idx := strings.IndexRune(base, '_')
	if idx == -1 {
		return nil, fmt.Errorf("%s has no date", base)
	}

	datePart := base[:idx]

	date, err := time.Parse("20060102", datePart)
	if err != nil {
		return nil, fmt.Errorf("error %s: %v\n", datePart, err)
	}

	rest := base[idx+1:]

	if idx = strings.LastIndex(rest, "_"); idx == -1 {
		return nil, fmt.Errorf("%s has no depth reference", base)
	}

	depthReference := rest[idx+1:]

	if !isKnownDepthReference(depthReference) {
		return nil, fmt.Errorf(
			"%s is not a known depth reference", depthReference)
	}

	rest = rest[:idx]

	if !strings.HasSuffix(strings.ToUpper(rest), "_MB") {
		return nil, fmt.Errorf("%s is not in WGS84", base)
	}

	name := rest[:len(rest)-len("_MB")]

	name = substituteName(fname, name)

	return &meta{
		name:           name,
		depthReference: depthReference,
		date:           date,
	}, nil
}

type result struct {
	m      *meta
	points points3d
	wkb    string
}

func processor(fnames <-chan string, results chan<- result, wg *sync.WaitGroup) {
	defer wg.Done()

	for fname := range fnames {
		log.Printf("Processing %s\n", fname)
		m, err := parseFilename(fname)
		if err != nil {
			log.Printf("error: %v\n", err)
			continue
		}
		_ = m
		points, err := parseXYZ(fname)
		if err != nil {
			log.Printf("error: %v\n", err)
			continue
		}
		log.Printf("Number of points: %d\n", len(points))

		wkb := points.asWKB()
		log.Printf("WKB size %.2f MB\n", float64(len(wkb))/(1024*1024))

		results <- result{m, points, wkb}
	}
}

func quote(s string) string {
	return "'" + strings.Replace(s, "'", "'''", -1) + "'"
}

func main() {
	flag.Parse()

	var wg sync.WaitGroup

	fnames := make(chan string)
	results := make(chan result)
	done := make(chan struct{})

	handler := func(result) {}
	flush := func() {}

	if *dump {
		var results []result
		handler = func(r result) { results = append(results, r) }
		flush = func() {
			sort.Slice(results, func(i, j int) bool {
				if a, b := results[i].m.name, results[j].m.name; a != b {
					return a < b
				}
				return results[i].m.date.Before(results[j].m.date)
			})
			out := bufio.NewWriter(os.Stdout)
			fmt.Fprintln(out, "BEGIN;")
			for i := range results {
				r := &results[i]
				fmt.Fprintln(out, "INSERT INTO waterway.sounding_results (")
				fmt.Fprintln(out, "  bottleneck_id,")
				fmt.Fprintln(out, "  date_info,")
				fmt.Fprintln(out, "  depth_reference,")
				fmt.Fprintln(out, "  point_cloud")
				fmt.Fprintln(out, ") VALUES (")
				fmt.Fprintf(out,
					"  (SELECT bottleneck_id from waterway.bottlenecks where objnam = %s),\n",
					quote(r.m.name))
				fmt.Fprintf(out, "  '%s'::date,\n", r.m.date.Format("2006-01-02"))
				fmt.Fprintf(out, "  %s,\n", quote(r.m.depthReference))
				fmt.Fprintf(out, "  '")
				io.Copy(hex.NewEncoder(out), strings.NewReader(r.wkb))
				fmt.Fprintln(out, "'")
				fmt.Fprintln(out, ");")
			}
			fmt.Fprintln(out, "COMMIT;")
			out.Flush()
		}
	} else {
		// TODO: Implement database stuff.
	}

	fin := make(chan struct{})

	go func() {
		defer func() { flush(); close(fin) }()
		for {
			select {
			case <-done:
				return
			case r := <-results:
				handler(r)
			}
		}
	}()

	for i, n := 0, runtime.NumCPU(); i < n; i++ {
		wg.Add(1)
		go processor(fnames, results, &wg)
	}

	for _, fname := range flag.Args() {
		fnames <- fname
	}

	close(fnames)

	wg.Wait()

	close(done)
	<-fin
}