view pkg/wfs/download.go @ 5591:0011f50cf216 surveysperbottleneckid

Removed no longer used alternative api for surveys/ endpoint. As bottlenecks in the summary for SR imports are now identified by their id and no longer by the (not guarantied to be unique!) name, there is no longer the need to request survey data by the name+date tuple (which isn't reliable anyway). So the workaround was now reversed.
author Sascha Wilde <wilde@sha-bang.de>
date Wed, 06 Apr 2022 13:30:29 +0200
parents 5f47eeea988d
children
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0.Reader.
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package wfs

import (
	"bufio"
	"bytes"
	"encoding/xml"
	"errors"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strconv"
	"strings"

	"golang.org/x/net/html/charset"

	"gemma.intevation.de/gemma/pkg/log"
)

var (
	// ErrNoSuchFeatureType is returned when a feature is not supported.
	ErrNoSuchFeatureType = errors.New("no such feature type")
	// ErrGetFeatureNotSupported is returned when GetFeature is not supported.
	ErrGetFeatureNotSupported = errors.New("method GetFeature not supported")
	// ErrMethodGetNotSupported is returned when the GET is not supported.
	ErrMethodGetNotSupported = errors.New("method GET not supported")
	// ErrNoNumberMatchedFound is returned if feature count cannot be extracted.
	ErrNoNumberMatchedFound = errors.New("no numberMatched attribute found")
	// ErrOutputFormatNotSupported is returned if a output format is
	// not supported.
	ErrOutputFormatNotSupported = errors.New("output format not supported")
)

// GetCapabilities downloads a capabilities document for a given URL.
func GetCapabilities(capURL string) (*Capabilities, error) {

	base, err := url.Parse(capURL)
	if err != nil {
		return nil, err
	}
	v := url.Values{}
	v.Set("SERVICE", "WFS")
	v.Set("REQUEST", "GetCapabilities")
	v.Set("ACCEPTVERSIONS", "2.0.0,1.1.0,1.0.0")
	base.RawQuery = v.Encode()

	baseURL := base.String()
	resp, err := http.Get(baseURL)
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()
	caps, err := ParseCapabilities(bufio.NewReader(resp.Body))
	if err == nil {
		caps.BaseURL = baseURL
	}
	return caps, err
}

func numberFeaturesGET(u *url.URL, featureType, version string) (int, error) {

	v := url.Values{}
	v.Set("SERVICE", "WFS")
	v.Set("REQUEST", "GetFeature")
	v.Set("resultType", "hits")
	v.Set("VERSION", version)
	v.Set("TYPENAMES", featureType)

	q := *u
	q.RawQuery = v.Encode()

	resp, err := http.Get(q.String())
	if err != nil {
		return 0, err
	}
	defer resp.Body.Close()
	dec := xml.NewDecoder(resp.Body)
	dec.CharsetReader = charset.NewReaderLabel

	var result struct {
		NumberMatched *int `xml:"numberMatched,attr"`
	}

	if err := dec.Decode(&result); err != nil {
		return 0, err
	}

	if result.NumberMatched == nil {
		return 0, ErrNoNumberMatchedFound
	}

	return *result.NumberMatched, nil
}

// GetFeaturesGET constructs a list of URLs to get features
// for a given feature type from a WFS servers.
func GetFeaturesGET(
	caps *Capabilities,
	featureTypeName string,
	outputFormats []string,
	sortBy string,
) ([]string, error) {

	feature := caps.FindFeatureType(featureTypeName)
	if feature == nil {
		return nil, ErrNoSuchFeatureType
	}
	op := caps.FindOperation("GetFeature")
	if op == nil {
		return nil, ErrGetFeatureNotSupported
	}

	if op.DCP.HTTP.Get == nil {
		return nil, ErrMethodGetNotSupported
	}

	getRaw := op.DCP.HTTP.Get.HRef
	getU, err := url.Parse(getRaw)
	if err != nil {
		return nil, err
	}
	// The URL could be relative so resolve against Capabilities URL.
	if !getU.IsAbs() {
		base, err := url.Parse(caps.BaseURL)
		if err != nil {
			return nil, err
		}
		getU = getU.ResolveReference(base)
	}

	var outputFormat string

	if len(outputFormats) > 0 {
		if outputFormat = op.SupportsOutputFormat(outputFormats...); outputFormat == "" {
			return nil, ErrOutputFormatNotSupported
		}
	}

	wfsVersion := caps.HighestWFSVersion(WFS200)

	featuresPerPage, supportsPaging := op.FeaturesPerPage()

	var numFeatures int

	if supportsPaging {
		log.Infof("Paging supported with %d feature per page.\n",
			featuresPerPage)

		if !op.SupportsHits() {
			supportsPaging = false
		} else {
			numFeatures, err = numberFeaturesGET(getU, featureTypeName, wfsVersion)
			if err != nil {
				log.Errorf("%v\n", err)
				supportsPaging = false
			} else if numFeatures == 0 {
				return nil, nil
			} else {
				log.Infof("Number of features: %d\n", numFeatures)
			}
		}
	}

	var downloadURLs []string
	wfs2 := !versionIsLess(wfsVersion, WFS200)

	addNS := func(v url.Values) {
		if len(feature.Namespaces) == 0 {
			return
		}
		// Only use first namespace
		ns := feature.Namespaces[0]
		if wfs2 {
			v.Set(
				"NAMESPACES", fmt.Sprintf("xmlns(%s,%s)", ns.Space, ns.Local))
		} else {
			v.Set("NAMESPACE", fmt.Sprintf("(%s:%s)", ns.Space, ns.Local))
		}
	}

	addOutputFormat := func(v url.Values) {
		if outputFormat != "" {
			v.Set("outputFormat", outputFormat)
		}
	}

	addSortBy := func(v url.Values) {
		if sortBy != "" {
			v.Set("sortBy", sortBy)
		}
	}

	if supportsPaging {
		pagedURL := func(ofs, count int) string {
			v := url.Values{}
			v.Set("SERVICE", "WFS")
			v.Set("REQUEST", "GetFeature")
			v.Set("VERSION", wfsVersion)
			v.Set("startIndex", strconv.Itoa(ofs))
			if wfs2 {
				v.Set("count", strconv.Itoa(count))
			} else {
				v.Set("maxFeatures", strconv.Itoa(count))
			}
			v.Set("TYPENAMES", featureTypeName)
			addNS(v)
			addOutputFormat(v)
			addSortBy(v)
			q := *getU
			q.RawQuery = v.Encode()
			return q.String()
		}
		if numFeatures > 0 {
			if numFeatures <= featuresPerPage {
				log.Infof("all features can be fetched in one page.")
				downloadURLs = []string{pagedURL(0, numFeatures)}
			} else {
				log.Infof("features need to be downloaded in pages.")
				for pos := 0; pos < numFeatures; {
					var count int
					if rest := numFeatures - pos; rest >= numFeatures {
						count = numFeatures
					} else {
						count = rest
					}
					downloadURLs = append(downloadURLs, pagedURL(pos, count))
					pos += count
				}
			}
		}
	} else { // No paging support.
		v := url.Values{}
		v.Set("SERVICE", "WFS")
		v.Set("REQUEST", "GetFeature")
		v.Set("VERSION", wfsVersion)
		v.Set("TYPENAMES", featureTypeName)
		addNS(v)
		addOutputFormat(v)
		addSortBy(v)
		q := *getU
		q.RawQuery = v.Encode()
		downloadURLs = []string{q.String()}
	}

	return downloadURLs, nil
}

func downloadURL(user, password, url string, handler func(string, io.Reader) error) error {
	req, err := http.NewRequest(http.MethodGet, url, nil)
	if err != nil {
		return err
	}

	if user != "" || password != "" {
		req.SetBasicAuth(user, password)
	}

	resp, err := http.DefaultClient.Do(req)

	if err != nil {
		return err
	}
	if resp.StatusCode < 200 || resp.StatusCode > 299 {
		return fmt.Errorf("invalid HTTP status code: %d (%s)",
			resp.StatusCode, resp.Status)
	}
	defer resp.Body.Close()

	var already bytes.Buffer

	// Prevent the XML reader from consuming everything.
	limit := io.LimitReader(resp.Body, 16*1024)
	in := io.TeeReader(limit, &already)

	if err := scanExceptionReport(in); err != nil {
		return err
	}

	multi := io.MultiReader(bytes.NewReader(already.Bytes()), resp.Body)
	return handler(url, multi)
}

// ExceptionReport is an error with the extract code and
// text from an OWS exception document.
type ExceptionReport struct {
	Code string
	Text string
}

func (er *ExceptionReport) Error() string {
	return fmt.Sprintf(
		"WFS GetFeature error: ExceptionCode: '%s' / ExceptionText: %s",
		er.Code, er.Text)
}

func scanExceptionReport(r io.Reader) *ExceptionReport {

	const ows = "http://www.opengis.net/ows/1.1"

	decoder := xml.NewDecoder(r)
	decoder.CharsetReader = charset.NewReaderLabel

	var (
		isError bool
		code    string
		depth   int
		text    strings.Builder
	)

	type tokenFunc func(xml.Token) tokenFunc

	var exceptionReportFn, exceptionFn, exceptionTextFn, collectTextFn tokenFunc

	exceptionReportFn = func(t xml.Token) tokenFunc {
		e, ok := t.(xml.StartElement)
		if !ok {
			return exceptionReportFn
		}
		if e.Name.Local != "ExceptionReport" && e.Name.Space != ows {
			return nil
		}
		isError = true
		return exceptionFn
	}

	exceptionFn = func(t xml.Token) tokenFunc {
		e, ok := t.(xml.StartElement)
		if !ok {
			return exceptionFn
		}
		if e.Name.Local == "Exception" && e.Name.Space == ows {
			for i := range e.Attr {
				at := &e.Attr[i]
				if at.Name.Local == "code" || at.Name.Local == "exceptionCode" {
					code = at.Value
					break
				}
			}
			return exceptionTextFn
		}
		return exceptionFn
	}

	exceptionTextFn = func(t xml.Token) tokenFunc {
		e, ok := t.(xml.StartElement)
		if ok && e.Name.Local == "ExceptionText" && e.Name.Space == ows {
			return collectTextFn
		}
		return exceptionTextFn
	}

	collectTextFn = func(t xml.Token) tokenFunc {
		switch e := t.(type) {
		case xml.StartElement:
			depth++
		case xml.CharData:
			if depth == 0 {
				text.Write(e)
			}
		case xml.EndElement:
			if depth == 0 {
				return nil
			}
			depth--
		}
		return collectTextFn
	}

tokens:
	for fn := exceptionReportFn; fn != nil; {
		tok, err := decoder.Token()
		switch {
		case tok == nil && err == io.EOF:
			break tokens
		case err != nil:
			return nil
		}
		fn = fn(tok)
	}

	if isError {
		return &ExceptionReport{
			Code: code,
			Text: text.String(),
		}
	}

	return nil
}

// DownloadURLs does the actual GetFeature requests downloads
// and hands the resulting io.Readers over to the given handler.
func DownloadURLs(user, password string, urls []string, handler func(string, io.Reader) error) error {
	for _, url := range urls {
		if err := downloadURL(user, password, url, handler); err != nil {
			return err
		}
	}
	return nil
}