Mercurial > gemma
view pkg/controllers/importqueue.go @ 1194:7db850de0952
Added a signer who makes the final decision on an import.
Generate a log entry when the final decision is made.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Sun, 18 Nov 2018 18:29:25 +0100 |
parents | 58acc343b1b6 |
children | 486d66a9565c |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "fmt" "log" "net/http" "strconv" "strings" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/imports" "gemma.intevation.de/gemma/pkg/models" "github.com/gorilla/mux" "github.com/jackc/pgx/pgtype" ) const ( selectImportsSQL = ` SELECT id, state::varchar, enqueued, kind, username, signer FROM waterway.imports ` selectHasImportSQL = ` SELECT true FROM Waterway.imports WHERE id = $1` selectHasNoRunningImportSQL = ` SELECT true FROM Waterway.imports WHERE id = $1 AND state <> 'running'::waterway.import_state` selectImportLogsSQL = ` SELECT time, kind::varchar, msg FROM waterway.import_logs WHERE import_id = $1 ORDER BY time` deleteLogsSQL = ` DELETE FROM waterway.import_logs WHERE import_id = $1` deleteImportSQL = ` DELETE FROM waterway.imports WHERE id = $1` ) func toTextArray(txt string, allowed []string) *pgtype.TextArray { parts := strings.Split(txt, ",") var accepted []string for _, part := range parts { if part = strings.ToLower(strings.TrimSpace(part)); len(part) == 0 { continue } for _, a := range allowed { if part == a { accepted = append(accepted, part) break } } } if len(accepted) == 0 { return nil } var ta pgtype.TextArray if err := ta.Set(accepted); err != nil { log.Printf("warn: %v\n", err) return nil } return &ta } func queryImportListStmt( conn *sql.Conn, ctx context.Context, vars map[string]string, ) (*sql.Rows, error) { var ( stmt strings.Builder args []interface{} states *pgtype.TextArray kinds *pgtype.TextArray ) if st, found := vars["states"]; found { 
states = toTextArray(st, imports.ImportStateNames) } if ks, found := vars["kinds"]; found { kinds = toTextArray(ks, imports.ImportKindNames) } stmt.WriteString(selectImportsSQL) if states != nil || kinds != nil { stmt.WriteString(" WHERE ") } if states != nil { fmt.Fprintf(&stmt, " states = ANY($%d) ", len(args)+1) args = append(args, states) } if states != nil && kinds != nil { stmt.WriteString("AND") } if kinds != nil { fmt.Fprintf(&stmt, " kind = ANY($%d) ", len(args)+1) args = append(args, kinds) } stmt.WriteString(" ORDER BY enqueued DESC ") if lim, found := vars["limit"]; found { fmt.Fprintf(&stmt, " LIMIT $%d ", len(args)+1) limit, _ := strconv.ParseInt(lim, 10, 64) args = append(args, limit) } if ofs, found := vars["offset"]; found { fmt.Fprintf(&stmt, " OFFSET $%d ", len(args)+1) offset, _ := strconv.ParseInt(ofs, 10, 64) args = append(args, offset) } return conn.QueryContext(ctx, stmt.String(), args...) } func listImports( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { var rows *sql.Rows rows, err = queryImportListStmt(conn, req.Context(), mux.Vars(req)) if err != nil { return } defer rows.Close() imports := make([]*models.Import, 0, 20) for rows.Next() { var it models.Import if err = rows.Scan( &it.ID, &it.State, &it.Enqueued, &it.Kind, &it.User, &it.Signer, ); err != nil { return } imports = append(imports, &it) } if err = rows.Err(); err != nil { return } jr = JSONResult{ Result: struct { Imports []*models.Import `json:"imports"` }{ Imports: imports, }, } return } func importLogs( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { ctx := req.Context() id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) // Check if he have such a import job first. 
var dummy bool err = conn.QueryRowContext(ctx, selectHasImportSQL, id).Scan(&dummy) switch { case err == sql.ErrNoRows: err = JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("Cannot find import #%d.", id), } return case err != nil: return } // We have it -> generate log entries. var rows *sql.Rows rows, err = conn.QueryContext(ctx, selectImportLogsSQL, id) if err != nil { return } defer rows.Close() entries := make([]*models.ImportLogEntry, 0, 10) for rows.Next() { var entry models.ImportLogEntry if err = rows.Scan(&entry.Time, &entry.Kind, &entry.Message); err != nil { return } entries = append(entries, &entry) } if err = rows.Err(); err != nil { return } jr = JSONResult{ Result: struct { Entries []*models.ImportLogEntry `json:"entries"` }{ Entries: entries, }, } return } func deleteImport( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { ctx := req.Context() id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) var tx *sql.Tx tx, err = conn.BeginTx(ctx, nil) if err != nil { return } defer tx.Rollback() // Check if he have such a import job first. 
var dummy bool err = tx.QueryRowContext(ctx, selectHasNoRunningImportSQL, id).Scan(&dummy) switch { case err == sql.ErrNoRows: err = JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("Cannot find import #%d.", id), } return case err != nil: return } if _, err = tx.ExecContext(ctx, deleteLogsSQL, id); err != nil { return } if _, err = tx.ExecContext(ctx, deleteImportSQL, id); err != nil { return } if err = tx.Commit(); err != nil { return } jr = JSONResult{Code: http.StatusNoContent} return } const ( isPendingSQL = ` SELECT state = 'pending'::waterway.import_state, kind WHERE id = $1` reviewSQL = ` UPDATE waterway.imports SET state = $1::waterway.import_state, signer = $2 WHERE id = $3` deleteImportDataSQL = `SELECT waterway.del_import($1)` deleteImportTrackSQL = ` DELETE FROM waterway.track_imports WHERE import_id = $1` logDecisionSQL = ` INSERT INTO waterway.import_logs (import_id, msg) VALUES ($1, $2)` ) func reviewImport( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { vars := mux.Vars(req) id, _ := strconv.ParseInt(vars["id"], 10, 64) state := vars["state"] ctx := req.Context() var tx *sql.Tx if tx, err = conn.BeginTx(ctx, nil); err != nil { return } defer tx.Rollback() var pending bool var kind string err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind) switch { case err == sql.ErrNoRows: err = JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("Cannot find import #%d.", id), } return case err != nil: return case !pending: err = JSONError{ Code: http.StatusBadRequest, Message: fmt.Sprintf("Import %d is not pending.", id), } return } if state == "accepted" { if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil { if err = jc.StageDone(tx, ctx, id); err != nil { return } } if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { return } } else { if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil { return } } // Log the decision and set the final 
state. session, _ := auth.GetSession(req) who := session.User if _, err = tx.ExecContext(ctx, logDecisionSQL, id, fmt.Sprintf("User '%s' %s import %d.", who, state, id)); err != nil { return } if _, err = tx.ExecContext(ctx, reviewSQL, state, who, id); err != nil { return } if err = tx.Commit(); err != nil { return } return }