view pkg/controllers/system.go @ 5123:eeb45e3e0a5a queued-stage-done

Added mechanism to have sync import jobs on import queue. Review jobs are now sync with a controller waiting for 20 secs before returning. If all reviews return earlier the controller extists earlier, too. If one or more decisions took longer they are run in background till they are decided and the the controller returns a error message for these imports that the process is st still running.
author Sascha L. Teichmann <sascha.teichmann@intevation.de>
date Thu, 26 Mar 2020 22:24:45 +0100
parents 44b032028e48
children 24156a964eaa
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018, 2019 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha Wilde <sascha.wilde@intevation.de>

package controllers

import (
	"bytes"
	"context"
	"database/sql"
	"fmt"
	"io/ioutil"
	"log"
	"net/http"
	"regexp"
	"strings"
	"sync"
	"time"

	"github.com/gorilla/mux"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/config"
	"gemma.intevation.de/gemma/pkg/geoserver"
	"gemma.intevation.de/gemma/pkg/imports"
	"gemma.intevation.de/gemma/pkg/models"

	mw "gemma.intevation.de/gemma/pkg/middleware"
)

const (
	getSettingsSQL = `
SELECT config_key, config_val
FROM sys_admin.system_config`

	getConfigSQL = `
SELECT config_val
FROM sys_admin.system_config
WHERE config_key = $1`

	updateSettingSQL = `
INSERT INTO sys_admin.system_config (config_key, config_val)
VALUES ($1, $2)
ON CONFLICT (config_key) DO UPDATE SET config_val = $2`

	deleteSoundingDiffsSQL = `
DELETE FROM caching.sounding_differences`
)

// System status end points

func showSystemLog(req *http.Request) (jr mw.JSONResult, err error) {

	serviceName := mux.Vars(req)["service"]
	fileName := mux.Vars(req)["file"]

	// The following check is currently most likely unnecessary as I wasn't
	// able to inject a verbatim '/' via the middleware, but better be on
	// the safe site...
	if strings.Contains(fileName, "/") {
		err = mw.JSONError{
			Code:    http.StatusBadRequest,
			Message: "error: no slashes allowed in file name",
		}
		return
	}

	var path string

	switch serviceName {
	case "apache2", "postgresql":
		path = "/var/log/" + serviceName + "/" + fileName
	default:
		err = mw.JSONError{
			Code:    http.StatusBadRequest,
			Message: "error: invalid service: " + serviceName,
		}
		return
	}

	var txt []byte

	if txt, err = ioutil.ReadFile(path); err != nil {
		return
	}

	jr = mw.JSONResult{
		Result: struct {
			Path    string `json:"path"`
			Content string `json:"content"`
		}{path, string(txt)},
	}
	return
}

func getSystemConfig(req *http.Request) (jr mw.JSONResult, err error) {

	cfg := config.PublishedConfig()
	if cfg == "" {
		jr = mw.JSONResult{Result: strings.NewReader("{}")}
		return
	}

	var data []byte
	if data, err = ioutil.ReadFile(cfg); err != nil {
		return
	}

	jr = mw.JSONResult{Result: bytes.NewReader(data)}
	return
}

func getSystemSettings(req *http.Request) (jr mw.JSONResult, err error) {

	var rows *sql.Rows
	if rows, err = mw.JSONConn(req).QueryContext(req.Context(), getSettingsSQL); err != nil {
		return
	}
	defer rows.Close()

	settings := map[string]string{}

	for rows.Next() {
		var key, val string
		if err = rows.Scan(&key, &val); err != nil {
			return
		}
		settings[key] = val
	}
	if err = rows.Err(); err != nil {
		return
	}

	jr = mw.JSONResult{Result: settings}
	return
}

type reconfFunc func(sql.NullString, string) (func(*http.Request), error)

var (
	reconfigureFuncsMu sync.Mutex
	reconfigureFuncs   = map[string]reconfFunc{}
)

func registerReconfigureFunc(key string, fn reconfFunc) {
	reconfigureFuncsMu.Lock()
	defer reconfigureFuncsMu.Unlock()
	reconfigureFuncs[key] = fn
}

func reconfigureFunc(key string) reconfFunc {
	reconfigureFuncsMu.Lock()
	defer reconfigureFuncsMu.Unlock()
	return reconfigureFuncs[key]
}

var validColorRe = regexp.MustCompile(`#[[:xdigit:]]{6}`)

func reconfigureWMSLayer(
	old sql.NullString, curr,
	which string,
) (func(*http.Request), error) {

	if !validColorRe.MatchString(curr) {
		return nil, fmt.Errorf("'%v' is not a valid color", curr)
	}

	if !old.Valid || old.String != strings.ToLower(curr) {
		return func(*http.Request) { geoserver.ReconfigureStyle(which) }, nil
	}

	return nil, nil
}

func reconfigureClassBreaks(
	old sql.NullString, curr,
	which string,
	recalc func(*http.Request),
) (func(*http.Request), error) {

	// If new values are broken, don't proceed.
	currCVs, err := models.ParseColorValues(curr)
	if err != nil {
		return nil, err
	}

	doBoth := func(req *http.Request) {
		log.Printf("info: Trigger re-calculation of %s.", which)
		geoserver.ReconfigureStyle(which)
		recalc(req)
	}

	if !old.Valid {
		return doBoth, nil
	}

	oldCVs, err := models.ParseColorValues(old.String)
	if err != nil {
		log.Printf("warn: old config value is broken: %v\n", err)
		return doBoth, nil
	}

	if len(currCVs) != len(oldCVs) {
		return doBoth, nil
	}

	colorChanged := false

	for i := range currCVs {
		if currCVs[i].Value != oldCVs[i].Value {
			return doBoth, nil
		}
		if currCVs[i].Color != oldCVs[i].Color {
			colorChanged = true
		}
	}

	// Only the color changed -> no expensive recalc needed.
	if colorChanged {
		log.Println("info: Only colors changed.")
		return func(*http.Request) { geoserver.ReconfigureStyle(which) }, nil
	}

	return nil, nil
}

func init() {
	registerReconfigureFunc("morphology_classbreaks",
		func(old sql.NullString, curr string) (func(*http.Request), error) {
			return reconfigureClassBreaks(
				old, curr,
				"sounding_results_areas_geoserver",
				func(req *http.Request) {
					if s, ok := auth.GetSession(req); ok {
						triggerSoundingResultsContoursRecalc(s.User, curr)
					}
				})
		})
	registerReconfigureFunc("morphology_classbreaks_compare",
		func(old sql.NullString, curr string) (func(*http.Request), error) {
			return reconfigureClassBreaks(
				old, curr,
				"sounding_differences",
				func(*http.Request) { go deleteSoundingDiffs() })
		})

	reconf := func(which string) func(sql.NullString, string) (func(*http.Request), error) {
		return func(old sql.NullString, curr string) (func(*http.Request), error) {
			return reconfigureWMSLayer(old, curr, which)
		}
	}

	dm := reconf("distance_marks_geoserver")
	registerReconfigureFunc("distance_marks_fill", dm)
	registerReconfigureFunc("distance_marks_stroke", dm)

	dma := reconf("distance_marks_ashore_geoserver")
	registerReconfigureFunc("distance_marks_ashore_fill", dma)
	registerReconfigureFunc("distance_marks_ashore_stroke", dma)

	registerReconfigureFunc("waterway_area_stroke", reconf("waterway_area"))
	registerReconfigureFunc("waterway_axis_stroke", reconf("waterway_axis"))

	// TODO: Add more layers.
}

func triggerSoundingResultsContoursRecalc(who, breaks string) {
	var serialized string
	job := &imports.IsoRefresh{ClassBreaks: breaks}
	serialized, err := common.ToJSONString(job)
	if err != nil {
		log.Printf("error: %v\n", err)
		return
	}
	var jobID int64
	if jobID, err = imports.AddJob(
		imports.ISRJobKind,
		time.Time{},
		nil,
		nil,
		who,
		false,
		serialized,
	); err != nil {
		log.Printf("error: %v\n", err)
		return
	}
	log.Printf(
		"info: Recalculate sounding results contours in job %d.\n",
		jobID)

}

func deleteSoundingDiffs() {
	// TODO: Better do that in import queue?
	ctx := context.Background()

	if err := auth.RunAs(ctx, "sys_admin",
		func(conn *sql.Conn) error {
			_, err := conn.ExecContext(ctx, deleteSoundingDiffsSQL)
			return err
		},
	); err != nil {
		log.Printf("error: Cleaning sounding diffs cache failed: %v\n", err)
	}
}

func setSystemSettings(req *http.Request) (jr mw.JSONResult, err error) {

	settings := mw.JSONInput(req).(*map[string]string)

	ctx := req.Context()
	var tx *sql.Tx
	if tx, err = mw.JSONConn(req).BeginTx(ctx, nil); err != nil {
		return
	}
	defer tx.Rollback()

	var setStmt *sql.Stmt
	if setStmt, err = tx.PrepareContext(ctx, updateSettingSQL); err != nil {
		return
	}
	defer setStmt.Close()
	var getStmt *sql.Stmt
	if getStmt, err = tx.PrepareContext(ctx, getConfigSQL); err != nil {
		return
	}
	defer getStmt.Close()

	reconfigure := map[string]func(*http.Request){}

	for key, value := range *settings {
		var old sql.NullString
		err = getStmt.QueryRowContext(ctx, key).Scan(&old)
		switch {
		case err == sql.ErrNoRows:
			old.Valid, err = false, nil
		case err != nil:
			return
		}

		if cmp := reconfigureFunc(key); cmp != nil {
			var fn func(*http.Request)
			if fn, err = cmp(old, value); err != nil {
				return
			}
			if fn != nil {
				reconfigure[key] = fn
			}
		}

		if _, err = setStmt.ExecContext(ctx, key, value); err != nil {
			return
		}
	}

	if err = tx.Commit(); err != nil {
		return
	}

	for _, fn := range reconfigure {
		fn(req)
	}

	jr = mw.JSONResult{
		Code: http.StatusCreated,
		Result: struct {
			Result string `json:"result"`
		}{"success"},
	}
	return
}