// view pkg/geoserver/boot.go @ 4936:21a48e2d2260 fairway-marks-import
//
// Create group layers in GeoServer via REST calls.
// author Sascha L. Teichmann <sascha.teichmann@intevation.de>
// date Mon, 17 Feb 2020 00:45:21 +0100
// parents c64dba002726
// children 40da1b8aba01
// line wrap: on
// line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package geoserver

import (
	"archive/zip"
	"bytes"
	"encoding/json"
	"encoding/xml"
	"fmt"
	"io"
	"log"
	"net/http"
	"net/url"
	"strings"
	"sync"

	"golang.org/x/net/html/charset"

	"gemma.intevation.de/gemma/pkg/config"
	"gemma.intevation.de/gemma/pkg/models"
)

// Fixed names and settings used when configuring the GeoServer.
//
//   - workspaceName is the workspace addressed in the REST URLs.
//   - datastoreName is the data store addressed below that workspace.
//   - databaseType is sent as the "dbtype" connection parameter.
//   - primaryKeyMetadataTbl is sent as the "Primary key metadata table"
//     connection parameter.
const (
	workspaceName         = "gemma"
	datastoreName         = "gemma"
	databaseType          = "postgis"
	primaryKeyMetadataTbl = "waterway.gt_pk_metadata"
)

// SQL snippets handed to the data store as session hooks.
//
//   - startupSQL is appended to the configured GeoServer startup SQL and
//     sent as "Session startup SQL". It calls public.setrole with a
//     ${user,...} placeholder whose fallback is the hex encoding of
//     'waterway_user'.
//     NOTE(review): the placeholder is presumably expanded by GeoServer
//     per session -- confirm against the GeoServer documentation.
//   - closeupSQL is sent as "Session close-up SQL" and resets the role.
const (
	startupSQL = `SELECT public.setrole('${user,'||encode('waterway_user', 'hex')||'}')`
	closeupSQL = `RESET ROLE`
)

// basicAuth builds a request decorator which stamps the given
// credentials as HTTP basic authentication onto a request.
func basicAuth(user, password string) func(req *http.Request) {
	decorate := func(r *http.Request) {
		r.SetBasicAuth(user, password)
	}
	return decorate
}

// toStream serializes x as JSON and returns a reader over the
// encoded bytes. An encoding failure is only logged as a warning;
// the reader then delivers whatever ended up in the buffer.
func toStream(x interface{}) io.Reader {
	out := new(bytes.Buffer)
	enc := json.NewEncoder(out)
	err := enc.Encode(x)
	if err != nil {
		// Should not happen
		log.Printf("warn: bad JSON: %v\n", err)
	}
	return bytes.NewReader(out.Bytes())
}

// XXX: Creating SQL views with JSON via GeoServer REST-API fails
// Begin code for handling with XML instead
// toXMLStream serializes x as XML, prefixed with the standard XML
// header, and returns a reader over the encoded bytes. An encoding
// failure is only logged as a warning.
func toXMLStream(x interface{}) io.Reader {
	var out bytes.Buffer
	out.WriteString(xml.Header)

	enc := xml.NewEncoder(&out)
	if err := enc.Encode(x); err != nil {
		// Should not happen
		log.Printf("warn: bad XML: %v\n", err)
	}
	return bytes.NewReader(out.Bytes())
}

// ftXML is the XML payload used to create a feature type backed by
// an SQL view. It is encoded as a <featureType> element.
type ftXML struct {
	XMLName  xml.Name `xml:"featureType"`
	Name     string   `xml:"name"`
	Title    string   `xml:"title"`
	SRS      *string  `xml:"srs,omitempty"` // Left out of the XML if nil.
	Metadata ftMetadata
}

// ftMetadata is the <metadata> element of a feature type. It carries
// exactly one entry.
type ftMetadata struct {
	XMLName xml.Name `xml:"metadata"`
	Entry   ftMetadataEntry
}

// ftMetadataEntry is an <entry> element inside <metadata>. Key is
// written as the "key" attribute, the virtual table as a child element.
type ftMetadataEntry struct {
	XMLName   xml.Name `xml:"entry"`
	Key       string   `xml:"key,attr"`
	VirtTable ftVirtTable
}

// ftVirtTable is the <virtualTable> element describing the SQL view:
// its name, the SQL statement and an optional key column.
type ftVirtTable struct {
	XMLName   xml.Name `xml:"virtualTable"`
	Name      string   `xml:"name"`
	SQL       string   `xml:"sql"`
	KeyColumn *string  `xml:"keyColumn,omitempty"` // Left out of the XML if nil.
} // End code for handling with XML

// asJSON marks the body of the request as JSON by setting its
// Content-Type header.
func asJSON(req *http.Request) {
	const mimeJSON = "application/json"
	req.Header.Set("Content-Type", mimeJSON)
}

// asContentType sets the Content-Type header of the request to the
// given value, replacing any value set before.
func asContentType(req *http.Request, contentType string) {
	hdr := req.Header
	hdr.Set("Content-Type", contentType)
}

// ensureWorkspace makes sure that the workspace workspaceName exists
// in the configured GeoServer.
//
// It first probes the workspace via REST. Every answer other than
// "404 Not Found" is taken as "already exists" and nothing more is done.
// Otherwise the workspace is created; any answer other than
// "201 Created" to the creation request is reported as an error.
func ensureWorkspace() error {
	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	// Probe workspace.
	req, err := http.NewRequest(
		http.MethodGet,
		geoURL+"/rest/workspaces/"+workspaceName+".json?quietOnNotFound=true",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// Only the status code is of interest. Close the body right away
	// so the underlying connection is not leaked.
	resp.Body.Close()

	if resp.StatusCode != http.StatusNotFound {
		log.Println("info: workspace " + workspaceName + " already exists.")
		return nil
	}

	// Create workspace

	log.Println("info: creating workspace " + workspaceName)

	const createJSON = `{"workspace":{"name":"` + workspaceName + `"}}`

	req, err = http.NewRequest(
		http.MethodPost,
		geoURL+"/rest/workspaces",
		strings.NewReader(createJSON))
	if err != nil {
		return err
	}
	auth(req)
	asJSON(req)
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("status code '%s' (%d)",
			http.StatusText(resp.StatusCode),
			resp.StatusCode)
	}

	return nil
}

// ensureDataStore makes sure that the data store datastoreName exists
// inside the workspace workspaceName of the configured GeoServer.
//
// It first probes the data store via REST. Every answer other than
// "404 Not Found" is taken as "already exists" and nothing more is done.
// Otherwise the data store is created with connection parameters taken
// from the database configuration; any answer other than "201 Created"
// to the creation request is reported as an error.
func ensureDataStore() error {
	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	// Probe datastore.
	req, err := http.NewRequest(
		http.MethodGet,
		geoURL+"/rest/workspaces/"+workspaceName+
			"/datastores/"+datastoreName+".json?quietOnNotFound=true",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// Only the status code is of interest. Close the body right away
	// so the underlying connection is not leaked.
	resp.Body.Close()

	if resp.StatusCode != http.StatusNotFound {
		log.Println("info: datastore " + datastoreName + " already exists.")
		return nil
	}

	// Create datastore.
	log.Println("info: creating datastore " + datastoreName)

	// entry is one connection parameter, encoded as {"@key": ..., "$": ...}.
	type entry struct {
		Key   interface{} `json:"@key"`
		Value interface{} `json:"$"`
	}

	ds := map[string]interface{}{
		"dataStore": map[string]interface{}{
			"name": datastoreName,
			"connectionParameters": map[string]interface{}{
				"entry": []entry{
					{"host", config.DBHost()},
					{"port", config.DBPort()},
					{"database", config.DBName()},
					{"schema", models.DatabaseScheme},
					{"user", config.DBUser()},
					{"passwd", config.DBPassword()},
					{"dbtype", databaseType},
					{"Primary key metadata table", primaryKeyMetadataTbl},
					{"Expose primary keys", true},
					{"Session startup SQL",
						config.GeoServerStartupSQL() + ";" + startupSQL},
					{"Session close-up SQL", closeupSQL},
					{"validate connections", true},
					{"Estimated extends", false},
				},
			},
		},
	}

	req, err = http.NewRequest(
		http.MethodPost,
		geoURL+"/rest/workspaces/"+workspaceName+"/datastores",
		toStream(ds))
	if err != nil {
		return err
	}
	auth(req)
	asJSON(req)
	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("status code '%s' (%d)",
			http.StatusText(resp.StatusCode),
			resp.StatusCode)
	}

	return nil
}

// ensureFeatures publishes the internal services selected by
// models.IntWFS as feature types of the data store datastoreName.
//
// The feature types already known to the GeoServer are fetched first;
// entries found there are skipped and reported in a single log line
// when the function returns. Entries for which models.IntSQLView holds
// are created from an XML payload, all others from a JSON payload.
// Any answer other than "201 Created" to a creation request aborts
// with an error.
func ensureFeatures() error {
	tables := models.InternalServices.Filter(models.IntWFS)
	if len(tables) == 0 {
		log.Println("info: no tables to publish")
		return nil
	}

	log.Printf("info: number of tables to publish %d\n", len(tables))

	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	datastoreURL := geoURL + "/rest/workspaces/" + workspaceName +
		"/datastores/" + datastoreName

	var features struct {
		FeatureTypes struct {
			FeatureType []struct {
				Name string `json:"name"`
			} `json:"featureType"`
		} `json:"featureTypes"`
	}

	hasFeature := func(name string) bool {
		for _, ft := range features.FeatureTypes.FeatureType {
			if ft.Name == name {
				return true
			}
		}
		return false
	}

	// Fetch all featuretypes.
	req, err := http.NewRequest(
		http.MethodGet,
		datastoreURL+"/featuretypes.json",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}

	err = json.NewDecoder(resp.Body).Decode(&features)
	resp.Body.Close()
	if err != nil {
		// XXX: Quirk in the JSON return by GeoServer:
		// If there are no features in the datastore
		// featureType deserializes to an empty string ""
		// instead of an empty array *palmface*.
		// So assume there no features.
		hasFeature = func(string) bool { return false }
	}

	var already []string

	// Report the skipped feature types on every return path.
	defer func() {
		if len(already) > 0 {
			log.Printf("info: already having featuretypes: %s\n",
				strings.Join(already, ", "))
		}
	}()

	for i := range tables {
		table := tables[i].Name

		if hasFeature(table) {
			already = append(already, table)
			continue
		}

		// Create featuretype.
		log.Printf("info: creating featuretype %s.\n", table)

		var req *http.Request
		if models.IntSQLView(tables[i]) {
			// XXX: Creating SQL views with JSON via GeoServer REST-API fails
			// Begin code for handling with XML instead
			// NOTE(review): assumes models.IntSQLView guarantees a
			// non-nil SQL pointer -- confirm.
			ft := ftXML{
				Name:  table,
				Title: table,
				SRS:   tables[i].SRS,
				Metadata: ftMetadata{
					Entry: ftMetadataEntry{
						Key: "JDBC_VIRTUAL_TABLE",
						VirtTable: ftVirtTable{
							Name:      table,
							SQL:       *tables[i].SQL,
							KeyColumn: tables[i].KeyColumn}}}}

			req, err = http.NewRequest(
				http.MethodPost,
				datastoreURL+"/featuretypes",
				toXMLStream(ft))
			if err != nil {
				return err
			}
			asContentType(req, "text/xml")
			// End code for handling with XML instead
		} else {
			ft := map[string]interface{}{
				"featureType": map[string]interface{}{
					"name":       table,
					"nativeName": table,
					"title":      table,
				},
			}

			req, err = http.NewRequest(
				http.MethodPost,
				datastoreURL+"/featuretypes",
				toStream(ft))
			if err != nil {
				return err
			}
			asJSON(req)
		}
		auth(req)

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		// Only the status code is evaluated. Close the body at once
		// (not deferred, as we are inside a loop) to avoid leaking
		// one connection per created feature type.
		resp.Body.Close()

		if resp.StatusCode != http.StatusCreated {
			return fmt.Errorf("status code '%s' (%d)",
				http.StatusText(resp.StatusCode),
				resp.StatusCode)
		}
	}

	return nil
}

// ensureLayerGroups publishes the layer groups returned by
// models.InternalServices.LayerGroups in the workspace workspaceName.
//
// The layer groups already known to the GeoServer are fetched first;
// groups found there are skipped and reported in a single log line
// when the function returns. Missing groups are created from an XML
// payload in which the layer names are used as style names, too.
// Any answer other than "201 Created" to a creation request aborts
// with an error.
func ensureLayerGroups() error {

	groups := models.InternalServices.LayerGroups()
	if len(groups) == 0 {
		log.Println("info: no groups layers to publish")
		return nil
	}

	log.Printf("info: number of layer groups to publish %d\n", len(groups))

	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	// layerGroups mirrors the JSON listing of existing layer groups.
	type layerGroups struct {
		LayerGroups struct {
			LayerGroup []struct {
				Name string `json:"name"`
			} `json:"layerGroup"`
		} `json:"layerGroups"`
	}

	// XML payload types for creating a layer group.
	type (
		Layers struct {
			Layer []string `xml:"layer"`
		}
		Styles struct {
			Style []string `xml:"style"`
		}
		LayerGroup struct {
			XMLName xml.Name `xml:"layerGroup"`
			Name    string   `xml:"name"`
			Title   string   `xml:"title"`
			Layers  Layers   `xml:"layers"`
			Styles  Styles   `xml:"styles"`
		}
	)

	var lg layerGroups

	hasLayerGroup := func(name string) bool {
		for i := range lg.LayerGroups.LayerGroup {
			if lg.LayerGroups.LayerGroup[i].Name == name {
				return true
			}
		}
		return false
	}

	layerGroupsURL := geoURL + "/rest/workspaces/" + workspaceName +
		"/layergroups"

	// Fetch all layer groups.
	req, err := http.NewRequest(
		http.MethodGet,
		layerGroupsURL+".json",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}

	// XXX: A decoding error is ignored on purpose; lg then keeps
	// whatever could be decoded before the error.
	// NOTE(review): presumably an empty listing is not delivered as a
	// JSON array, as with feature types -- confirm.
	_ = json.NewDecoder(resp.Body).Decode(&lg)
	resp.Body.Close()

	var already []string

	// Report the skipped layer groups on every return path.
	defer func() {
		if len(already) > 0 {
			log.Printf("info: already having layer groups: %s\n",
				strings.Join(already, ", "))
		}
	}()

	for i := range groups {
		if hasLayerGroup(groups[i].Name) {
			already = append(already, groups[i].Name)
			continue
		}

		lgr := LayerGroup{
			Name:  groups[i].Name,
			Title: groups[i].Name,
			Layers: Layers{
				Layer: groups[i].Layers,
			},
			Styles: Styles{
				Style: groups[i].Layers,
			},
		}

		req, err = http.NewRequest(
			http.MethodPost,
			layerGroupsURL+".xml",
			toXMLStream(&lgr))
		if err != nil {
			return err
		}
		asContentType(req, "text/xml")
		auth(req)

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		// Only the status code is evaluated. Close the body at once
		// (not deferred, as we are inside a loop) to avoid leaking
		// one connection per created layer group.
		resp.Body.Close()

		if resp.StatusCode != http.StatusCreated {
			return fmt.Errorf("status code '%s' (%d)",
				http.StatusText(resp.StatusCode),
				resp.StatusCode)
		}
	}

	return nil
}

// deleteWorkspace removes the workspace workspaceName recursively
// from the configured GeoServer if config.GeoServerClean is set.
// Without that setting it does nothing.
//
// Only transport errors are reported; the status code of the answer
// is not evaluated.
func deleteWorkspace() error {

	// Should we delete our workspace first?
	if !config.GeoServerClean() {
		return nil
	}

	log.Println("info: delete workspace " + workspaceName)
	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	req, err := http.NewRequest(
		http.MethodDelete,
		geoURL+"/rest/workspaces/"+workspaceName+"?recurse=true",
		nil)
	if err != nil {
		return err
	}
	auth(req)
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// The answer itself is not needed, but its body has to be closed
	// so the underlying connection is not leaked.
	resp.Body.Close()
	return nil
}

// styles mirrors the JSON listing of styles as delivered by the
// GeoServer: {"styles": {"style": [{"name": ...}, ...]}}.
type styles struct {
	Styles struct {
		Style []struct {
			Name string `json:"name"`
		} `json:"style"`
	} `json:"styles"`
}

// hasStyle reports whether a style with the given name
// is part of the listing.
func (s *styles) hasStyle(name string) bool {
	for _, st := range s.Styles.Style {
		if st.Name == name {
			return true
		}
	}
	return false
}

// load fetches the listing of styles of the workspace workspaceName
// from the configured GeoServer and decodes it into s.
//
// Only errors of building or sending the request are returned.
// Errors while decoding the answer are dropped on purpose.
func (s *styles) load() error {
	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
	)

	stylesURL := geoURL + "/rest/workspaces/" + workspaceName + "/styles.json"

	req, err := http.NewRequest(http.MethodGet, stylesURL, nil)
	if err != nil {
		return err
	}
	req.SetBasicAuth(user, password)

	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()

	// Fetch all styles
	// XXX: Avoid error checking due to quirks with featuretypes.
	_ = json.NewDecoder(resp.Body).Decode(s)
	return nil
}

// Registry of textual style pre-processors, keyed by style name.
// stylePreprocessorsMu guards every access to stylePreprocessors.
var (
	stylePreprocessorsMu sync.Mutex
	stylePreprocessors   = map[string]func(string) (string, error){}
)

// RegisterStylePreprocessor stores a textual pre-processor under the
// given style name. A pre-processor registered before under the same
// name is replaced.
func RegisterStylePreprocessor(name string, processor func(string) (string, error)) {
	stylePreprocessorsMu.Lock()
	stylePreprocessors[name] = processor
	stylePreprocessorsMu.Unlock()
}

// FindStylePreprocessor looks up the textual pre-processor registered
// for the given style name. It returns nil if there is none.
func FindStylePreprocessor(name string) func(string) (string, error) {
	stylePreprocessorsMu.Lock()
	found := stylePreprocessors[name]
	stylePreprocessorsMu.Unlock()
	return found
}

// isZip reports whether data can be opened as a ZIP archive.
// Empty data is never a ZIP archive.
func isZip(data []byte) bool {
	size := int64(len(data))
	if size == 0 {
		return false
	}
	if _, err := zip.NewReader(bytes.NewReader(data), size); err != nil {
		return false
	}
	return true
}

// updateStyle uploads the style of entry to the GeoServer and makes
// it the default style of the layer of the same name.
//
// It works in up to three steps:
//  1. If create is true, the style is registered in the workspace.
//  2. The style data is uploaded. ZIP archives are uploaded as they
//     are; plain styles first run through the pre-processor registered
//     for the style name, if any.
//  3. The style is associated with the layer as its default style.
//
// The first step that fails, or is answered with an unexpected status
// code, aborts with an error.
func updateStyle(entry *models.IntEntry, create bool) error {

	if create {
		log.Printf("info: creating style %s\n", entry.Name)
	} else {
		log.Printf("info: updating style %s\n", entry.Name)
	}

	// Try to load the style data.
	binary, err := entry.LoadStyle()
	if err != nil {
		return err
	}

	// Not named zip, to keep the archive/zip package accessible.
	zipped := isZip(binary)

	if !zipped { // We only support templating for plain XML styles.
		if processor := FindStylePreprocessor(entry.Name); processor != nil {
			data, err := processor(string(binary))
			if err != nil {
				return err
			}
			binary = []byte(data)
		}
	}

	var (
		geoURL   = config.GeoServerURL()
		user     = config.GeoServerUser()
		password = config.GeoServerPassword()
		auth     = basicAuth(user, password)
	)

	styleURL := geoURL + "/rest/workspaces/" + workspaceName +
		"/styles"

	// First create style

	type Style struct {
		Name     string `json:"name"`
		Filename string `json:"filename"`
	}

	var styleFilename = struct {
		Style Style `json:"style"`
	}{
		Style: Style{
			Name:     entry.Name,
			Filename: entry.Name + ".sld",
		},
	}

	if create {
		req, err := http.NewRequest(
			http.MethodPost,
			styleURL,
			toStream(&styleFilename))

		if err != nil {
			return err
		}
		auth(req)
		asJSON(req)
		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return err
		}
		// Only the status code is evaluated; do not leak the connection.
		resp.Body.Close()

		if resp.StatusCode != http.StatusCreated {
			return fmt.Errorf("unable to create style %s (%s)",
				entry.Name,
				http.StatusText(resp.StatusCode))
		}
	}

	// Second upload data

	req, err := http.NewRequest(
		http.MethodPut,
		styleURL+"/"+url.PathEscape(entry.Name),
		bytes.NewReader(binary))
	if err != nil {
		return err
	}
	auth(req)
	switch {
	case zipped:
		asContentType(req, "application/zip")
	case isSymbologyEncoding(binary):
		asContentType(req, "application/vnd.ogc.se+xml")
	default:
		asContentType(req, "application/vnd.ogc.sld+xml")
	}
	resp, err := http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// Only the status code is evaluated; do not leak the connection.
	resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("cannot upload style %s (%s)",
			entry.Name, http.StatusText(resp.StatusCode))
	}

	// Third associate with layer

	req, err = http.NewRequest(
		http.MethodPost,
		geoURL+"/rest/layers/"+
			url.PathEscape(workspaceName+":"+entry.Name)+
			"/styles?default=true",
		toStream(&styleFilename))
	if err != nil {
		return err
	}
	auth(req)
	asJSON(req)

	resp, err = http.DefaultClient.Do(req)
	if err != nil {
		return err
	}
	// Only the status code is evaluated; do not leak the connection.
	resp.Body.Close()

	if resp.StatusCode != http.StatusCreated {
		return fmt.Errorf("cannot connect style %s with layer (%s)",
			entry.Name, http.StatusText(resp.StatusCode))
	}

	return nil
}

// isSymbologyEncoding tries to figure out if its plain SLD or SE.
// isSymbologyEncoding tries to tell SE from plain SLD. It scans the
// XML tokens of data and reports true on the first start element
// living in the namespace "http://www.opengis.net/se". Reaching the
// end of the document without such an element yields false; so does
// invalid XML, which is logged as a warning in addition.
func isSymbologyEncoding(data []byte) bool {
	dec := xml.NewDecoder(bytes.NewReader(data))
	dec.CharsetReader = charset.NewReaderLabel

	for {
		token, err := dec.Token()
		if err != nil {
			// A regular end of input is not worth a warning.
			if token == nil && err == io.EOF {
				return false
			}
			log.Printf("warn: invalid XML: %v\n", err)
			return false
		}
		start, isStart := token.(xml.StartElement)
		if isStart && start.Name.Space == "http://www.opengis.net/se" {
			return true
		}
	}
}

// ensureStyles creates the styles of all internal services selected
// by both models.IntWMS and models.IntWithStyle that the GeoServer
// does not list yet. Styles already listed are skipped and reported
// in a single log line when the function returns. The first failing
// style aborts with its error.
func ensureStyles() error {
	log.Println("info: creating styles")

	var existing styles
	if err := existing.load(); err != nil {
		return err
	}

	styled := models.IntAnd(models.IntWMS, models.IntWithStyle)
	entries := models.InternalServices.Filter(styled)

	var skipped []string

	// Report the skipped styles on every return path.
	defer func() {
		if len(skipped) == 0 {
			return
		}
		log.Printf("info: already having styles: %s\n",
			strings.Join(skipped, ", "))
	}()

	for i := range entries {
		if name := entries[i].Name; existing.hasStyle(name) {
			skipped = append(skipped, name)
			continue
		}
		if err := updateStyle(&entries[i], true); err != nil {
			return err
		}
	}

	return nil
}

// PrepareGeoServer sets up the GeoServer to work together with the gemma server.
// It sets up a workspace, a datastore and exposes the features and styles.
// PrepareGeoServer configures the GeoServer for use with the gemma
// server. If no database user or no GeoServer URL is configured, it
// only logs this and returns nil. Otherwise it runs the setup steps in
// a fixed order -- optional clean-up of the workspace, workspace, data
// store, feature types, layer groups and styles -- and stops at the
// first step reporting an error.
func PrepareGeoServer() error {

	switch {
	case config.DBUser() == "":
		log.Println("info: Need metamorphic db user to configure GeoServer")
		return nil
	case config.GeoServerURL() == "":
		log.Println("info: No URL to GeoServer configured")
		return nil
	}

	steps := [...]func() error{
		deleteWorkspace,
		ensureWorkspace,
		ensureDataStore,
		ensureFeatures,
		ensureLayerGroups,
		ensureStyles,
	}

	for i := range steps {
		if err := steps[i](); err != nil {
			return err
		}
	}

	return nil
}