view pkg/geoserver/boot.go @ 5591:0011f50cf216 surveysperbottleneckid

Removed no longer used alternative api for surveys/ endpoint. As bottlenecks in the summary for SR imports are now identified by their id and no longer by the (not guaranteed to be unique!) name, there is no longer the need to request survey data by the name+date tuple (which isn't reliable anyway). So the workaround has now been reverted.
author Sascha Wilde <wilde@sha-bang.de>
date Wed, 06 Apr 2022 13:30:29 +0200
parents 5f47eeea988d
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package geoserver

import (
	"archive/zip"
	"bytes"
	"encoding/json"
	"encoding/xml"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"sync"

	"golang.org/x/net/html/charset"

	"gemma.intevation.de/gemma/pkg/config"
	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/models"
)

// Fixed names and settings used when talking to the GeoServer REST API:
//   - workspaceName is the workspace all REST URLs in this file refer to.
//   - datastoreName is the datastore probed and created inside that workspace.
//   - databaseType is sent as the "dbtype" connection parameter of the
//     datastore.
//   - primaryKeyMetadataTbl is sent as the "Primary key metadata table"
//     connection parameter of the datastore.
const (
	workspaceName         = "gemma"
	datastoreName         = "gemma"
	databaseType          = "postgis"
	primaryKeyMetadataTbl = "waterway.gt_pk_metadata"
)

// SQL snippets handed to GeoServer as datastore connection parameters:
//   - startupSQL is appended (separated by ';') to the configured
//     GeoServer startup SQL and sent as "Session startup SQL".
//   - closeupSQL is sent as "Session close-up SQL".
//
// NOTE(review): the "${user,...}" part of startupSQL looks like a
// placeholder that GeoServer substitutes with the requesting user
// (falling back to the hex encoded 'waterway_user') -- confirm against
// the GeoServer documentation.
const (
	startupSQL = `SELECT public.setrole('${user,'||encode('waterway_user', 'hex')||'}')`
	closeupSQL = `RESET ROLE`
)

// basicAuth builds a request decorator which attaches HTTP basic
// authentication credentials for the given user and password to
// every request it is applied to.
func basicAuth(user, password string) func(req *http.Request) {
	decorate := func(r *http.Request) {
		r.SetBasicAuth(user, password)
	}
	return decorate
}

// toStream encodes x as JSON and returns a reader over the encoded
// bytes. An encoding failure is only logged as a warning; the
// returned reader then delivers whatever ended up in the buffer.
func toStream(x interface{}) io.Reader {
	encoded := new(bytes.Buffer)

	err := json.NewEncoder(encoded).Encode(x)
	if err != nil {
		// Should not happen
		log.Warnf("bad JSON: %v\n", err)
	}
	return bytes.NewReader(encoded.Bytes())
}

// XXX: Creating SQL views with JSON via GeoServer REST-API fails,
// so there is an XML based alternative to toStream.

// toXMLStream encodes x as XML, prefixed with the standard XML
// header, and returns a reader over the resulting bytes. An encoding
// failure is only logged as a warning.
func toXMLStream(x interface{}) io.Reader {
	var out bytes.Buffer
	out.WriteString(xml.Header)

	err := xml.NewEncoder(&out).Encode(x)
	if err != nil {
		// Should not happen
		log.Warnf("bad XML: %v\n", err)
	}
	return bytes.NewReader(out.Bytes())
}

// asJSON marks the body of the request as JSON by setting its
// Content-Type header.
func asJSON(req *http.Request) {
	const jsonType = "application/json"
	req.Header.Set("Content-Type", jsonType)
}

// asContentType sets the Content-Type header of the request to the
// given content type, replacing any previous value.
func asContentType(req *http.Request, contentType string) {
	header := req.Header
	header.Set("Content-Type", contentType)
}

// ensureWorkspace makes sure that the workspace workspaceName exists
// in the configured GeoServer. It probes the workspace via the REST
// API and creates it if the probe is answered with 404 Not Found.
//
// Every probe status other than 404 is treated as "workspace exists".
// An error is returned if a request cannot be built or sent, or if
// the create request is not answered with 201 Created.
func ensureWorkspace() error {
	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	// Probe workspace.
	req, err := http.NewRequest(
		http.MethodGet,
		geoURL+"/rest/workspaces/"+workspaceName+".json?quietOnNotFound=true",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// Only the status code is needed; close the body so the
	// underlying connection is not leaked.
	resp.Body.Close()

	if resp.StatusCode != http.StatusNotFound {
		log.Infof("workspace " + workspaceName + " already exists.")
		return nil
	}

	// Create workspace

	log.Infof("creating workspace " + workspaceName)

	const createJSON = `{"workspace":{"name":"` + workspaceName + `"}}`

	req, err = http.NewRequest(
		http.MethodPost,
		geoURL+"/rest/workspaces",
		strings.NewReader(createJSON))
	if err != nil {
		return err
	}
	auth(req)
	asJSON(req)
	if resp, err = http.DefaultClient.Do(req); err != nil {
		return err
	}
	// Again only the status code matters.
	resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("status code '%s' (%d)",
			http.StatusText(resp.StatusCode),
			resp.StatusCode)
	}

	return nil
}

// ensureDataStore makes sure that the datastore datastoreName exists
// inside the workspace workspaceName. It probes the datastore via the
// REST API and creates it from the configured database connection
// settings if the probe is answered with 404 Not Found.
//
// Every probe status other than 404 is treated as "datastore exists".
// An error is returned if a request cannot be built or sent, or if
// the create request is not answered with 201 Created.
func ensureDataStore() error {
	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	// Probe datastore.
	req, err := http.NewRequest(
		http.MethodGet,
		geoURL+"/rest/workspaces/"+workspaceName+
			"/datastores/"+datastoreName+".json?quietOnNotFound=true",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// Only the status code is needed; close the body so the
	// underlying connection is not leaked.
	resp.Body.Close()

	if resp.StatusCode != http.StatusNotFound {
		log.Infof("datastore " + datastoreName + " already exists.")
		return nil
	}

	// Create datastore.
	log.Infoln("creating datastore " + datastoreName)

	// entry is one key/value pair of the connection parameters in
	// the JSON layout sent to GeoServer ("@key" / "$").
	type entry struct {
		Key   interface{} `json:"@key"`
		Value interface{} `json:"$"`
	}

	ds := map[string]interface{}{
		"dataStore": map[string]interface{}{
			"name": datastoreName,
			"connectionParameters": map[string]interface{}{
				"entry": []entry{
					{"host", config.DBHost()},
					{"port", config.DBPort()},
					{"database", config.DBName()},
					{"schema", models.DatabaseScheme},
					{"user", config.DBUser()},
					{"passwd", config.DBPassword()},
					{"dbtype", databaseType},
					{"Primary key metadata table", primaryKeyMetadataTbl},
					{"Expose primary keys", true},
					{"Session startup SQL",
						config.GeoServerStartupSQL() + ";" + startupSQL},
					{"Session close-up SQL", closeupSQL},
					{"validate connections", true},
					{"Estimated extends", false},
				},
			},
		},
	}

	req, err = http.NewRequest(
		http.MethodPost,
		geoURL+"/rest/workspaces/"+workspaceName+"/datastores",
		toStream(ds))
	if err != nil {
		return err
	}
	auth(req)
	asJSON(req)
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// Again only the status code matters.
	resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("status code '%s' (%d)",
			http.StatusText(resp.StatusCode),
			resp.StatusCode)
	}

	return nil
}

// ensureFeatures publishes all internal WFS services as feature types
// in the datastore datastoreName. Feature types which are already
// listed by GeoServer are skipped; their names are logged once the
// function returns.
//
// For SQL view entries a JDBC virtual table is attached as metadata,
// for entries with a WMS-T attribute a time dimension is attached.
//
// An error is returned if a request cannot be built or sent, or if a
// create request is not answered with 201 Created. Feature types
// created before the failing one are not rolled back.
func ensureFeatures() error {
	tables := models.InternalServices.Filter(models.IntWFS)
	if len(tables) == 0 {
		log.Infoln("no tables to publish")
		return nil
	}

	log.Infof("number of tables to publish %d\n", len(tables))

	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	datastoreURL := geoURL + "/rest/workspaces/" + workspaceName +
		"/datastores/" + datastoreName

	// features mirrors the JSON list of feature types of the datastore.
	var features struct {
		FeatureTypes struct {
			FeatureType []struct {
				Name string `json:"name"`
			} `json:"featureType"`
		} `json:"featureTypes"`
	}

	hasFeature := func(name string) bool {
		for _, ft := range features.FeatureTypes.FeatureType {
			if ft.Name == name {
				return true
			}
		}
		return false
	}

	// Fetch all featuretypes.
	req, err := http.NewRequest(
		http.MethodGet,
		datastoreURL+"/featuretypes.json",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}

	err = json.NewDecoder(resp.Body).Decode(&features)
	resp.Body.Close()
	if err != nil {
		// XXX: Quirk in the JSON return by GeoServer:
		// If there are no features in the datastore
		// featureType deserializes to an empty string ""
		// instead of an empty array *palmface*.
		// So assume there no features.
		hasFeature = func(string) bool { return false }
	}

	var already []string

	// Report the skipped feature types on every exit path.
	defer func() {
		if len(already) > 0 {
			log.Infof("already having featuretypes: %s\n",
				strings.Join(already, ", "))
		}
	}()

	for i := range tables {
		table := tables[i].Name

		if hasFeature(table) {
			already = append(already, table)
			continue
		}

		// Create featuretype.
		log.Infof("creating featuretype %s.\n", table)

		ft := map[string]interface{}{
			"name":       table,
			"nativeName": table,
			"title":      table,
		}
		if srs := tables[i].SRS; srs != nil {
			ft["srs"] = *srs
			// A bit of a hack!
			// For WGS 84 the bounding boxes are preset to the
			// whole world.
			if *srs == "EPSG:4326" {
				box := map[string]interface{}{
					"minx": -180,
					"maxx": +180,
					"miny": -90,
					"maxy": +90,
					"crs":  "EPSG:4326",
				}
				ft["nativeBoundingBox"] = box
				ft["latLonBoundingBox"] = box
			}
		}

		var entries []map[string]interface{}

		if models.IntSQLView(tables[i]) {
			vt := map[string]interface{}{
				"name": table,
				"sql":  *tables[i].SQL,
			}
			if kc := tables[i].KeyColumn; kc != nil {
				vt["keyColumn"] = *kc
			}
			entry := map[string]interface{}{
				"@key":         "JDBC_VIRTUAL_TABLE",
				"virtualTable": vt,
			}
			entries = append(entries, entry)
		}

		if attr := tables[i].WMSTAttribute; attr != nil {
			di := map[string]interface{}{
				"enabled":             true,
				"attribute":           *attr,
				"presentation":        "CONTINUOUS_INTERVAL",
				"units":               "ISO8601",
				"nearestMatchEnabled": false,
				"defaultValue": map[string]string{
					"strategy":       "FIXED",
					"referenceValue": "PT1M/PRESENT",
				},
			}
			if endAttr := tables[i].WMSTEndAttribute; endAttr != nil {
				di["endAttribute"] = *endAttr
			}
			entry := map[string]interface{}{
				"@key":          "time",
				"dimensionInfo": di,
			}
			entries = append(entries, entry)
		}

		if len(entries) > 0 {
			ft["metadata"] = map[string]interface{}{
				"entry": entries,
			}
		}

		doc := map[string]interface{}{
			"featureType": ft,
		}

		req, err := http.NewRequest(
			http.MethodPost,
			datastoreURL+"/featuretypes",
			toStream(doc))
		if err != nil {
			return err
		}
		asJSON(req)
		auth(req)

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		// Only the status code is needed. Close right away instead
		// of deferring, as a defer would pile up until the function
		// returns.
		resp.Body.Close()

		if resp.StatusCode != http.StatusCreated {
			return fmt.Errorf("status code '%s' (%d)",
				http.StatusText(resp.StatusCode),
				resp.StatusCode)
		}
	}

	return nil
}

// ensureLayerGroups publishes the layer groups of the internal
// services in the workspace workspaceName. Layer groups which are
// already listed by GeoServer are skipped; their names are logged
// once the function returns.
//
// The layer groups are created via XML, all other requests use JSON.
//
// An error is returned if a request cannot be built or sent, or if a
// create request is not answered with 201 Created.
func ensureLayerGroups() error {

	groups := models.InternalServices.LayerGroups()
	if len(groups) == 0 {
		log.Infoln("info: no groups layers to publish")
		return nil
	}

	log.Infof("number of layer groups to publish %d\n", len(groups))

	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	// layerGroups mirrors the JSON list of layer groups of the workspace.
	type layerGroups struct {
		LayerGroups struct {
			LayerGroup []struct {
				Name string `json:"name"`
			} `json:"layerGroup"`
		} `json:"layerGroups"`
	}

	var lg layerGroups

	hasLayerGroup := func(name string) bool {
		for i := range lg.LayerGroups.LayerGroup {
			if lg.LayerGroups.LayerGroup[i].Name == name {
				return true
			}
		}
		return false
	}

	layerGroupsURL := geoURL + "/rest/workspaces/" + workspaceName +
		"/layergroups"

	// Fetch all layer groups.
	req, err := http.NewRequest(
		http.MethodGet,
		layerGroupsURL+".json",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}

	// XXX: ignore the error
	// A failed decode leaves lg (partially) empty, so the affected
	// groups are simply treated as not existing.
	_ = json.NewDecoder(resp.Body).Decode(&lg)
	resp.Body.Close()

	var already []string

	// Report the skipped layer groups on every exit path.
	defer func() {
		if len(already) > 0 {
			log.Infof("already having layer groups: %s\n",
				strings.Join(already, ", "))
		}
	}()

	// XML document layout of a layer group to be created.
	type (
		Layers struct {
			Layer []string `xml:"layer"`
		}
		Styles struct {
			Style []string `xml:"style"`
		}
		LayerGroup struct {
			XMLName xml.Name `xml:"layerGroup"`
			Name    string   `xml:"name"`
			Title   string   `xml:"title"`
			Layers  Layers   `xml:"layers"`
			Styles  Styles   `xml:"styles"`
		}
	)

	for i := range groups {
		if hasLayerGroup(groups[i].Name) {
			already = append(already, groups[i].Name)
			continue
		}

		log.Infof("creating layer group %s.\n", groups[i].Name)

		// The style list is filled with the layer names.
		// NOTE(review): presumably because the styles are named
		// like their layers -- confirm.
		lgr := LayerGroup{
			Name:  groups[i].Name,
			Title: groups[i].Name,
			Layers: Layers{
				Layer: groups[i].Layers,
			},
			Styles: Styles{
				Style: groups[i].Layers,
			},
		}

		req, err := http.NewRequest(
			http.MethodPost,
			layerGroupsURL+".xml",
			toXMLStream(&lgr))
		if err != nil {
			return err
		}
		asContentType(req, "text/xml")
		auth(req)

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		// Only the status code is needed. Close right away instead
		// of deferring, as a defer would pile up until the function
		// returns.
		resp.Body.Close()

		if resp.StatusCode != http.StatusCreated {
			return fmt.Errorf("status code '%s' (%d)",
				http.StatusText(resp.StatusCode),
				resp.StatusCode)
		}
	}

	return nil
}

// deleteWorkspace recursively deletes the workspace workspaceName
// from GeoServer if the configuration asks for a clean start
// (config.GeoServerClean). Otherwise it does nothing.
//
// Only failures to build or send the request are reported. The HTTP
// status of the answer is not inspected.
func deleteWorkspace() error {

	// Should we delete our workspace first?
	if !config.GeoServerClean() {
		return nil
	}

	log.Infoln("info: delete workspace " + workspaceName)
	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	req, err := http.NewRequest(
		http.MethodDelete,
		geoURL+"/rest/workspaces/"+workspaceName+"?recurse=true",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// The answer itself is of no interest, but its body has to be
	// closed so the underlying connection is not leaked.
	// NOTE(review): presumably the status is ignored because the
	// workspace may not exist yet -- confirm.
	resp.Body.Close()
	return nil
}

// styles is the decoding target for the JSON style listing of the
// workspace as requested by load. Only the names of the styles are
// extracted.
type styles struct {
	Styles struct {
		Style []struct {
			Name string `json:"name"`
		} `json:"style"`
	} `json:"styles"`
}

// hasStyle reports whether a style with the given name is part of
// the loaded style listing.
func (s *styles) hasStyle(name string) bool {
	for _, style := range s.Styles.Style {
		if style.Name == name {
			return true
		}
	}
	return false
}

// load requests the style listing of the workspace workspaceName
// from GeoServer and decodes it into s.
//
// Only failures to build or send the request are reported. Decoding
// problems are deliberately ignored, leaving s in whatever state the
// decoder got it to.
func (s *styles) load() error {
	stylesURL := config.GeoServerURL() +
		"/rest/workspaces/" + workspaceName + "/styles.json"
	auth := basicAuth(config.GeoServerUser(), config.GeoServerPassword())

	req, err := http.NewRequest(http.MethodGet, stylesURL, nil)
	if err != nil {
		return err
	}
	auth(req)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Fetch all styles
	// XXX: Avoid error checking due to quirks with featuretypes.
	_ = json.NewDecoder(resp.Body).Decode(s)
	return nil
}

// Registry of textual style pre-processors keyed by style name.
// Within this file the map is only accessed by
// RegisterStylePreprocessor and FindStylePreprocessor, which both
// hold stylePreprocessorsMu while doing so.
var (
	stylePreprocessorsMu sync.Mutex
	stylePreprocessors   = map[string]func(string) (string, error){}
)

// RegisterStylePreprocessor associates a textual pre-processor with
// the given style name. The pre-processor is applied to the style
// when its value changes. A previously registered pre-processor for
// the same name is replaced.
func RegisterStylePreprocessor(name string, processor func(string) (string, error)) {
	stylePreprocessorsMu.Lock()
	stylePreprocessors[name] = processor
	stylePreprocessorsMu.Unlock()
}

// FindStylePreprocessor looks up the registered textual pre-processor
// associated with the given style name. It returns nil if none is
// registered for that name.
func FindStylePreprocessor(name string) func(string) (string, error) {
	stylePreprocessorsMu.Lock()
	processor := stylePreprocessors[name]
	stylePreprocessorsMu.Unlock()
	return processor
}

// isZip reports whether data can be opened as a ZIP archive.
// Empty data is never considered to be a ZIP archive.
func isZip(data []byte) bool {
	size := int64(len(data))
	if size == 0 {
		return false
	}
	if _, err := zip.NewReader(bytes.NewReader(data), size); err != nil {
		return false
	}
	return true
}

// updateStyle uploads the style of the given entry to GeoServer and
// makes it the default style of the layer with the same name.
//
// The style data is loaded from the entry. If it is not a ZIP archive
// and a pre-processor is registered for the style name, the data is
// run through that pre-processor first. Then up to three requests are
// sent:
//  1. if create is true, the style is created in the workspace,
//  2. the style data is uploaded,
//  3. the style is associated with the layer as its default.
//
// An error is returned if the style cannot be loaded or
// pre-processed, if a request cannot be built or sent, or if
// GeoServer answers with an unexpected status. Steps that already
// succeeded are not rolled back.
func updateStyle(entry *models.IntEntry, create bool) error {

	log.Infof("creating style %s\n", entry.Name)

	// Try to load the style data.
	binary, err := entry.LoadStyle()
	if err != nil {
		return err
	}

	// Named zipped to not shadow the archive/zip package.
	zipped := isZip(binary)

	if !zipped { // We only support templating for plain XML styles.
		if processor := FindStylePreprocessor(entry.Name); processor != nil {
			data, err := processor(string(binary))
			if err != nil {
				return err
			}
			binary = []byte(data)
		}
	}

	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	styleURL := geoURL + "/rest/workspaces/" + workspaceName +
		"/styles"

	// First create style

	type Style struct {
		Name     string `json:"name"`
		Filename string `json:"filename"`
	}

	var styleFilename = struct {
		Style Style `json:"style"`
	}{
		Style: Style{
			Name:     entry.Name,
			Filename: entry.Name + ".sld",
		},
	}

	if create {
		req, err := http.NewRequest(
			http.MethodPost,
			styleURL,
			toStream(&styleFilename))

		if err != nil {
			return err
		}
		auth(req)
		asJSON(req)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		// Only the status code is needed; do not leak the connection.
		resp.Body.Close()

		if resp.StatusCode != http.StatusCreated {
			return fmt.Errorf("unable to create style %s (%s)",
				entry.Name,
				http.StatusText(resp.StatusCode))
		}
	}

	// Second upload data

	req, err := http.NewRequest(
		http.MethodPut,
		styleURL+"/"+url.PathEscape(entry.Name),
		bytes.NewReader(binary))
	if err != nil {
		return err
	}
	auth(req)
	// The content type tells GeoServer how to interpret the upload.
	switch {
	case zipped:
		asContentType(req, "application/zip")
	case isSymbologyEncoding(binary):
		asContentType(req, "application/vnd.ogc.se+xml")
	default:
		asContentType(req, "application/vnd.ogc.sld+xml")
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("cannot upload style %s (%s)",
			entry.Name, http.StatusText(resp.StatusCode))
	}

	// Third associate with layer

	req, err = http.NewRequest(
		http.MethodPost,
		geoURL+"/rest/layers/"+
			url.PathEscape(workspaceName+":"+entry.Name)+
			"/styles?default=true",
		toStream(&styleFilename))
	if err != nil {
		return err
	}
	auth(req)
	asJSON(req)

	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("cannot connect style %s with layer (%s)",
			entry.Name, http.StatusText(resp.StatusCode))
	}

	return nil
}

// isSymbologyEncoding tries to figure out if data is Symbology
// Encoding (SE) rather than plain SLD. It scans the XML tokens and
// reports true as soon as a start element in the SE namespace shows
// up. Reaching the end of the document or hitting invalid XML (which
// is logged as a warning) yields false.
func isSymbologyEncoding(data []byte) bool {
	const seNamespace = "http://www.opengis.net/se"

	dec := xml.NewDecoder(bytes.NewReader(data))
	dec.CharsetReader = charset.NewReaderLabel

	for {
		token, err := dec.Token()
		if err != nil {
			// A clean end of input is not worth a warning.
			if !(token == nil && err == io.EOF) {
				log.Warnf("invalid XML: %v\n", err)
			}
			return false
		}
		start, isStart := token.(xml.StartElement)
		if isStart && start.Name.Space == seNamespace {
			return true
		}
	}
}

// ensureStyles creates the styles of all internal WMS services which
// come with a style and which are not yet known to GeoServer. The
// names of the styles already present are logged once the function
// returns. The first failure aborts the run and is returned.
func ensureStyles() error {
	log.Infoln("creating styles")

	var known styles
	if err := known.load(); err != nil {
		return err
	}

	styled := models.InternalServices.Filter(
		models.IntAnd(models.IntWMS, models.IntWithStyle))

	var present []string

	defer func() {
		if len(present) == 0 {
			return
		}
		log.Infof("already having styles: %s\n",
			strings.Join(present, ", "))
	}()

	for idx := range styled {
		candidate := &styled[idx]
		if !known.hasStyle(candidate.Name) {
			if err := updateStyle(candidate, true); err != nil {
				return err
			}
			continue
		}
		present = append(present, candidate.Name)
	}

	return nil
}

// PrepareGeoServer sets up the GeoServer to work together with the
// gemma server. In order it optionally deletes the workspace, then
// ensures the workspace, the datastore, the feature types, the layer
// groups and the styles. The first failing step aborts the setup and
// its error is returned.
//
// If no database user or no GeoServer URL is configured this is
// logged and nothing is set up.
func PrepareGeoServer() error {
	switch {
	case config.DBUser() == "":
		log.Infoln("need metamorphic db user to configure GeoServer")
		return nil
	case config.GeoServerURL() == "":
		log.Infoln("no URL to GeoServer configured")
		return nil
	}

	steps := [...]func() error{
		deleteWorkspace,
		ensureWorkspace,
		ensureDataStore,
		ensureFeatures,
		ensureLayerGroups,
		ensureStyles,
	}

	for _, step := range steps {
		if err := step(); err != nil {
			return err
		}
	}

	return nil
}