view pkg/geoserver/boot.go @ 5490:5f47eeea988d logging

Use own logging package.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Mon, 20 Sep 2021 17:45:39 +0200
parents 345515bc4548
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package geoserver

import (
	"archive/zip"
	"bytes"
	"encoding/json"
	"encoding/xml"
	"fmt"
	"io"
	"net/http"
	"net/url"
	"strings"
	"sync"

	"golang.org/x/net/html/charset"

	"gemma.intevation.de/gemma/pkg/config"
	"gemma.intevation.de/gemma/pkg/log"
	"gemma.intevation.de/gemma/pkg/models"
)

const (
	workspaceName         = "gemma"
	datastoreName         = "gemma"
	databaseType          = "postgis"
	primaryKeyMetadataTbl = "waterway.gt_pk_metadata"
)

const (
	startupSQL = `SELECT public.setrole('${user,'||encode('waterway_user', 'hex')||'}')`
	closeupSQL = `RESET ROLE`
)

func basicAuth(user, password string) func(req *http.Request) {
	return func(req *http.Request) {
		req.SetBasicAuth(user, password)
	}
}

func toStream(x interface{}) io.Reader {
	var buf bytes.Buffer

	if err := json.NewEncoder(&buf).Encode(x); err != nil {
		// Should not happen
		log.Warnf("bad JSON: %v\n", err)
	}
	return bytes.NewReader(buf.Bytes())
}

// XXX: Creating SQL views with JSON via GeoServer REST-API fails
// Begin code for handling with XML instead
func toXMLStream(x interface{}) io.Reader {
	buf := bytes.NewBufferString(xml.Header)
	if err := xml.NewEncoder(buf).Encode(x); err != nil {
		// Should not happen
		log.Warnf("bad XML: %v\n", err)
	}
	return bytes.NewReader(buf.Bytes())
}

func asJSON(req *http.Request) {
	req.Header.Set("Content-Type", "application/json")
}

func asContentType(req *http.Request, contentType string) {
	req.Header.Set("Content-Type", contentType)
}

func ensureWorkspace() error {
	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	// Probe  workspace.
	req, err := http.NewRequest(
		http.MethodGet,
		geoURL+"/rest/workspaces/"+workspaceName+".json?quietOnNotFound=true",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}

	if resp.StatusCode != http.StatusNotFound {
		log.Infof("workspace " + workspaceName + " already exists.")
		return nil
	}

	// Create workspace

	log.Infof("creating workspace " + workspaceName)

	const createJSON = `{"workspace":{"name":"` + workspaceName + `"}}`

	req, err = http.NewRequest(
		http.MethodPost,
		geoURL+"/rest/workspaces",
		strings.NewReader(createJSON))
	if err != nil {
		return err
	}
	auth(req)
	asJSON(req)
	if resp, err = http.DefaultClient.Do(req); err != nil {
		return err
	}

	if resp.StatusCode != http.StatusCreated {
		err = fmt.Errorf("status code '%s' (%d)",
			http.StatusText(resp.StatusCode),
			resp.StatusCode)
	}

	return err
}

func ensureDataStore() error {
	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	// Probe datastore.
	req, err := http.NewRequest(
		http.MethodGet,
		geoURL+"/rest/workspaces/"+workspaceName+
			"/datastores/"+datastoreName+".json?quietOnNotFound=true",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}

	if resp.StatusCode != http.StatusNotFound {
		log.Infof("datastore " + datastoreName + " already exists.")
		return nil
	}

	// Create datastore.
	log.Infoln("creating datastore " + datastoreName)

	type entry struct {
		Key   interface{} `json:"@key"`
		Value interface{} `json:"$"`
	}

	// Create datastore.
	ds := map[string]interface{}{
		"dataStore": map[string]interface{}{
			"name": datastoreName,
			"connectionParameters": map[string]interface{}{
				"entry": []entry{
					{"host", config.DBHost()},
					{"port", config.DBPort()},
					{"database", config.DBName()},
					{"schema", models.DatabaseScheme},
					{"user", config.DBUser()},
					{"passwd", config.DBPassword()},
					{"dbtype", databaseType},
					{"Primary key metadata table", primaryKeyMetadataTbl},
					{"Expose primary keys", true},
					{"Session startup SQL",
						config.GeoServerStartupSQL() + ";" + startupSQL},
					{"Session close-up SQL", closeupSQL},
					{"validate connections", true},
					{"Estimated extends", false},
				},
			},
		},
	}

	req, err = http.NewRequest(
		http.MethodPost,
		geoURL+"/rest/workspaces/"+workspaceName+"/datastores",
		toStream(ds))
	if err != nil {
		return err
	}
	auth(req)
	asJSON(req)
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		return err
	}

	if resp.StatusCode != http.StatusCreated {
		err = fmt.Errorf("status code '%s' (%d)",
			http.StatusText(resp.StatusCode),
			resp.StatusCode)
	}

	return err
}

func ensureFeatures() error {
	tables := models.InternalServices.Filter(models.IntWFS)
	if len(tables) == 0 {
		log.Infoln("no tables to publish")
		return nil
	}

	log.Infof("number of tables to publish %d\n", len(tables))

	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	datastoreURL := geoURL + "/rest/workspaces/" + workspaceName +
		"/datastores/" + datastoreName

	var features struct {
		FeatureTypes struct {
			FeatureType []struct {
				Name string `json:"name"`
			} `json:"featureType"`
		} `json:"featureTypes"`
	}

	hasFeature := func(name string) bool {
		for _, ft := range features.FeatureTypes.FeatureType {
			if ft.Name == name {
				return true
			}
		}
		return false
	}

	// Fetch all featuretypes.
	req, err := http.NewRequest(
		http.MethodGet,
		datastoreURL+"/featuretypes.json",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}

	err = json.NewDecoder(resp.Body).Decode(&features)
	resp.Body.Close()
	if err != nil {
		// XXX: Quirk in the JSON return by GeoServer:
		// If there are no features in the datastore
		// featureType deserializes to an empty string ""
		// instead of an empty array *palmface*.
		// So assume there no features.
		hasFeature = func(string) bool { return false }
	}

	var already []string

	defer func() {
		if len(already) > 0 {
			log.Infof("already having featuretypes: %s\n",
				strings.Join(already, ", "))
		}
	}()

	for i := range tables {
		table := tables[i].Name

		if hasFeature(table) {
			already = append(already, table)
			continue
		}

		// Create featuretype.
		log.Infof("creating featuretype %s.\n", table)

		var req *http.Request

		ft := map[string]interface{}{
			"name":       table,
			"nativeName": table,
			"title":      table,
		}
		if srs := tables[i].SRS; srs != nil {
			ft["srs"] = *srs
			// A bit of a hack!
			if *srs == "EPSG:4326" {
				box := map[string]interface{}{
					"minx": -180,
					"maxx": +180,
					"miny": -90,
					"maxy": +90,
					"crs":  "EPSG:4326",
				}
				ft["nativeBoundingBox"] = box
				ft["latLonBoundingBox"] = box
			}
		}

		var entries []map[string]interface{}

		if models.IntSQLView(tables[i]) {
			vt := map[string]interface{}{
				"name": table,
				"sql":  *tables[i].SQL,
			}
			if kc := tables[i].KeyColumn; kc != nil {
				vt["keyColumn"] = *kc
			}
			entry := map[string]interface{}{
				"@key":         "JDBC_VIRTUAL_TABLE",
				"virtualTable": vt,
			}
			entries = append(entries, entry)
		}

		if attr := tables[i].WMSTAttribute; attr != nil {
			di := map[string]interface{}{
				"enabled":             true,
				"attribute":           *attr,
				"presentation":        "CONTINUOUS_INTERVAL",
				"units":               "ISO8601",
				"nearestMatchEnabled": false,
				"defaultValue": map[string]string{
					"strategy":       "FIXED",
					"referenceValue": "PT1M/PRESENT",
				},
			}
			if endAttr := tables[i].WMSTEndAttribute; endAttr != nil {
				di["endAttribute"] = *endAttr
			}
			entry := map[string]interface{}{
				"@key":          "time",
				"dimensionInfo": di,
			}
			entries = append(entries, entry)
		}

		if len(entries) > 0 {
			ft["metadata"] = map[string]interface{}{
				"entry": entries,
			}
		}

		doc := map[string]interface{}{
			"featureType": ft,
		}

		req, err = http.NewRequest(
			http.MethodPost,
			datastoreURL+"/featuretypes",
			toStream(doc))
		if err != nil {
			return err
		}
		asJSON(req)
		auth(req)

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}

		if resp.StatusCode != http.StatusCreated {
			return fmt.Errorf("status code '%s' (%d)",
				http.StatusText(resp.StatusCode),
				resp.StatusCode)
		}
	}

	return nil
}

func ensureLayerGroups() error {

	groups := models.InternalServices.LayerGroups()
	if len(groups) == 0 {
		log.Infoln("info: no groups layers to publish")
		return nil
	}

	log.Infof("number of layer groups to publish %d\n", len(groups))

	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	type layerGroups struct {
		LayerGroups struct {
			LayerGroup []struct {
				Name string `json:"name"`
			} `json:"layerGroup"`
		} `json:"layerGroups"`
	}

	var lg layerGroups

	hasLayerGroup := func(name string) bool {
		for i := range lg.LayerGroups.LayerGroup {
			if lg.LayerGroups.LayerGroup[i].Name == name {
				return true
			}
		}
		return false
	}

	layerGroupsURL := geoURL + "/rest/workspaces/" + workspaceName +
		"/layergroups"

	// Fetch all layer groups.
	req, err := http.NewRequest(
		http.MethodGet,
		layerGroupsURL+".json",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}

	// XXX: ignore the error
	_ = json.NewDecoder(resp.Body).Decode(&lg)
	resp.Body.Close()

	var already []string

	defer func() {
		if len(already) > 0 {
			log.Infof("already having layer groups: %s\n",
				strings.Join(already, ", "))
		}
	}()

	for i := range groups {
		if hasLayerGroup(groups[i].Name) {
			already = append(already, groups[i].Name)
			continue
		}

		log.Infof("creating layer group %s.\n", groups[i].Name)

		type (
			Layers struct {
				Layer []string `xml:"layer"`
			}
			Styles struct {
				Style []string `xml:"style"`
			}
			LayerGroup struct {
				XMLName xml.Name `xml:"layerGroup"`
				Name    string   `xml:"name"`
				Title   string   `xml:"title"`
				Layers  Layers   `xml:"layers"`
				Styles  Styles   `xml:"styles"`
			}
		)

		lgr := LayerGroup{
			Name:  groups[i].Name,
			Title: groups[i].Name,
			Layers: Layers{
				Layer: groups[i].Layers,
			},
			Styles: Styles{
				Style: groups[i].Layers,
			},
		}

		req, err = http.NewRequest(
			http.MethodPost,
			layerGroupsURL+".xml",
			toXMLStream(&lgr))
		if err != nil {
			return err
		}
		asContentType(req, "text/xml")
		auth(req)

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}

		if resp.StatusCode != http.StatusCreated {
			return fmt.Errorf("status code '%s' (%d)",
				http.StatusText(resp.StatusCode),
				resp.StatusCode)
		}
	}

	return nil
}

func deleteWorkspace() error {

	// Should we delete our workspace first?
	if !config.GeoServerClean() {
		return nil
	}

	log.Infoln("info: delete workspace " + workspaceName)
	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	req, err := http.NewRequest(
		http.MethodDelete,
		geoURL+"/rest/workspaces/"+workspaceName+"?recurse=true",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	_, err = http.DefaultClient.Do(req)
	return err
}

type styles struct {
	Styles struct {
		Style []struct {
			Name string `json:"name"`
		} `json:"style"`
	} `json:"styles"`
}

func (s *styles) hasStyle(name string) bool {
	for i := range s.Styles.Style {
		if s.Styles.Style[i].Name == name {
			return true
		}
	}
	return false
}

func (s *styles) load() error {
	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	req, err := http.NewRequest(
		http.MethodGet,
		geoURL+"/rest/workspaces/"+workspaceName+"/styles.json",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Fetch all styles
	// XXX: Avoid error checking due to quirks with featuretypes.
	json.NewDecoder(resp.Body).Decode(s)
	return nil
}

var (
	stylePreprocessorsMu sync.Mutex
	stylePreprocessors   = map[string]func(string) (string, error){}
)

// RegisterStylePreprocessor stores a textual pre-processor
// for a given style to be applied when the value of the
// style changes.
func RegisterStylePreprocessor(name string, processor func(string) (string, error)) {
	stylePreprocessorsMu.Lock()
	defer stylePreprocessorsMu.Unlock()
	stylePreprocessors[name] = processor
}

// FindStylePreprocessor searches for a registed textual pre-processor
// associated with a given style name.
func FindStylePreprocessor(name string) func(string) (string, error) {
	stylePreprocessorsMu.Lock()
	defer stylePreprocessorsMu.Unlock()
	return stylePreprocessors[name]
}

func isZip(data []byte) bool {
	if len(data) == 0 {
		return false
	}
	_, err := zip.NewReader(bytes.NewReader(data), int64(len(data)))
	return err == nil
}

func updateStyle(entry *models.IntEntry, create bool) error {

	log.Infof("creating style %s\n", entry.Name)

	// Try to load the style data.
	binary, err := entry.LoadStyle()
	if err != nil {
		return err
	}

	zip := isZip(binary)

	if !zip { // We only support templating for plain XML styles.
		if processor := FindStylePreprocessor(entry.Name); processor != nil {
			data, err := processor(string(binary))
			if err != nil {
				return err
			}
			binary = []byte(data)
		}
	}

	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	styleURL := geoURL + "/rest/workspaces/" + workspaceName +
		"/styles"

	// First create style

	type Style struct {
		Name     string `json:"name"`
		Filename string `json:"filename"`
	}

	var styleFilename = struct {
		Style Style `json:"style"`
	}{
		Style: Style{
			Name:     entry.Name,
			Filename: entry.Name + ".sld",
		},
	}

	if create {
		req, err := http.NewRequest(
			http.MethodPost,
			styleURL,
			toStream(&styleFilename))

		if err != nil {
			return err
		}
		auth(req)
		asJSON(req)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}

		if resp.StatusCode != http.StatusCreated {
			return fmt.Errorf("unable to create style %s (%s)",
				entry.Name,
				http.StatusText(resp.StatusCode))
		}
	}

	// Second upload data

	req, err := http.NewRequest(
		http.MethodPut,
		styleURL+"/"+url.PathEscape(entry.Name),
		bytes.NewReader(binary))
	if err != nil {
		return err
	}
	auth(req)
	switch {
	case zip:
		asContentType(req, "application/zip")
	case isSymbologyEncoding(binary):
		asContentType(req, "application/vnd.ogc.se+xml")
	default:
		asContentType(req, "application/vnd.ogc.sld+xml")
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("cannot upload style %s (%s)",
			entry.Name, http.StatusText(resp.StatusCode))
	}

	// Third associate with layer

	req, err = http.NewRequest(
		http.MethodPost,
		geoURL+"/rest/layers/"+
			url.PathEscape(workspaceName+":"+entry.Name)+
			"/styles?default=true",
		toStream(&styleFilename))
	if err != nil {
		return err
	}
	auth(req)
	asJSON(req)

	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		return err
	}

	if resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("cannot connect style %s with layer (%s)",
			entry.Name, http.StatusText(resp.StatusCode))
	}

	return nil
}

// isSymbologyEncoding tries to figure out if its plain SLD or SE.
func isSymbologyEncoding(data []byte) bool {
	decoder := xml.NewDecoder(bytes.NewReader(data))
	decoder.CharsetReader = charset.NewReaderLabel

	for {
		tok, err := decoder.Token()
		switch {
		case tok == nil && err == io.EOF:
			return false
		case err != nil:
			log.Warnf("invalid XML: %v\n", err)
			return false
		}
		if t, ok := tok.(xml.StartElement); ok &&
			t.Name.Space == "http://www.opengis.net/se" {
			return true
		}
	}
}

func ensureStyles() error {
	log.Infoln("creating styles")

	var stls styles
	if err := stls.load(); err != nil {
		return err
	}

	entries := models.InternalServices.Filter(
		models.IntAnd(
			models.IntWMS,
			models.IntWithStyle))

	var already []string

	defer func() {
		if len(already) > 0 {
			log.Infof("already having styles: %s\n",
				strings.Join(already, ", "))
		}
	}()

	for i := range entries {
		entry := &entries[i]
		if stls.hasStyle(entry.Name) {
			already = append(already, entry.Name)
			continue
		}
		if err := updateStyle(entry, true); err != nil {
			return err
		}
	}

	return nil
}

// PrepareGeoServer sets up the GeoServer to work together with the gemma server.
// It sets up a workspace, a datastore and exposes the features and styles.
func PrepareGeoServer() error {

	if config.DBUser() == "" {
		log.Infoln("need metamorphic db user to configure GeoServer")
		return nil
	}

	if config.GeoServerURL() == "" {
		log.Infoln("no URL to GeoServer configured")
		return nil
	}

	for _, ensure := range []func() error{
		deleteWorkspace,
		ensureWorkspace,
		ensureDataStore,
		ensureFeatures,
		ensureLayerGroups,
		ensureStyles,
	} {
		if err := ensure(); err != nil {
			return err
		}
	}

	return nil
}