view pkg/imports/statsupdate.go @ 5718:3d497077f888 uploadwg

Implemented direct file upload as an alternative import method for WG. For testing and data corrections it is useful to be able to import waterway gauge data directly by uploading an XML file.
author Sascha Wilde <wilde@sha-bang.de>
date Thu, 18 Apr 2024 19:23:19 +0200
parents 6270951dda28
children
line wrap: on
line source

// This is Free Software under GNU Affero General Public License v >= 3.0
// without warranty, see README.md and license for details.
//
// SPDX-License-Identifier: AGPL-3.0-or-later
// License-Filename: LICENSES/AGPL-3.0.txt
//
// Copyright (C) 2021 by via donau
//   – Österreichische Wasserstraßen-Gesellschaft mbH
// Software engineering by Intevation GmbH
//
// Author(s):
//  * Sascha L. Teichmann <sascha.teichmann@intevation.de>

package imports

import (
	"context"
	"database/sql"
	"errors"
	"fmt"
	"time"

	"gemma.intevation.de/gemma/pkg/auth"
	"gemma.intevation.de/gemma/pkg/common"
	"gemma.intevation.de/gemma/pkg/models"
)

// StatsUpdate is a job for stats updates.
type StatsUpdate struct {
	// Embedded queue configuration; its attributes are (un)marshaled
	// together with Name.
	models.QueueConfigurationType
	// Name is the key used to look up the update script in
	// sys_admin.stats_updates.
	Name string `json:"name"`
}

// StatsUpdateJobKind is the unique name of this import job type.
// It is the kind under which the job creator is registered in init.
const StatsUpdateJobKind JobKind = "statsupdate"

// statsUpdateJobCreator is the stateless job creator registered for
// StatsUpdateJobKind; it creates StatsUpdate jobs.
type statsUpdateJobCreator struct{}

// init registers the stats update job creator under StatsUpdateJobKind.
func init() {
	RegisterJobCreator(StatsUpdateJobKind, statsUpdateJobCreator{})
}

// Description returns the fixed description string of this job kind.
func (statsUpdateJobCreator) Description() string {
	const description = "statsupdate"
	return description
}

// AutoAccept always reports true for stats update jobs.
func (statsUpdateJobCreator) AutoAccept() bool {
	return true
}

// Create returns a fresh, zero-valued StatsUpdate job.
func (statsUpdateJobCreator) Create() Job {
	return &StatsUpdate{}
}

// Depends returns two empty (non-nil) dependency lists.
func (statsUpdateJobCreator) Depends() [2][]string {
	return [2][]string{
		{},
		{},
	}
}

// StageDone is a no-op for stats updates: all arguments are ignored
// and nil is returned.
func (statsUpdateJobCreator) StageDone(
	_ context.Context,
	_ *sql.Tx,
	_ int64,
	_ Feedback,
) error {
	return nil
}

// RequiresRoles restricts running this job to the sys_admin role.
func (*StatsUpdate) RequiresRoles() auth.Roles {
	return auth.Roles{"sys_admin"}
}

// Description returns the name of the stats update as a short summary
// of this job. The string slice argument is ignored and the error is
// always nil.
func (su *StatsUpdate) Description(_ []string) (string, error) {
	return su.Name, nil
}

// CleanUp has nothing to clean up for this job and always returns nil.
func (*StatsUpdate) CleanUp() error {
	return nil
}

// MarshalAttributes stores the job's configuration in attrs for DB
// persistence: first the embedded queue configuration, then the name
// under the key "name". If the queue configuration fails to marshal,
// its error is returned and the name is not written.
func (su *StatsUpdate) MarshalAttributes(attrs common.Attributes) error {
	err := su.QueueConfigurationType.MarshalAttributes(attrs)
	if err == nil {
		attrs.Set("name", su.Name)
	}
	return err
}

// UnmarshalAttributes restores the job's configuration from attrs:
// first the embedded queue configuration, then the mandatory "name"
// attribute. An error is returned if the queue configuration cannot be
// restored or the "name" attribute is absent.
func (su *StatsUpdate) UnmarshalAttributes(attrs common.Attributes) error {
	err := su.QueueConfigurationType.UnmarshalAttributes(attrs)
	if err != nil {
		return err
	}
	if value, ok := attrs.Get("name"); ok {
		su.Name = value
		return nil
	}
	return errors.New("missing 'name' attribute")
}

// loadUpdateStatsScriptSQL selects the script stored in
// sys_admin.stats_updates under the name given as $1.
const loadUpdateStatsScriptSQL = `SELECT script FROM sys_admin.stats_updates WHERE name = $1`

// Do runs the stats update script stored under su.Name.
//
// The script is loaded from sys_admin.stats_updates and then executed,
// both inside one transaction on conn that is committed on success.
// The start of the run and the elapsed time are reported via feedback.
// The int64 argument is ignored.
//
// The first return value is always nil. An error is returned if the
// transaction cannot be started, no script is stored under su.Name,
// loading or executing the script fails, or the commit fails.
func (su *StatsUpdate) Do(
	ctx context.Context,
	_ int64,
	conn *sql.Conn,
	feedback Feedback,
) (any, error) {
	start := time.Now()

	feedback.Info("Running stats update %s.", su.Name)

	tx, err := conn.BeginTx(ctx, nil)
	if err != nil {
		return nil, err
	}
	// Undo any partial work on early return. The result is deliberately
	// ignored: once Commit has succeeded there is nothing left to roll back.
	defer tx.Rollback()

	var script string
	err = tx.QueryRowContext(ctx, loadUpdateStatsScriptSQL, su.Name).Scan(&script)
	switch {
	case errors.Is(err, sql.ErrNoRows):
		// errors.Is also matches if a driver or wrapper wraps the sentinel.
		return nil, fmt.Errorf("no update script found for '%s'", su.Name)
	case err != nil:
		return nil, err
	}

	if _, err := tx.ExecContext(ctx, script); err != nil {
		return nil, err
	}

	if err := tx.Commit(); err != nil {
		return nil, err
	}

	feedback.Info("Running stats update took %v.", time.Since(start))

	return nil, nil
}