Mercurial > gemma
view cmd/soundingresults/main.go @ 715:23b68cd4bed3
client: re-enable loading of full river bank marks
* Remove the bboxFilter that restricted loading of external distance_marks
to the extent of the Austrian Danube. Because of the improved loading
strategy, this is more feasible, but there may still be issues when
trying to display everything.
author | Bernhard Reiter <bernhard@intevation.de> |
---|---|
date | Fri, 21 Sep 2018 13:57:40 +0200 |
parents | 4e90daa57086 |
children |
line wrap: on
line source
package main import ( "bufio" "database/sql" "encoding/hex" "flag" "fmt" "io" "log" "os" "path/filepath" "runtime" "sort" "strings" "sync" "time" "github.com/jackc/pgx" "github.com/jackc/pgx/stdlib" ) var ( dump = flag.Bool("dump", true, "dump SQL insert statements") insecure = flag.Bool("insecure", false, "skip SSL verification") dbhost = flag.String("dbhost", "localhost", "database host") dbport = flag.Uint("dbport", 5432, "database port") dbname = flag.String("dbname", "gemma", "database user") dbuser = flag.String("dbuser", "scott", "database user") dbpassword = flag.String("dbpw", "tiger", "database password") dbssl = flag.String("dbssl", "prefer", "database SSL mode") ) func run(fn func(*sql.DB) error) error { // To ease SSL config ride a bit on parsing. cc, err := pgx.ParseConnectionString("sslmode=" + *dbssl) if err != nil { return err } // Do the rest manually to allow whitespace in user/password. cc.Host = *dbhost cc.Port = uint16(*dbport) cc.User = *dbuser cc.Password = *dbpassword cc.Database = *dbname db := stdlib.OpenDB(cc) defer db.Close() return fn(db) } // TODO: This should come from the depth_references table. 
var knownDepthReferences = []string{ "ADR", "ETRS", "FZP", "GLW", "HBO", "HDC", "HNW", "HSW", "IGN", "KP", "LDC", "LNW", "NAP", "NGM", "POT", "PUL", "RN", "TAW", "WGS", "ZPG", } func isKnownDepthReference(ref string) bool { ref = strings.ToUpper(ref) for _, r := range knownDepthReferences { if r == ref { return true } } return false } type meta struct { date time.Time name string depthReference string } func substituteName(fname, name string) string { dir := filepath.Dir(fname) info := filepath.Join(dir, "INFO.txt") f, err := os.Open(info) if err != nil { log.Printf("warn: %v\n", err) return name } defer f.Close() s := bufio.NewScanner(f) for search := strings.ToLower(name); s.Scan(); { line := strings.TrimSpace(s.Text()) if line == "" || strings.HasPrefix(line, "#") { continue } if parts := strings.SplitN(line, "=", 2); len(parts) == 2 && strings.TrimSpace(strings.ToLower(parts[0])) == search { return strings.TrimSpace(parts[1]) } } if err := s.Err(); err != nil { log.Printf("error: %v\n", err) } return name } func parseFilename(fname string) (*meta, error) { base := filepath.Base(fname) compressed := strings.ToLower(filepath.Ext(base)) for _, ext := range []string{".gz", ".bz2"} { if ext == compressed { base = base[:len(base)-len(ext)] break } } // Cut .txt base = base[:len(base)-len(filepath.Ext(base))] if !strings.HasSuffix(strings.ToUpper(base), "_WGS84") { return nil, fmt.Errorf("%s is not in WGS84", base) } base = base[:len(base)-len("_WGS84")] idx := strings.IndexRune(base, '_') if idx == -1 { return nil, fmt.Errorf("%s has no date", base) } datePart := base[:idx] date, err := time.Parse("20060102", datePart) if err != nil { return nil, fmt.Errorf("error %s: %v\n", datePart, err) } rest := base[idx+1:] if idx = strings.LastIndex(rest, "_"); idx == -1 { return nil, fmt.Errorf("%s has no depth reference", base) } depthReference := rest[idx+1:] if !isKnownDepthReference(depthReference) { return nil, fmt.Errorf( "%s is not a known depth reference", 
depthReference) } rest = rest[:idx] if !strings.HasSuffix(strings.ToUpper(rest), "_MB") { return nil, fmt.Errorf("%s is not in WGS84", base) } name := rest[:len(rest)-len("_MB")] name = substituteName(fname, name) return &meta{ name: name, depthReference: depthReference, date: date, }, nil } type result struct { m *meta points points3d wkb string } func processor(fnames <-chan string, results chan<- result, wg *sync.WaitGroup) { defer wg.Done() for fname := range fnames { log.Printf("Processing %s\n", fname) m, err := parseFilename(fname) if err != nil { log.Printf("error: %v\n", err) continue } _ = m points, err := parseXYZ(fname) if err != nil { log.Printf("error: %v\n", err) continue } log.Printf("Number of points: %d\n", len(points)) wkb := points.asWKB() log.Printf("WKB size %.2f MB\n", float64(len(wkb))/(1024*1024)) results <- result{m, points, wkb} } } func quote(s string) string { return "'" + strings.Replace(s, "'", "'''", -1) + "'" } func main() { flag.Parse() var wg sync.WaitGroup fnames := make(chan string) results := make(chan result) done := make(chan struct{}) handler := func(result) {} flush := func() {} if *dump { var results []result handler = func(r result) { results = append(results, r) } flush = func() { sort.Slice(results, func(i, j int) bool { if a, b := results[i].m.name, results[j].m.name; a != b { return a < b } return results[i].m.date.Before(results[j].m.date) }) out := bufio.NewWriter(os.Stdout) fmt.Fprintln(out, "BEGIN;") for i := range results { r := &results[i] fmt.Fprintln(out, "INSERT INTO waterway.sounding_results (") fmt.Fprintln(out, " bottleneck_id,") fmt.Fprintln(out, " date_info,") fmt.Fprintln(out, " depth_reference,") fmt.Fprintln(out, " point_cloud") fmt.Fprintln(out, ") VALUES (") fmt.Fprintf(out, " (SELECT bottleneck_id from waterway.bottlenecks where objnam = %s),\n", quote(r.m.name)) fmt.Fprintf(out, " '%s'::date,\n", r.m.date.Format("2006-01-02")) fmt.Fprintf(out, " %s,\n", quote(r.m.depthReference)) fmt.Fprintf(out, 
" '") io.Copy(hex.NewEncoder(out), strings.NewReader(r.wkb)) fmt.Fprintln(out, "'") fmt.Fprintln(out, ");") } fmt.Fprintln(out, "COMMIT;") out.Flush() } } else { // TODO: Implement database stuff. } fin := make(chan struct{}) go func() { defer func() { flush(); close(fin) }() for { select { case <-done: return case r := <-results: handler(r) } } }() for i, n := 0, runtime.NumCPU(); i < n; i++ { wg.Add(1) go processor(fnames, results, &wg) } for _, fname := range flag.Args() { fnames <- fname } close(fnames) wg.Wait() close(done) <-fin }