view pkg/wfs/download.go @ 5490:5f47eeea988d logging

Use own logging package.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 20 Sep 2021 17:45:39 +0200
parents 16259efa828f
children
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package wfs

import (
	"bufio"
	"bytes"
	"encoding/xml"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"

	"golang.org/x/net/html/charset"

	"gemma.intevation.de/gemma/pkg/log"
)

var (
	// ErrNoSuchFeatureType is returned by GetFeaturesGET when the
	// requested feature type cannot be found in the capabilities.
	ErrNoSuchFeatureType = errors.New("no such feature type")
	// ErrGetFeatureNotSupported is returned by GetFeaturesGET when the
	// capabilities contain no GetFeature operation.
	ErrGetFeatureNotSupported = errors.New("method GetFeature not supported")
	// ErrMethodGetNotSupported is returned by GetFeaturesGET when the
	// GetFeature operation has no HTTP GET endpoint.
	ErrMethodGetNotSupported = errors.New("method GET not supported")
	// ErrNoNumberMatchedFound is returned if the feature count cannot be
	// extracted because the response has no numberMatched attribute.
	ErrNoNumberMatchedFound = errors.New("no numberMatched attribute found")
	// ErrOutputFormatNotSupported is returned by GetFeaturesGET if output
	// formats were requested and the GetFeature operation reports none
	// of them as supported.
	ErrOutputFormatNotSupported = errors.New("output format not supported")
)

// GetCapabilities downloads a capabilities document for a given URL.
//
// The query string of capURL is replaced by a WFS GetCapabilities
// request. On success the URL that was actually requested is stored
// in the BaseURL field of the returned capabilities.
//
// An error is returned if capURL cannot be parsed, the request fails,
// the server answers with a non-2xx status or the document cannot
// be parsed.
func GetCapabilities(capURL string) (*Capabilities, error) {

	base, err := url.Parse(capURL)
	if err != nil {
		return nil, err
	}
	v := url.Values{}
	v.Set("SERVICE", "WFS")
	v.Set("REQUEST", "GetCapabilities")
	v.Set("ACCEPTVERSIONS", "2.0.0,1.1.0,1.0.0")
	base.RawQuery = v.Encode()

	baseURL := base.String()
	resp, err := http.Get(baseURL)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	// Do not feed error pages into the capabilities parser.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return nil, fmt.Errorf("invalid HTTP status code: %d (%s)",
			resp.StatusCode, resp.Status)
	}

	caps, err := ParseCapabilities(bufio.NewReader(resp.Body))
	if err != nil {
		return nil, err
	}
	caps.BaseURL = baseURL
	return caps, nil
}

// numberFeaturesGET asks the WFS at u how many features of the given
// feature type exist by issuing a GetFeature request with
// resultType=hits and reading the numberMatched attribute of the
// root element of the response.
//
// u itself is not modified; the request is sent to a copy with the
// query string replaced. ErrNoNumberMatchedFound is returned if the
// response carries no numberMatched attribute.
func numberFeaturesGET(u *url.URL, featureType, version string) (int, error) {

	v := url.Values{}
	v.Set("SERVICE", "WFS")
	v.Set("REQUEST", "GetFeature")
	v.Set("resultType", "hits")
	v.Set("VERSION", version)
	v.Set("TYPENAMES", featureType)

	q := *u
	q.RawQuery = v.Encode()

	resp, err := http.Get(q.String())
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()

	// Report server side failures as such instead of as a
	// misleading decoding or missing attribute error.
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return 0, fmt.Errorf("invalid HTTP status code: %d (%s)",
			resp.StatusCode, resp.Status)
	}

	dec := xml.NewDecoder(resp.Body)
	dec.CharsetReader = charset.NewReaderLabel

	// A pointer distinguishes a missing attribute from a count of zero.
	var result struct {
		NumberMatched *int `xml:"numberMatched,attr"`
	}

	if err := dec.Decode(&result); err != nil {
		return 0, err
	}

	if result.NumberMatched == nil {
		return 0, ErrNoNumberMatchedFound
	}

	return *result.NumberMatched, nil
}

// GetFeaturesGET constructs a list of URLs to get features
// for a given feature type from a WFS server.
//
// caps are the capabilities of the server, featureTypeName is the
// feature type to fetch, outputFormats is an optional list of output
// formats of which the one reported as supported by the GetFeature
// operation is requested, and sortBy is an optional value for the
// sortBy request parameter.
//
// If the server supports paging and hit counting the features are
// split over several URLs with at most the advertised number of
// features per page each. Otherwise a single URL is returned.
// If the server reports zero features nil is returned without an error.
//
// Possible errors are ErrNoSuchFeatureType, ErrGetFeatureNotSupported,
// ErrMethodGetNotSupported, ErrOutputFormatNotSupported and URL
// parsing errors.
func GetFeaturesGET(
	caps *Capabilities,
	featureTypeName string,
	outputFormats []string,
	sortBy string,
) ([]string, error) {

	feature := caps.FindFeatureType(featureTypeName)
	if feature == nil {
		return nil, ErrNoSuchFeatureType
	}
	op := caps.FindOperation("GetFeature")
	if op == nil {
		return nil, ErrGetFeatureNotSupported
	}

	if op.DCP.HTTP.Get == nil {
		return nil, ErrMethodGetNotSupported
	}

	getRaw := op.DCP.HTTP.Get.HRef
	getU, err := url.Parse(getRaw)
	if err != nil {
		return nil, err
	}
	// The URL could be relative so resolve against Capabilities URL.
	if !getU.IsAbs() {
		base, err := url.Parse(caps.BaseURL)
		if err != nil {
			return nil, err
		}
		// ResolveReference resolves its argument against the receiver,
		// so the absolute base has to be the receiver here.
		getU = base.ResolveReference(getU)
	}

	var outputFormat string

	if len(outputFormats) > 0 {
		if outputFormat = op.SupportsOutputFormat(outputFormats...); outputFormat == "" {
			return nil, ErrOutputFormatNotSupported
		}
	}

	wfsVersion := caps.HighestWFSVersion(WFS200)

	featuresPerPage, supportsPaging := op.FeaturesPerPage()

	var numFeatures int

	if supportsPaging {
		log.Infof("Paging supported with %d feature per page.\n",
			featuresPerPage)

		if !op.SupportsHits() {
			// Without a total count the pages cannot be planned.
			supportsPaging = false
		} else {
			numFeatures, err = numberFeaturesGET(getU, featureTypeName, wfsVersion)
			if err != nil {
				// Counting failed: fall back to a single unpaged request.
				log.Errorf("%v\n", err)
				supportsPaging = false
			} else if numFeatures == 0 {
				return nil, nil
			} else {
				log.Infof("Number of features: %d\n", numFeatures)
			}
		}
	}

	var downloadURLs []string
	wfs2 := !versionIsLess(wfsVersion, WFS200)

	// addNS adds the namespace parameter in the spelling of the
	// WFS version in use.
	addNS := func(v url.Values) {
		if len(feature.Namespaces) == 0 {
			return
		}
		// Only use first namespace
		ns := feature.Namespaces[0]
		if wfs2 {
			v.Set(
				"NAMESPACES", fmt.Sprintf("xmlns(%s,%s)", ns.Space, ns.Local))
		} else {
			v.Set("NAMESPACE", fmt.Sprintf("(%s:%s)", ns.Space, ns.Local))
		}
	}

	addOutputFormat := func(v url.Values) {
		if outputFormat != "" {
			v.Set("outputFormat", outputFormat)
		}
	}

	addSortBy := func(v url.Values) {
		if sortBy != "" {
			v.Set("sortBy", sortBy)
		}
	}

	// NOTE(review): TYPENAMES is sent for every WFS version; WFS 1.x
	// names this parameter TYPENAME -- confirm the targeted servers
	// accept the 2.0 spelling.

	if supportsPaging {
		// pagedURL builds the request for count features starting at ofs.
		pagedURL := func(ofs, count int) string {
			v := url.Values{}
			v.Set("SERVICE", "WFS")
			v.Set("REQUEST", "GetFeature")
			v.Set("VERSION", wfsVersion)
			v.Set("startIndex", strconv.Itoa(ofs))
			if wfs2 {
				v.Set("count", strconv.Itoa(count))
			} else {
				v.Set("maxFeatures", strconv.Itoa(count))
			}
			v.Set("TYPENAMES", featureTypeName)
			addNS(v)
			addOutputFormat(v)
			addSortBy(v)
			q := *getU
			q.RawQuery = v.Encode()
			return q.String()
		}
		if numFeatures > 0 {
			// A non-positive page size cannot be used to step through
			// the features, so everything is requested at once then.
			if featuresPerPage <= 0 || numFeatures <= featuresPerPage {
				log.Infof("all features can be fetched in one page.")
				downloadURLs = []string{pagedURL(0, numFeatures)}
			} else {
				log.Infof("features need to be downloaded in pages.")
				for pos := 0; pos < numFeatures; {
					// Full pages first, the remainder in the last one.
					count := numFeatures - pos
					if count > featuresPerPage {
						count = featuresPerPage
					}
					downloadURLs = append(downloadURLs, pagedURL(pos, count))
					pos += count
				}
			}
		}
	} else { // No paging support.
		v := url.Values{}
		v.Set("SERVICE", "WFS")
		v.Set("REQUEST", "GetFeature")
		v.Set("VERSION", wfsVersion)
		v.Set("TYPENAMES", featureTypeName)
		addNS(v)
		addOutputFormat(v)
		addSortBy(v)
		q := *getU
		q.RawQuery = v.Encode()
		downloadURLs = []string{q.String()}
	}

	return downloadURLs, nil
}

// downloadURL fetches url with HTTP GET and hands the response body
// over to handler together with the URL.
//
// If user or password is not empty, HTTP basic authentication is used.
// A non-2xx status code is reported as an error. Before the handler
// is called the beginning of the body is scanned for an OWS exception
// report; if one is found it is returned as the error and the handler
// is not called.
func downloadURL(user, password, url string, handler func(string, io.Reader) error) error {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return err
	}

	if user != "" || password != "" {
		req.SetBasicAuth(user, password)
	}

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// Registered before the status check so that the body is
	// closed on the error path, too.
	defer resp.Body.Close()

	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("invalid HTTP status code: %d (%s)",
			resp.StatusCode, resp.Status)
	}

	// Everything the exception scan reads is recorded here
	// to be replayed to the handler afterwards.
	var already bytes.Buffer

	// Prevent the XML reader from consuming everything.
	limit := io.LimitReader(resp.Body, 16*1024)
	in := io.TeeReader(limit, &already)

	if err := scanExceptionReport(in); err != nil {
		return err
	}

	// The handler sees the already consumed prefix followed
	// by the rest of the body.
	multi := io.MultiReader(bytes.NewReader(already.Bytes()), resp.Body)
	return handler(url, multi)
}

// ExceptionReport is an error carrying the code and the text
// extracted from an OWS exception document.
type ExceptionReport struct {
	// Code is the exception code of the report.
	Code string
	// Text is the exception text of the report.
	Text string
}

// Error implements the error interface by rendering
// the exception code and the exception text.
func (er *ExceptionReport) Error() string {
	const format = "WFS GetFeature error: ExceptionCode: '%s' / ExceptionText: %s"
	code, text := er.Code, er.Text
	return fmt.Sprintf(format, code, text)
}

// scanExceptionReport reads XML tokens from r and checks if the
// document is an OWS exception report. If it is, the code of the
// first exception and its exception text are returned as an
// *ExceptionReport. Otherwise nil is returned.
//
// The scan is a small state machine: every state is a function that
// consumes one token and returns the state for the next token,
// or nil to stop scanning.
//
// Input that ends or stops being decodable after the report root was
// recognised still yields a report with whatever has been collected
// so far, so a truncated report is not mistaken for success.
func scanExceptionReport(r io.Reader) *ExceptionReport {

	const ows = "http://www.opengis.net/ows/1.1"

	decoder := xml.NewDecoder(r)
	decoder.CharsetReader = charset.NewReaderLabel

	var (
		isError bool
		code    string
		depth   int
		text    strings.Builder
	)

	type tokenFunc func(xml.Token) tokenFunc

	var exceptionReportFn, exceptionFn, exceptionTextFn, collectTextFn tokenFunc

	// exceptionReportFn inspects the first start element of the document.
	exceptionReportFn = func(t xml.Token) tokenFunc {
		e, ok := t.(xml.StartElement)
		if !ok {
			return exceptionReportFn
		}
		// With && the scan is only abandoned if both the local name and
		// the namespace differ: a root element named ExceptionReport in
		// any namespace and any root element in the OWS 1.1 namespace
		// count as an exception report.
		// NOTE(review): '||' would be the strict check -- confirm whether
		// this lenient match is intended.
		if e.Name.Local != "ExceptionReport" && e.Name.Space != ows {
			return nil
		}
		isError = true
		return exceptionFn
	}

	// exceptionFn waits for the first ows:Exception and stores its code.
	exceptionFn = func(t xml.Token) tokenFunc {
		e, ok := t.(xml.StartElement)
		if !ok {
			return exceptionFn
		}
		if e.Name.Local == "Exception" && e.Name.Space == ows {
			for i := range e.Attr {
				at := &e.Attr[i]
				if at.Name.Local == "code" || at.Name.Local == "exceptionCode" {
					code = at.Value
					break
				}
			}
			return exceptionTextFn
		}
		return exceptionFn
	}

	// exceptionTextFn waits for the start of an ows:ExceptionText.
	exceptionTextFn = func(t xml.Token) tokenFunc {
		e, ok := t.(xml.StartElement)
		if ok && e.Name.Local == "ExceptionText" && e.Name.Space == ows {
			return collectTextFn
		}
		return exceptionTextFn
	}

	// collectTextFn gathers the character data directly inside the
	// ExceptionText element (text of nested elements is skipped) and
	// stops the scan at its end tag.
	collectTextFn = func(t xml.Token) tokenFunc {
		switch e := t.(type) {
		case xml.StartElement:
			depth++
		case xml.CharData:
			if depth == 0 {
				text.Write(e)
			}
		case xml.EndElement:
			if depth == 0 {
				return nil
			}
			depth--
		}
		return collectTextFn
	}

	for fn := exceptionReportFn; fn != nil; {
		tok, err := decoder.Token()
		if err != nil {
			// End of input and decoding errors both end the scan.
			// Whether this is an exception report is decided below
			// by what has been seen up to here.
			break
		}
		fn = fn(tok)
	}

	if isError {
		return &ExceptionReport{
			Code: code,
			Text: text.String(),
		}
	}

	return nil
}

// DownloadURLs performs the GetFeature requests for the given URLs
// one after another and passes each URL together with the io.Reader
// of its response to the given handler.
// The first error stops the processing and is returned.
func DownloadURLs(user, password string, urls []string, handler func(string, io.Reader) error) error {
	for i := 0; i < len(urls); i++ {
		err := downloadURL(user, password, urls[i], handler)
		if err != nil {
			return err
		}
	}
	return nil
}