Mercurial > gemma
view pkg/imports/statsupdate.go @ 5718:3d497077f888 uploadwg
Implemented direct file upload as alternative import method for WG.
For testing and data corrections it is useful to be able to import
waterway gauges data directly by uploading an XML file.
author | Sascha Wilde <wilde@sha-bang.de> |
---|---|
date | Thu, 18 Apr 2024 19:23:19 +0200 |
parents | 6270951dda28 |
children |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2021 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package imports import ( "context" "database/sql" "errors" "fmt" "time" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/models" ) // StatsUpdate is job for status updates. type StatsUpdate struct { models.QueueConfigurationType Name string `json:"name"` } // StatsUpdateJobKind is the unique name of this import job type. const StatsUpdateJobKind JobKind = "statsupdate" type statsUpdateJobCreator struct{} func init() { RegisterJobCreator(StatsUpdateJobKind, statsUpdateJobCreator{}) } func (statsUpdateJobCreator) Description() string { return "statsupdate" } func (statsUpdateJobCreator) AutoAccept() bool { return true } func (statsUpdateJobCreator) Create() Job { return new(StatsUpdate) } func (statsUpdateJobCreator) Depends() [2][]string { return [2][]string{{}, {}} } func (statsUpdateJobCreator) StageDone(context.Context, *sql.Tx, int64, Feedback) error { return nil } // RequiresRoles enforces to be a sys_admin to run this . func (*StatsUpdate) RequiresRoles() auth.Roles { return auth.Roles{"sys_admin"} } // Description gives a short info about relevant facts of this import. func (su *StatsUpdate) Description([]string) (string, error) { return su.Name, nil } // CleanUp is an empty implementation. func (*StatsUpdate) CleanUp() error { return nil } // MarshalAttributes implements DB marshaling of this job. 
func (su *StatsUpdate) MarshalAttributes(attrs common.Attributes) error { if err := su.QueueConfigurationType.MarshalAttributes(attrs); err != nil { return err } attrs.Set("name", su.Name) return nil } // UnmarshalAttributes implements DB unmarshaling this job. func (su *StatsUpdate) UnmarshalAttributes(attrs common.Attributes) error { if err := su.QueueConfigurationType.UnmarshalAttributes(attrs); err != nil { return err } name, found := attrs.Get("name") if !found { return errors.New("missing 'name' attribute") } su.Name = name return nil } const loadUpdateStatsScriptSQL = `SELECT script FROM sys_admin.stats_updates WHERE name = $1` // Do executes the actual report generation. func (su *StatsUpdate) Do( ctx context.Context, _ int64, conn *sql.Conn, feedback Feedback, ) (any, error) { start := time.Now() feedback.Info("Running stats update %s.", su.Name) tx, err := conn.BeginTx(ctx, nil) if err != nil { return nil, err } defer tx.Rollback() var script string switch err := tx.QueryRowContext(ctx, loadUpdateStatsScriptSQL, su.Name).Scan(&script); { case err == sql.ErrNoRows: return nil, fmt.Errorf("no update script found for '%s'", su.Name) case err != nil: return nil, err } if _, err := tx.ExecContext(ctx, script); err != nil { return nil, err } if err := tx.Commit(); err != nil { return nil, err } feedback.Info("Running stats update took %v.", time.Since(start)) return nil, nil }