view pkg/imports/config.go @ 5670:b75d0b303328

Various fixes and improvements of gauges import: - Allow update of erased data (and thereby set erased to false) - Fix source_organization to work with ERDMS2 - Give ISRS of new and updated gauges in summary - Fixed reference of null pointers if revlevels are missing - Fixed reference of null pointer on update errors - Added ISRS to reference_code warning
author Sascha Wilde <wilde@sha-bang.de>
date Fri, 08 Dec 2023 17:29:56 +0100
parents 51e90370eced
children 6270951dda28
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2018 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"sort"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
)

// ImportKind names an import type. When decoded from JSON
// only names of registered import types are accepted.
type ImportKind string

// ImportConfigIn is the JSON shape of a configuration as it
// arrives from the REST endpoints before it is stored in
// the database. The kind specific part is kept as raw JSON.
type ImportConfigIn struct {
	Kind   ImportKind      `json:"kind"`
	Config json.RawMessage `json:"config"`
}

// ImportConfigOut is the JSON shape of a configuration
// stored in the database as it is delivered to the
// REST endpoints.
type ImportConfigOut struct {
	ID     int64       `json:"id"`
	Kind   ImportKind  `json:"kind"`
	User   string      `json:"user"`
	Config interface{} `json:"config,omitempty"`
}

// PersistentConfig is the in-memory representation
// of a configuration which is stored in the database:
// its id, the owning user, the kind and the key/value
// attributes belonging to it.
type PersistentConfig struct {
	ID         int64
	User       string
	Kind       string
	Attributes common.Attributes
}

// UnmarshalJSON decodes a JSON string into the import kind.
// It fails if data is not a JSON string or if the string is
// not the name of a registered import type. On failure the
// receiver is left untouched.
func (ik *ImportKind) UnmarshalJSON(data []byte) error {
	var name string

	err := json.Unmarshal(data, &name)
	switch {
	case err != nil:
		return err
	case !HasImportKindName(name):
		return fmt.Errorf("unknown kind '%s'", name)
	}

	*ik = ImportKind(name)
	return nil
}

const (
	// configUser is the user name handed to auth.RunAs when a
	// configuration is loaded without a caller supplied connection.
	configUser = "sys_admin"

	// loadPersistentConfigSQL fetches user name and kind of
	// the configuration with the id given as $1.
	loadPersistentConfigSQL = `
SELECT
  username,
  kind
FROM import.import_configuration
WHERE id = $1`

	// loadPersistentConfigAttributesSQL fetches all key/value
	// attributes of the configuration with the id given as $1.
	loadPersistentConfigAttributesSQL = `
SELECT k, v
FROM import.import_configuration_attributes
WHERE import_configuration_id = $1`

	// hasImportConfigurationSQL yields a single true if a
	// configuration with the id given as $1 exists and
	// no row otherwise.
	hasImportConfigurationSQL = `
SELECT true FROM import.import_configuration
WHERE id = $1`

	// deleteImportConfiguationAttributesSQL removes all attributes
	// of the configuration with the id given as $1.
	deleteImportConfiguationAttributesSQL = `
DELETE FROM import.import_configuration_attributes
WHERE import_configuration_id = $1`

	// deleteImportConfiguationSQL removes the configuration
	// with the id given as $1.
	deleteImportConfiguationSQL = `
DELETE FROM import.import_configuration
WHERE id = $1`

	// updateImportConfigurationSQL overwrites user name ($2) and
	// kind ($3) of the configuration with the id given as $1.
	updateImportConfigurationSQL = `
UPDATE import.import_configuration SET
  username = $2,
  kind = $3
WHERE id = $1`

	// selectImportConfigurationsByID lists all configurations
	// ordered by their id. Despite its name it takes no parameter.
	// Each configuration is repeated once per attribute; because
	// of the LEFT JOIN a configuration without attributes shows
	// up as one row with k and v being NULL.
	selectImportConfigurationsByID = `
SELECT
  c.id AS id,
  username,
  kind,
  a.k,
  a.v
FROM import.import_configuration c LEFT JOIN
  import.import_configuration_attributes a
  ON c.id = a.import_configuration_id
ORDER by c.id`

	// insertImportConfigurationSQL creates a new configuration
	// for user name ($1) and kind ($2) and returns its new id.
	insertImportConfigurationSQL = `
INSERT INTO import.import_configuration
(username, kind)
VALUES ($1, $2)
RETURNING id`

	// insertImportConfigurationAttributeSQL stores one key ($2) /
	// value ($3) attribute for the configuration with the id
	// given as $1.
	insertImportConfigurationAttributeSQL = `
INSERT INTO import.import_configuration_attributes
(import_configuration_id, k, v)
VALUES ($1, $2, $3)`
)

// UpdateContext writes the configuration back to the database
// inside the given transaction. User and kind of the row with
// the id of the configuration are overwritten, all attributes
// currently stored for it are dropped and the attributes
// of pc are stored instead.
func (pc *PersistentConfig) UpdateContext(ctx context.Context, tx *sql.Tx) error {
	_, err := tx.ExecContext(
		ctx, updateImportConfigurationSQL, pc.ID, pc.User, pc.Kind)
	if err != nil {
		return err
	}

	// Replace the attributes as a whole instead of diffing them.
	_, err = tx.ExecContext(
		ctx, deleteImportConfiguationAttributesSQL, pc.ID)
	if err != nil {
		return err
	}

	return storeConfigAttributes(ctx, tx, pc.ID, pc.Attributes)
}

// LoadPersistentConfigContext fetches the configuration with
// the given id together with its attributes using the given
// connection. If there is no such configuration both results
// are nil. The attributes of the returned configuration stay
// nil if none are stored for it.
func LoadPersistentConfigContext(
	ctx context.Context,
	conn *sql.Conn,
	id int64,
) (*PersistentConfig, error) {

	cfg := &PersistentConfig{ID: id}

	row := conn.QueryRowContext(ctx, loadPersistentConfigSQL, id)
	if err := row.Scan(&cfg.User, &cfg.Kind); err != nil {
		if err == sql.ErrNoRows {
			// An unknown id is not treated as an error.
			return nil, nil
		}
		return nil, err
	}

	// Collect the key/value attributes of the configuration.
	rows, err := conn.QueryContext(ctx, loadPersistentConfigAttributesSQL, id)
	if err != nil {
		return nil, err
	}
	defer rows.Close()

	for rows.Next() {
		var key, value string
		if err := rows.Scan(&key, &value); err != nil {
			return nil, err
		}
		// Allocate lazily to keep the attributes nil
		// when there is nothing to store in them.
		if cfg.Attributes == nil {
			cfg.Attributes = common.Attributes{}
		}
		cfg.Attributes[key] = value
	}
	if err := rows.Err(); err != nil {
		return nil, err
	}

	return cfg, nil
}

// loadPersistentConfig loads the configuration with the given id
// using a background context.
func loadPersistentConfig(id int64) (*PersistentConfig, error) {
	ctx := context.Background()
	return loadPersistentConfigContext(ctx, id)
}

// loadPersistentConfigContext loads the configuration with the
// given id on a connection provided by auth.RunAs for configUser.
func loadPersistentConfigContext(ctx context.Context, id int64) (*PersistentConfig, error) {
	var loaded *PersistentConfig

	load := func(conn *sql.Conn) (err error) {
		loaded, err = LoadPersistentConfigContext(ctx, conn, id)
		return err
	}

	err := auth.RunAs(ctx, configUser, load)
	return loaded, err
}

// ListAllPersistentConfigurationsContext walks over all
// configurations stored in the database in the order of
// their ids and hands each of them to the callback fn.
// The walk stops with the first error, be it from the
// database, from an unknown kind or from fn itself.
// Attributes named `password` are never handed out.
func ListAllPersistentConfigurationsContext(
	ctx context.Context,
	conn *sql.Conn,
	fn func(*ImportConfigOut) error,
) error {

	rows, err := conn.QueryContext(ctx, selectImportConfigurationsByID)
	if err != nil {
		return err
	}
	defer rows.Close()

	// The query delivers one row per attribute. Rows of the same
	// configuration are adjacent, so they are folded into current
	// until a row with another id shows up.
	var (
		current PersistentConfig
		started bool
	)

	// emit converts current to its outgoing form and reports it.
	emit := func() error {
		ctor := ImportModelForJobKind(JobKind(current.Kind))
		if ctor == nil {
			return fmt.Errorf("unable to deserialize kind '%s'", current.Kind)
		}
		config := ctor()
		// NOTE(review): any result of Unmarshal is not inspected here.
		current.Attributes.Unmarshal(config)
		out := ImportConfigOut{
			ID:     current.ID,
			Kind:   ImportKind(current.Kind),
			User:   current.User,
			Config: config,
		}
		return fn(&out)
	}

	for rows.Next() {
		var (
			id         int64
			user, kind string
			key, value sql.NullString
		)
		if err := rows.Scan(&id, &user, &kind, &key, &value); err != nil {
			return err
		}

		if !started || current.ID != id {
			// A new configuration begins. Report the
			// finished one before, if there is any.
			if started {
				if err := emit(); err != nil {
					return err
				}
			}
			started = true
			current = PersistentConfig{ID: id, User: user, Kind: kind}
		}

		// Configurations without attributes come with NULLs.
		if !key.Valid || !value.Valid {
			continue
		}
		if current.Attributes == nil {
			current.Attributes = common.Attributes{}
		}
		// Prevent sending the `password` back to the client.
		// (See importconfig.infoImportConfig() for the other place
		//  where this is done.)
		if key.String == "password" {
			continue
		}
		current.Attributes.Set(key.String, value.String)
	}

	if err := rows.Err(); err != nil {
		return err
	}

	// Nothing to flush if the table was empty.
	if !started {
		return nil
	}
	return emit()
}

// DeletePersistentConfigurationContext deletes the configuration
// identified by id and all of its attributes from the database
// inside the given transaction.
//
// It returns sql.ErrNoRows if there is no configuration with
// this id. Any other error comes from the failing database
// operation; in that case it is up to the caller to roll
// back the transaction.
func DeletePersistentConfigurationContext(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
) error {
	// Make deleting an unknown configuration an explicit error.
	var found bool
	if err := tx.QueryRowContext(
		ctx,
		hasImportConfigurationSQL,
		id,
	).Scan(&found); err != nil {
		return err
	}
	if !found {
		return sql.ErrNoRows
	}
	// The attributes go first, then the configuration itself.
	if _, err := tx.ExecContext(
		ctx,
		deleteImportConfiguationAttributesSQL,
		id,
	); err != nil {
		// Report the failure instead of claiming success
		// for a deletion which did not happen.
		return err
	}
	_, err := tx.ExecContext(
		ctx,
		deleteImportConfiguationSQL,
		id,
	)
	return err
}

// storeConfigAttributes inserts the given attributes for the
// configuration with the given id inside the transaction tx.
// Nothing is done if there are no attributes. The attributes
// are inserted in ascending order of their keys. The first
// failing insert aborts the operation.
func storeConfigAttributes(
	ctx context.Context,
	tx *sql.Tx,
	id int64,
	attrs common.Attributes,
) error {
	if len(attrs) == 0 {
		return nil
	}

	stmt, err := tx.PrepareContext(ctx, insertImportConfigurationAttributeSQL)
	if err != nil {
		return err
	}
	defer stmt.Close()

	// Map iteration order is random, so insert in sorted
	// key order to make the operation deterministic.
	names := make([]string, 0, len(attrs))
	for name := range attrs {
		names = append(names, name)
	}
	sort.Strings(names)

	for _, name := range names {
		if _, err := stmt.ExecContext(ctx, id, name, attrs[name]); err != nil {
			return err
		}
	}
	return nil
}

// StoreContext inserts the configuration as a new entry into
// the database inside the given transaction and stores its
// attributes along with it. On success the id generated by the
// database is written to pc.ID and returned. On failure
// pc.ID is left unchanged and 0 is returned with the error.
func (pc *PersistentConfig) StoreContext(ctx context.Context, tx *sql.Tx) (int64, error) {
	row := tx.QueryRowContext(
		ctx, insertImportConfigurationSQL, pc.User, pc.Kind)

	var newID int64
	if err := row.Scan(&newID); err != nil {
		return 0, err
	}

	err := storeConfigAttributes(ctx, tx, newID, pc.Attributes)
	if err != nil {
		return 0, err
	}

	pc.ID = newID
	return newID, nil
}