Mercurial > gemma
changeset 611:effd22c0ae5a
Sounding result: Write simple SQL insert dumper. Not deterministic, yet.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Mon, 10 Sep 2018 17:32:51 +0200 |
parents | f62ee9d5bff1 |
children | d127856e689d |
files | cmd/soundingresults/main.go schema/gemma.sql |
diffstat | 2 files changed, 81 insertions(+), 13 deletions(-) [+] |
line wrap: on
line diff
--- a/cmd/soundingresults/main.go Mon Sep 10 17:01:24 2018 +0200 +++ b/cmd/soundingresults/main.go Mon Sep 10 17:32:51 2018 +0200 @@ -3,8 +3,10 @@ import ( "bufio" "database/sql" + "encoding/hex" "flag" "fmt" + "io" "log" "os" "path/filepath" @@ -18,6 +20,7 @@ ) var ( + dump = flag.Bool("dump", true, "dump SQL insert statements") insecure = flag.Bool("insecure", false, "skip SSL verification") dbhost = flag.String("dbhost", "localhost", "database host") dbport = flag.Uint("dbport", 5432, "database port") @@ -85,7 +88,7 @@ return name } -func parseFilename(fname string) (meta, error) { +func parseFilename(fname string) (*meta, error) { base := filepath.Base(fname) @@ -101,27 +104,27 @@ base = base[:len(base)-len(filepath.Ext(base))] if !strings.HasSuffix(strings.ToUpper(base), "_WGS84") { - return meta{}, fmt.Errorf("%s is not in WGS84", base) + return nil, fmt.Errorf("%s is not in WGS84", base) } base = base[:len(base)-len("_WGS84")] idx := strings.IndexRune(base, '_') if idx == -1 { - return meta{}, fmt.Errorf("%s has no date", base) + return nil, fmt.Errorf("%s has no date", base) } datePart := base[:idx] date, err := time.Parse("20060102", datePart) if err != nil { - return meta{}, fmt.Errorf("error %s: %v\n", datePart, err) + return nil, fmt.Errorf("error %s: %v\n", datePart, err) } rest := base[idx+1:] if idx = strings.LastIndex(rest, "_"); idx == -1 { - return meta{}, fmt.Errorf("%s has no depth reference", base) + return nil, fmt.Errorf("%s has no depth reference", base) } depthReference := rest[idx+1:] @@ -129,21 +132,27 @@ rest = rest[:idx] if !strings.HasSuffix(strings.ToUpper(rest), "_MB") { - return meta{}, fmt.Errorf("%s is not in WGS84", base) + return nil, fmt.Errorf("%s is not in WGS84", base) } name := rest[:len(rest)-len("_MB")] name = substituteName(fname, name) - return meta{ + return &meta{ name: name, depthReference: depthReference, date: date, }, nil } -func processor(fnames <-chan string, wg *sync.WaitGroup) { +type result struct { + m *meta + points points3d + wkb string +} + +func processor(fnames <-chan string, results chan<- result, wg *sync.WaitGroup) { defer wg.Done() for fname := range fnames { @@ -159,23 +168,79 @@ log.Printf("error: %v\n", err) continue } - fmt.Printf("Number of points: %d\n", len(points)) + log.Printf("Number of points: %d\n", len(points)) wkb := points.asWKB() - fmt.Printf("WKB size %.2f MB\n", float64(len(wkb))/(1024*1024)) + log.Printf("WKB size %.2f MB\n", float64(len(wkb))/(1024*1024)) + + results <- result{m, points, wkb} } } +func quote(s string) string { + return "'" + strings.Replace(s, "'", "'''", -1) + "'" +} + func main() { flag.Parse() var wg sync.WaitGroup fnames := make(chan string) + results := make(chan result) + done := make(chan struct{}) + + handler := func(result) {} + flush := func() {} + + if *dump { + out := bufio.NewWriter(os.Stdout) + fmt.Fprintln(out, "BEGIN;") + flush = func() { + fmt.Fprintln(out, "COMMIT;") + out.Flush() + } + handler = func(r result) { + if r.m.depthReference != "ADR" { + return + } + fmt.Fprintln(out, "INSERT INTO waterway.sounding_results (") + fmt.Fprintln(out, " bottleneck_id,") + fmt.Fprintln(out, " date_info,") + fmt.Fprintln(out, " depth_reference,") + fmt.Fprintln(out, " point_cloud") + fmt.Fprintln(out, ") VALUES (") + fmt.Fprintf(out, " %s,\n", quote(r.m.name)) + fmt.Fprintf(out, " '%s'::date,\n", r.m.date.Format("2006-01-02")) + fmt.Fprintf(out, " %s,\n", quote(r.m.depthReference)) + fmt.Fprintf(out, " '") + io.Copy(hex.NewEncoder(out), strings.NewReader(r.wkb)) + fmt.Fprintln(out, "'") + fmt.Fprintln(out, ");") + } + } else { + // TODO: Implement database stuff. + } + + fin := make(chan struct{}) + + go func() { + defer flush() + defer close(fin) + for { + select { + case <-done: + log.Println("xxx") + return + case r := <-results: + handler(r) + } + } + }() for i, n := 0, runtime.NumCPU(); i < n; i++ { wg.Add(1) - go processor(fnames, &wg) + go processor(fnames, results, &wg) } for _, fname := range flag.Args() { @@ -185,4 +250,7 @@ close(fnames) wg.Wait() + + close(done) + <-fin }
--- a/schema/gemma.sql Mon Sep 10 17:01:24 2018 +0200 +++ b/schema/gemma.sql Mon Sep 10 17:32:51 2018 +0200 @@ -372,8 +372,8 @@ bottleneck_id varchar NOT NULL REFERENCES bottlenecks, date_info date NOT NULL, PRIMARY KEY (bottleneck_id, date_info), - area geography(POLYGON, 4326) NOT NULL, - surtyp varchar NOT NULL REFERENCES survey_types, + area geography(POLYGON, 4326), + surtyp varchar REFERENCES survey_types, coverage varchar REFERENCES coverage_types, depth_reference char(3) NOT NULL REFERENCES depth_references, point_cloud geography(MULTIPOINTZ, 4326),