changeset 346:ad0e47c1fedf

Use httputil.ReverseProxy for WFS proxying.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 06 Aug 2018 16:46:42 +0200
parents b97b3172c61a
children 72c76ab112e9
files controllers/externalwfs.go controllers/routes.go
diffstat 2 files changed, 121 insertions(+), 168 deletions(-) [+]
line wrap: on
line diff
--- a/controllers/externalwfs.go	Mon Aug 06 15:19:05 2018 +0200
+++ b/controllers/externalwfs.go	Mon Aug 06 16:46:42 2018 +0200
@@ -3,11 +3,10 @@
 import (
 	"compress/gzip"
 	"encoding/xml"
-	"fmt"
 	"io"
 	"log"
-	"net"
 	"net/http"
+	"net/url"
 	"strings"
 
 	"github.com/gorilla/mux"
@@ -16,50 +15,128 @@
 	"gemma.intevation.de/gemma/config"
 )
 
-func copyHeader(dst, src http.Header) {
-	for k, vv := range src {
-		log.Printf("%s: %v\n", k, vv)
-		for _, v := range vv {
-			dst.Add(k, v)
-		}
-	}
+type RoundTripFunc func(*http.Request) (*http.Response, error)
+
+func (rtf RoundTripFunc) RoundTrip(req *http.Request) (*http.Response, error) {
+	return rtf(req)
 }
 
-func cloneHeader(h http.Header) http.Header {
-	h2 := make(http.Header, len(h))
-	for k, vv := range h {
-		log.Printf("clone: %s: %v\n", k, vv)
-		vv2 := make([]string, len(vv))
-		copy(vv2, vv)
-		h2[k] = vv2
+func externalWFSDirector(req *http.Request) {
+
+	abort := func(format string, args ...interface{}) {
+		log.Printf(format, args...)
+		panic(http.ErrAbortHandler)
+	}
+
+	external := config.ExternalWFSs()
+	if external == nil || len(external) == 0 {
+		abort("No external WFS proxy config found\n")
+	}
+	wfs := mux.Vars(req)["wfs"]
+
+	alias, found := external[wfs]
+	if !found {
+		abort("No config found for %s\n", wfs)
+	}
+	data, ok := alias.(map[string]interface{})
+	if !ok {
+		abort("error: badly configured external WFS %s\n", wfs)
 	}
-	return h2
+
+	urlS, found := data["url"]
+	if !found {
+		abort("error: missing url for external WFS %s\n", wfs)
+	}
+
+	prefix, ok := urlS.(string)
+	if !ok {
+		abort("error: badly configured url for external WFS %s\n", wfs)
+	}
+
+	log.Printf("%v\n", prefix)
+	nURL := prefix + "?" + req.URL.RawQuery
+	log.Printf("%v\n", nURL)
+
+	u, err := url.Parse(nURL)
+	if err != nil {
+		abort("Invalid url: %v\n", err)
+	}
+	req.URL = u
+	req.Header.Set("X-Gemma-Prefix", prefix)
+	req.Host = u.Host
 }
 
-// Hop-by-hop headers. These are removed when sent to the backend.
-// http://www.w3.org/Protocols/rfc2616/rfc2616-sec13.html
-var hopHeaders = []string{
-	"Connection",
-	"Proxy-Connection", // non-standard but still sent by libcurl and rejected by e.g. google
-	"Keep-Alive",
-	"Proxy-Authenticate",
-	"Proxy-Authorization",
-	"Te",      // canonicalized version of "TE"
-	"Trailer", // not Trailers per URL above; http://www.rfc-editor.org/errata_search.php?eid=4522
-	"Transfer-Encoding",
-	"Upgrade",
+func externalWFSTransport(req *http.Request) (*http.Response, error) {
+
+	prefix := req.Header.Get("X-Gemma-Prefix")
+	req.Header.Del("X-Gemma-Prefix")
+
+	resp, err := http.DefaultTransport.RoundTrip(req)
+	if err != nil {
+		return nil, err
+	}
+
+	resp.Header.Set("X-Gemma-Prefix", prefix)
+	to := useHTTPS(req) + "://" + req.Host
+	if !strings.HasPrefix(req.URL.Path, "/") {
+		to += "/"
+	}
+	to += req.URL.Path
+	resp.Header.Set("X-Gemma-To", to)
+
+	return resp, err
 }
 
-// removeConnectionHeaders removes hop-by-hop headers listed in the "Connection" header of h.
-// See RFC 2616, section 14.10.
-func removeConnectionHeaders(h http.Header) {
-	if c := h.Get("Connection"); c != "" {
-		for _, f := range strings.Split(c, ",") {
-			if f = strings.TrimSpace(f); f != "" {
-				h.Del(f)
+func externalWFSModifyResponse(resp *http.Response) error {
+
+	prefix := resp.Header.Get("X-Gemma-Prefix")
+	to := resp.Header.Get("X-Gemma-To")
+	resp.Header.Del("X-Gemma-Prefix")
+	resp.Header.Del("X-Gemma-To")
+
+	xml := isXML(resp.Header)
+
+	gzipped := strings.Contains(resp.Header.Get("Content-Encoding"), "gzip")
+	if xml && gzipped {
+		resp.Header.Del("Content-Encoding")
+	}
+
+	if xml {
+		log.Printf("rewrite from %s to %s\n", prefix, to)
+		var r io.Reader
+		if gzipped {
+			var err error
+			r, err = gzip.NewReader(resp.Body)
+			if err != nil {
+				return err
 			}
+		} else {
+			r = resp.Body
 		}
+
+		pr, pw := io.Pipe()
+
+		go func(closer io.ReadCloser) {
+			defer func() {
+				pw.Close()
+				closer.Close()
+			}()
+			if _, err := io.Copy(pw, r); err != nil {
+				log.Printf("rewrite failed: %v\n", err)
+				return
+			}
+			/*
+				if err := rewrite(pw, r, prefix, to); err != nil {
+					log.Printf("rewrite failed: %v\n", err)
+					return
+				}
+			*/
+			log.Println("rewrite successful")
+		}(resp.Body)
+
+		resp.Body = pr
 	}
+	return nil
 }
 
 func isXML(h http.Header) bool {
@@ -73,137 +150,6 @@
 	return false
 }
 
-func externalWFSProxy(rw http.ResponseWriter, req *http.Request) {
-
-	external := config.ExternalWFSs()
-	if external == nil || len(external) == 0 {
-		http.NotFound(rw, req)
-		return
-	}
-	wfs := mux.Vars(req)["wfs"]
-
-	alias, found := external[wfs]
-	if !found {
-		http.NotFound(rw, req)
-		return
-	}
-	data, ok := alias.(map[string]interface{})
-	if !ok {
-		log.Printf("error: badly configured external wfs %s\n", wfs)
-		http.Error(rw,
-			http.StatusText(http.StatusInternalServerError),
-			http.StatusInternalServerError)
-		return
-	}
-
-	urlS, found := data["url"]
-	if !found {
-		log.Printf("error: missinf url fore xternal wfs %s\n", wfs)
-		http.Error(rw,
-			http.StatusText(http.StatusInternalServerError),
-			http.StatusInternalServerError)
-		return
-	}
-
-	prefix, ok := urlS.(string)
-	if !ok {
-		log.Printf("error: badly configured url for external wfs %s\n", wfs)
-		http.Error(rw,
-			http.StatusText(http.StatusInternalServerError),
-			http.StatusInternalServerError)
-		return
-	}
-
-	log.Printf("%v\n", prefix)
-	url := prefix + "?" + req.URL.RawQuery
-	log.Printf("%v\n", url)
-
-	remoteReq, err := http.NewRequest(req.Method, url, req.Body)
-	if err != nil {
-		http.Error(rw, fmt.Sprintf("error: %v", err), http.StatusBadRequest)
-		return
-	}
-
-	remoteReq.Header = cloneHeader(req.Header)
-	removeConnectionHeaders(remoteReq.Header)
-
-	// Remove hop-by-hop headers to the backend. Especially
-	// important is "Connection" because we want a persistent
-	// connection, regardless of what the client sent to us.
-	for _, h := range hopHeaders {
-		if remoteReq.Header.Get(h) != "" {
-			remoteReq.Header.Del(h)
-		}
-	}
-
-	if clientIP, _, err := net.SplitHostPort(req.RemoteAddr); err == nil {
-		// If we aren't the first proxy retain prior
-		// X-Forwarded-For information as a comma+space
-		// separated list and fold multiple headers into one.
-		if prior, ok := remoteReq.Header["X-Forwarded-For"]; ok {
-			clientIP = strings.Join(prior, ", ") + ", " + clientIP
-		}
-		remoteReq.Header.Set("X-Forwarded-For", clientIP)
-		log.Printf("X-Forwarded-For: %s\n", clientIP)
-	}
-
-	log.Printf("req: %v\n", remoteReq)
-
-	resp, err := http.DefaultTransport.RoundTrip(remoteReq)
-	//client := &http.Client{}
-	//resp, err := client.Do(remoteReq)
-	if err != nil {
-		http.Error(rw, fmt.Sprintf("error: %v", err), http.StatusBadRequest)
-		return
-	}
-
-	log.Printf("%v\n", resp.Header)
-
-	xml := isXML(resp.Header)
-	log.Printf("is xml: %t\n", xml)
-
-	gzipped := strings.Contains(resp.Header.Get("Content-Encoding"), "gzip")
-	if gzipped {
-		resp.Header.Del("Content-Encoding")
-	}
-
-	removeConnectionHeaders(resp.Header)
-	copyHeader(rw.Header(), resp.Header)
-
-	rw.WriteHeader(resp.StatusCode)
-
-	defer resp.Body.Close()
-
-	if xml {
-		to := useHTTPS(req) + "://" + req.Host
-		if !strings.HasPrefix(req.URL.Path, "/") {
-			to += "/"
-		}
-		to += req.URL.Path
-		var r io.Reader = resp.Body
-		if gzipped {
-			var err error
-			r, err = gzip.NewReader(resp.Body)
-			if err != nil {
-				log.Printf("gzip error: %v\n", err)
-				http.Error(rw, fmt.Sprintf("error: %v", err), http.StatusBadGateway)
-				return
-			}
-		} else {
-			r = resp.Body
-		}
-		log.Printf("rewrite %s to: %s\n", prefix, to)
-		err = rewrite(rw, r, prefix, to)
-	} else {
-		log.Printf("no rewrite")
-		_, err = io.Copy(rw, resp.Body)
-	}
-
-	if err != nil {
-		log.Printf("copy error: %v\n", err)
-	}
-}
-
 func rewrite(w io.Writer, r io.Reader, from, to string) error {
 
 	decoder := xml.NewDecoder(r)
--- a/controllers/routes.go	Mon Aug 06 15:19:05 2018 +0200
+++ b/controllers/routes.go	Mon Aug 06 16:46:42 2018 +0200
@@ -2,6 +2,7 @@
 
 import (
 	"net/http"
+	"net/http/httputil"
 
 	"gemma.intevation.de/gemma/auth"
 
@@ -51,7 +52,13 @@
 	}).Methods(http.MethodGet)
 
 	// Proxy for external WFSs.
-	api.HandleFunc("/externalwfs/{wfs}", externalWFSProxy).
+	externalWFSProxy := &httputil.ReverseProxy{
+		Director:       externalWFSDirector,
+		Transport:      RoundTripFunc(externalWFSTransport),
+		ModifyResponse: externalWFSModifyResponse,
+	}
+
+	api.Handle("/externalwfs/{wfs}", externalWFSProxy).
 		Methods(
 			http.MethodGet, http.MethodPost,
 			http.MethodPut, http.MethodDelete)