changeset 611:effd22c0ae5a

Sounding result: Write simple SQL insert dumper. Not deterministic, yet.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 10 Sep 2018 17:32:51 +0200
parents f62ee9d5bff1
children d127856e689d
files cmd/soundingresults/main.go schema/gemma.sql
diffstat 2 files changed, 81 insertions(+), 13 deletions(-) [+]
line wrap: on
line diff
--- a/cmd/soundingresults/main.go	Mon Sep 10 17:01:24 2018 +0200
+++ b/cmd/soundingresults/main.go	Mon Sep 10 17:32:51 2018 +0200
@@ -3,8 +3,10 @@
 import (
 	"bufio"
 	"database/sql"
+	"encoding/hex"
 	"flag"
 	"fmt"
+	"io"
 	"log"
 	"os"
 	"path/filepath"
@@ -18,6 +20,7 @@
 )
 
 var (
+	dump       = flag.Bool("dump", true, "dump SQL insert statements")
 	insecure   = flag.Bool("insecure", false, "skip SSL verification")
 	dbhost     = flag.String("dbhost", "localhost", "database host")
 	dbport     = flag.Uint("dbport", 5432, "database port")
@@ -85,7 +88,7 @@
 	return name
 }
 
-func parseFilename(fname string) (meta, error) {
+func parseFilename(fname string) (*meta, error) {
 
 	base := filepath.Base(fname)
 
@@ -101,27 +104,27 @@
 	base = base[:len(base)-len(filepath.Ext(base))]
 
 	if !strings.HasSuffix(strings.ToUpper(base), "_WGS84") {
-		return meta{}, fmt.Errorf("%s is not in WGS84", base)
+		return nil, fmt.Errorf("%s is not in WGS84", base)
 	}
 
 	base = base[:len(base)-len("_WGS84")]
 
 	idx := strings.IndexRune(base, '_')
 	if idx == -1 {
-		return meta{}, fmt.Errorf("%s has no date", base)
+		return nil, fmt.Errorf("%s has no date", base)
 	}
 
 	datePart := base[:idx]
 
 	date, err := time.Parse("20060102", datePart)
 	if err != nil {
-		return meta{}, fmt.Errorf("error %s: %v\n", datePart, err)
+		return nil, fmt.Errorf("error %s: %v\n", datePart, err)
 	}
 
 	rest := base[idx+1:]
 
 	if idx = strings.LastIndex(rest, "_"); idx == -1 {
-		return meta{}, fmt.Errorf("%s has no depth reference", base)
+		return nil, fmt.Errorf("%s has no depth reference", base)
 	}
 
 	depthReference := rest[idx+1:]
@@ -129,21 +132,27 @@
 	rest = rest[:idx]
 
 	if !strings.HasSuffix(strings.ToUpper(rest), "_MB") {
-		return meta{}, fmt.Errorf("%s is not in WGS84", base)
+		return nil, fmt.Errorf("%s is not in WGS84", base)
 	}
 
 	name := rest[:len(rest)-len("_MB")]
 
 	name = substituteName(fname, name)
 
-	return meta{
+	return &meta{
 		name:           name,
 		depthReference: depthReference,
 		date:           date,
 	}, nil
 }
 
-func processor(fnames <-chan string, wg *sync.WaitGroup) {
+type result struct {
+	m      *meta
+	points points3d
+	wkb    string
+}
+
+func processor(fnames <-chan string, results chan<- result, wg *sync.WaitGroup) {
 	defer wg.Done()
 
 	for fname := range fnames {
@@ -159,23 +168,79 @@
 			log.Printf("error: %v\n", err)
 			continue
 		}
-		fmt.Printf("Number of points: %d\n", len(points))
+		log.Printf("Number of points: %d\n", len(points))
 
 		wkb := points.asWKB()
-		fmt.Printf("WKB size %.2f MB\n", float64(len(wkb))/(1024*1024))
+		log.Printf("WKB size %.2f MB\n", float64(len(wkb))/(1024*1024))
+
+		results <- result{m, points, wkb}
 	}
 }
 
+func quote(s string) string {
+	return "'" + strings.Replace(s, "'", "'''", -1) + "'"
+}
+
 func main() {
 	flag.Parse()
 
 	var wg sync.WaitGroup
 
 	fnames := make(chan string)
+	results := make(chan result)
+	done := make(chan struct{})
+
+	handler := func(result) {}
+	flush := func() {}
+
+	if *dump {
+		out := bufio.NewWriter(os.Stdout)
+		fmt.Fprintln(out, "BEGIN;")
+		flush = func() {
+			fmt.Fprintln(out, "COMMIT;")
+			out.Flush()
+		}
+		handler = func(r result) {
+			if r.m.depthReference != "ADR" {
+				return
+			}
+			fmt.Fprintln(out, "INSERT INTO waterway.sounding_results (")
+			fmt.Fprintln(out, "  bottleneck_id,")
+			fmt.Fprintln(out, "  date_info,")
+			fmt.Fprintln(out, "  depth_reference,")
+			fmt.Fprintln(out, "  point_cloud")
+			fmt.Fprintln(out, ") VALUES (")
+			fmt.Fprintf(out, "  %s,\n", quote(r.m.name))
+			fmt.Fprintf(out, "  '%s'::date,\n", r.m.date.Format("2006-01-02"))
+			fmt.Fprintf(out, "  %s,\n", quote(r.m.depthReference))
+			fmt.Fprintf(out, "  '")
+			io.Copy(hex.NewEncoder(out), strings.NewReader(r.wkb))
+			fmt.Fprintln(out, "'")
+			fmt.Fprintln(out, ");")
+		}
+	} else {
+		// TODO: Implement database stuff.
+	}
+
+	fin := make(chan struct{})
+
+	go func() {
+		defer flush()
+		defer close(fin)
+		for {
+			select {
+			case <-done:
+				log.Println("xxx")
+				return
+			case r := <-results:
+				handler(r)
+			}
+		}
+	}()
 
 	for i, n := 0, runtime.NumCPU(); i < n; i++ {
 		wg.Add(1)
-		go processor(fnames, &wg)
+		go processor(fnames, results, &wg)
 	}
 
 	for _, fname := range flag.Args() {
@@ -185,4 +250,7 @@
 	close(fnames)
 
 	wg.Wait()
+
+	close(done)
+	<-fin
 }
--- a/schema/gemma.sql	Mon Sep 10 17:01:24 2018 +0200
+++ b/schema/gemma.sql	Mon Sep 10 17:32:51 2018 +0200
@@ -372,8 +372,8 @@
         bottleneck_id varchar NOT NULL REFERENCES bottlenecks,
         date_info date NOT NULL,
         PRIMARY KEY (bottleneck_id, date_info),
-        area geography(POLYGON, 4326) NOT NULL,
-        surtyp varchar NOT NULL REFERENCES survey_types,
+        area geography(POLYGON, 4326),
+        surtyp varchar REFERENCES survey_types,
         coverage varchar REFERENCES coverage_types,
         depth_reference char(3) NOT NULL REFERENCES depth_references,
         point_cloud geography(MULTIPOINTZ, 4326),