Mercurial > gemma
view pkg/controllers/importconfig.go @ 1708:49e047c2106e
Imports: Made imports re-runnable if they fail.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Tue, 08 Jan 2019 13:35:44 +0100 |
parents | 897d4d8316ad |
children | 3adb834ed55e |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "errors" "fmt" "net/http" "sort" "strconv" "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/imports" "gemma.intevation.de/gemma/pkg/scheduler" ) const ( selectImportConfigurationPrefix = ` SELECT id, username, kind, send_email, auto_accept, cron, url FROM waterway.import_configuration` selectImportConfigurationSQL = selectImportConfigurationPrefix + ` ORDER by id` selectImportConfigurationIDSQL = selectImportConfigurationPrefix + ` WHERE id = $1` insertImportConfigurationSQL = ` INSERT INTO waterway.import_configuration (username, kind, cron, send_email, auto_accept, url) VALUES ($1, $2, $3, $4, $5, $6) RETURNING id` insertImportConfigurationAttributeSQL = ` INSERT INTO waterway.import_configuration_attributes (import_configuration_id, k, v) VALUES ($1, $2, $3)` hasImportConfigurationSQL = ` SELECT true FROM waterway.import_configuration WHERE id = $1` deleteImportConfiguationAttributesSQL = ` DELETE FROM waterway.import_configuration_attributes WHERE id = $1` deleteImportConfiguationSQL = ` DELETE FROM waterway.import_configuration WHERE id = $1` updateImportConfigurationSQL = ` UPDATE waterway.import_configuration SET username = $2 kind = $3, cron = $4, url = $5, send_email = $6, auto_accept = $7 WHERE id = $1 ` ) func modifyImportConfig( input interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { ctx := req.Context() importConfig := input.(*imports.Config) id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) var tx *sql.Tx if tx, err = conn.BeginTx(ctx, nil); err != nil { return } defer tx.Rollback() var ( entry imports.IDConfig kind string cron sql.NullString url sql.NullString ) err = conn.QueryRowContext(ctx, selectImportConfigurationIDSQL, id).Scan( &entry.ID, &entry.User, &kind, &entry.SendEMail, &entry.AutoAccept, &cron, &url, ) switch { case err == sql.ErrNoRows: err = JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("No schedule %d found", id), } return case err != nil: return } session, _ := auth.GetSession(req) entry.SendEMail = importConfig.SendEMail entry.AutoAccept = importConfig.AutoAccept if importConfig.Cron != nil { cron = sql.NullString{String: string(*importConfig.Cron), Valid: true} } if importConfig.URL != nil { url = sql.NullString{String: *importConfig.URL, Valid: true} } if _, err = tx.ExecContext(ctx, updateImportConfigurationSQL, id, session.User, string(importConfig.Kind), cron, url, importConfig.SendEMail, importConfig.AutoAccept, ); err != nil { return } if importConfig.Attributes != nil { if _, err = tx.ExecContext(ctx, deleteImportConfiguationAttributesSQL, id); err != nil { return } if err = storeConfigAttributes(ctx, tx, id, importConfig.Attributes); err != nil { return } } scheduler.UnbindByID(id) if cron.Valid { if err = scheduler.BindAction( string(importConfig.Kind), cron.String, id, ); err != nil { return } } if err = tx.Commit(); err != nil { return } var result = struct { ID int64 `json:"id"` }{ ID: id, } jr = JSONResult{Result: &result} return } func infoImportConfig( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { ctx := req.Context() id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) var ( entry imports.IDConfig kind string cron sql.NullString url sql.NullString ) err = conn.QueryRowContext(ctx, selectImportConfigurationIDSQL, id).Scan( &entry.ID, &entry.User, &kind, &entry.SendEMail, &entry.AutoAccept, &cron, &url, ) switch { case err == sql.ErrNoRows: err = JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("No schedule %d found", id), } return case err != nil: return } entry.Kind = imports.ImportKind(kind) if cron.Valid { cs := imports.CronSpec(cron.String) entry.Cron = &cs } if url.Valid { entry.URL = &url.String } jr = JSONResult{Result: &entry} return } func deleteImportConfig( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { ctx := req.Context() id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) var tx *sql.Tx if tx, err = conn.BeginTx(ctx, nil); err != nil { return } defer tx.Rollback() var found bool err = tx.QueryRowContext(ctx, hasImportConfigurationSQL, id).Scan(&found) switch { case err == sql.ErrNoRows: err = JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("No schedule %d found", id), } return case err != nil: return case !found: err = errors.New("Unexpected result") return } if _, err = tx.ExecContext(ctx, deleteImportConfiguationAttributesSQL, id); err != nil { return } if _, err = tx.ExecContext(ctx, deleteImportConfiguationSQL, id); err != nil { return } // Remove from running scheduler. scheduler.UnbindByID(id) if err = tx.Commit(); err != nil { return } var result = struct { ID int64 `json:"id"` }{ ID: id, } jr = JSONResult{Result: &result} return } func storeConfigAttributes( ctx context.Context, tx *sql.Tx, id int64, attrs common.Attributes, ) error { if len(attrs) == 0 { return nil } attrStmt, err := tx.PrepareContext(ctx, insertImportConfigurationAttributeSQL) if err != nil { return err } defer attrStmt.Close() // Sort to make it deterministic keys := make([]string, len(attrs)) i := 0 for key := range attrs { keys[i] = key i++ } sort.Strings(keys) for _, key := range keys { if _, err := attrStmt.ExecContext(ctx, id, key, attrs[key]); err != nil { return err } } return nil } func addImportConfig( input interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { importConfig := input.(*imports.Config) session, _ := auth.GetSession(req) var cron, url sql.NullString if importConfig.Cron != nil { cron = sql.NullString{String: string(*importConfig.Cron), Valid: true} } if importConfig.URL != nil { url = sql.NullString{String: *importConfig.URL, Valid: true} } ctx := req.Context() var tx *sql.Tx if tx, err = conn.BeginTx(ctx, nil); err != nil { return } defer tx.Rollback() var id int64 if err = tx.QueryRowContext( ctx, insertImportConfigurationSQL, session.User, string(importConfig.Kind), cron, importConfig.SendEMail, importConfig.AutoAccept, url, ).Scan(&id); err != nil { return } // Store extra attributes if err = storeConfigAttributes(ctx, tx, id, importConfig.Attributes); err != nil { return } // Need to start a scheduler job right away? if importConfig.Cron != nil { if err = scheduler.BindAction( string(importConfig.Kind), string(*importConfig.Cron), id, ); err != nil { return } } if err = tx.Commit(); err != nil { scheduler.UnbindByID(id) } var result = struct { ID int64 `json:"id"` }{ ID: id, } jr = JSONResult{ Code: http.StatusCreated, Result: &result, } return } func listImportConfigs( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { ctx := req.Context() var rows *sql.Rows if rows, err = conn.QueryContext(ctx, selectImportConfigurationSQL); err != nil { return } defer rows.Close() list := []*imports.IDConfig{} for rows.Next() { var ( entry imports.IDConfig kind string cron sql.NullString url sql.NullString ) if err = rows.Scan( &entry.ID, &entry.User, &kind, &entry.SendEMail, &entry.AutoAccept, &cron, &url, ); err != nil { return } entry.Kind = imports.ImportKind(kind) if cron.Valid { cs := imports.CronSpec(cron.String) entry.Cron = &cs } if url.Valid { entry.URL = &url.String } list = append(list, &entry) } if err = rows.Err(); err != nil { return } jr = JSONResult{Result: list} return }