changeset 1681:4d6ce621379e

Waterway axis import: Completed, but the final commit unexpectedly resulted in rollback. TODO: Fix this bug.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 27 Dec 2018 17:08:09 +0100
parents de8089944b19
children 3c99d599503a
files pkg/imports/wx.go
diffstat 1 files changed, 120 insertions(+), 23 deletions(-) [+]
line wrap: on
line diff
--- a/pkg/imports/wx.go	Thu Dec 27 13:35:51 2018 +0100
+++ b/pkg/imports/wx.go	Thu Dec 27 17:08:09 2018 +0100
@@ -14,12 +14,15 @@
 package imports
 
 import (
+	"bytes"
 	"context"
 	"database/sql"
+	"encoding/binary"
 	"encoding/json"
+	"errors"
 	"fmt"
 	"io"
-	"log"
+	"math"
 	"strings"
 	"time"
 
@@ -72,6 +75,47 @@
 	NObjNnm *string `json:"hydro_nobjnm"`
 }
 
+type line [][]float64
+
+const wkbLineString uint32 = 2
+
+func (l line) asWKB() []byte {
+
+	size := 1 + 4 + 4 + len(l)*(1+4+2*8)
+
+	buf := bytes.NewBuffer(make([]byte, 0, size))
+
+	binary.Write(buf, binary.LittleEndian, wkbNDR)
+	binary.Write(buf, binary.LittleEndian, wkbLineString)
+	binary.Write(buf, binary.LittleEndian, uint32(len(l)))
+
+	for i := range l {
+		c := l[i]
+		var lat, lon float64
+		if len(c) > 0 {
+			lat = c[0]
+		}
+		if len(c) > 1 {
+			lon = c[1]
+		}
+		binary.Write(buf, binary.LittleEndian, math.Float64bits(lat))
+		binary.Write(buf, binary.LittleEndian, math.Float64bits(lon))
+	}
+
+	return buf.Bytes()
+}
+
+const (
+	deleteWaterwayAxisSQL = `DELETE FROM waterway.waterway_axis`
+	insertWaterwayAxisSQL = `
+INSERT INTO waterway.waterway_axis (wtwaxs, objnam, nobjnam)
+VALUES (
+  ST_Transform(ST_GeomFromWKB($1, $2::integer), 4326)::geography,
+  $3,
+  $4
+)`
+)
+
 // Do executes the actual waterway exis import.
 func (wx *WaterwayAxis) Do(
 	ctx context.Context,
@@ -98,6 +142,12 @@
 		return nil, err
 	}
 
+	epsg, err := wfs.CRSToEPSG(ft.DefaultCRS)
+	if err != nil {
+		feedback.Error("Unsupported CRS name '%s'", ft.DefaultCRS)
+		return nil, err
+	}
+
 	urls, err := wfs.GetFeaturesGET(
 		caps, wx.FeatureType, "application/json", wx.SortBy)
 	if err != nil {
@@ -105,11 +155,28 @@
 		return nil, err
 	}
 
+	tx, err := conn.BeginTx(ctx, nil)
+	if err != nil {
+		return nil, err
+	}
+	defer tx.Rollback()
+
+	insertStmt, err := tx.PrepareContext(ctx, insertWaterwayAxisSQL)
+	if err != nil {
+		return nil, err
+	}
+	defer insertStmt.Close()
+
+	// Delete the old features.
+	if _, err := tx.ExecContext(ctx, deleteWaterwayAxisSQL); err != nil {
+		return nil, err
+	}
+
 	var (
-		crsName           string
 		unsupportedTypes  = map[string]int{}
 		missingProperties int
 		badProperties     int
+		features          int
 	)
 
 	if err := wfs.DownloadURLs(urls, func(r io.Reader) error {
@@ -117,8 +184,12 @@
 		if err != nil {
 			return err
 		}
-		if crsName != "" && rfc.CRS != nil {
-			crsName = rfc.CRS.Properties.Name
+		if rfc.CRS != nil {
+			crsName := rfc.CRS.Properties.Name
+			if epsg, err = wfs.CRSToEPSG(crsName); err != nil {
+				feedback.Error("Unsupported CRS: %d", crsName)
+				return err
+			}
 		}
 
 		// No features -> ignore.
@@ -127,7 +198,7 @@
 		}
 
 		for _, feature := range rfc.Features {
-			if feature.Properties == nil {
+			if feature.Properties == nil || feature.Geometry.Coordinates == nil {
 				missingProperties++
 				continue
 			}
@@ -139,13 +210,44 @@
 				continue
 			}
 
-			log.Printf("properties: %s\n", props.ObjNam)
+			var nobjnam sql.NullString
+			if props.NObjNnm != nil {
+				nobjnam = sql.NullString{String: *props.NObjNnm, Valid: true}
+			}
 
 			switch feature.Geometry.Type {
 			case "LineString":
-				// TODO: Parse concrete features.
+				var l line
+				if err := json.Unmarshal(*feature.Geometry.Coordinates, &l); err != nil {
+					return err
+				}
+				if _, err := insertStmt.ExecContext(
+					ctx,
+					l.asWKB(),
+					epsg,
+					props.ObjNam,
+					nobjnam,
+				); err != nil {
+					return err
+				}
+				features++
 			case "MultiLineString":
-				// TODO: Parse concrete features.
+				var ls []line
+				if err := json.Unmarshal(*feature.Geometry.Coordinates, &ls); err != nil {
+					return err
+				}
+				for _, l := range ls {
+					if _, err := insertStmt.ExecContext(
+						ctx,
+						l.asWKB(),
+						epsg,
+						props.ObjNam,
+						nobjnam,
+					); err != nil {
+						return err
+					}
+					features++
+				}
 			default:
 				unsupportedTypes[feature.Geometry.Type]++
 			}
@@ -156,6 +258,12 @@
 		return nil, err
 	}
 
+	if features == 0 {
+		err := errors.New("No features found")
+		feedback.Error("%v", err)
+		return nil, err
+	}
+
 	if badProperties > 0 {
 		feedback.Warn("Bad properties: %d", badProperties)
 	}
@@ -175,21 +283,10 @@
 		feedback.Warn("Unsupported types found: %s", b.String())
 	}
 
-	if crsName == "" {
-		crsName = ft.DefaultCRS
+	if err = tx.Commit(); err == nil {
+		feedback.Info("Storing %d features took %s",
+			features, time.Since(start))
 	}
 
-	epsg, err := wfs.CRSToEPSG(crsName)
-	if err != nil {
-		feedback.Error("Unsupported CRS name '%s'", crsName)
-		return nil, err
-	}
-
-	feedback.Info("using ESPG: %d", epsg)
-
-	// TODO: Store extracted features.
-
-	feedback.Info("Storing took %s", time.Since(start))
-
-	return nil, nil
+	return nil, err
 }