Mercurial > gemma
view pkg/controllers/importqueue.go @ 2549:9bf6b767a56a
client: refactored and improved splitscreen for diagrams
To make different diagrams possible, the splitscreen view needed to be decoupled from the cross profiles.
Also the style has changed to make it more consistent with the rest of the app. The standard box header
is now used and there are collapse and expand animations.
author | Markus Kottlaender <markus@intevation.de> |
---|---|
date | Fri, 08 Mar 2019 08:50:47 +0100 |
parents | 20d9b71f4125 |
children | 3a242e6aa56d |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "database/sql" "encoding/json" "fmt" "log" "net/http" "strconv" "strings" "github.com/gorilla/mux" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/imports" "gemma.intevation.de/gemma/pkg/models" ) const ( selectImportsSQL = ` SELECT id, state::varchar, enqueued, kind, username, signer, summary, EXISTS ( SELECT true FROM import.import_logs WHERE kind = 'warn'::log_type AND import_id = import.imports.id ) AS has_warnings FROM import.imports ` selectHasImportSQL = ` SELECT true FROM import.imports WHERE id = $1` selectHasNoRunningImportSQL = ` SELECT true FROM import.imports WHERE id = $1 AND state <> 'running'::import_state` selectImportLogsSQL = ` SELECT time, kind::varchar, msg FROM import.import_logs WHERE import_id = $1 ORDER BY time` deleteLogsSQL = ` DELETE FROM import.import_logs WHERE import_id = $1` deleteImportSQL = ` DELETE FROM import.imports WHERE id = $1` ) func toInt8Array(txt string) *pgtype.Int8Array { parts := strings.Split(txt, ",") var ints []int64 for _, part := range parts { part = strings.TrimSpace(part) v, err := strconv.ParseInt(part, 10, 64) if err != nil { continue } ints = append(ints, v) } var ia pgtype.Int8Array if err := ia.Set(ints); err != nil { log.Printf("warn: %v\n", err) return nil } return &ia } func toTextArray(txt string, allowed []string) *pgtype.TextArray { parts := strings.Split(txt, ",") var accepted []string for _, part := range parts { if part = strings.ToLower(strings.TrimSpace(part)); len(part) == 0 { continue } for _, a := range allowed { if part == a { accepted = append(accepted, part) break } } } if len(accepted) == 0 { return nil } var ta pgtype.TextArray if err := ta.Set(accepted); err != nil { log.Printf("warn: %v\n", err) return nil } return &ta } func queryImportListStmt( conn *sql.Conn, req *http.Request, ) (*sql.Rows, error) { var ( stmt strings.Builder args []interface{} states *pgtype.TextArray kinds *pgtype.TextArray ids *pgtype.Int8Array ) arg := func(format string, v interface{}) { fmt.Fprintf(&stmt, format, len(args)+1) args = append(args, v) } if st := req.FormValue("states"); st != "" { states = toTextArray(st, imports.ImportStateNames) } if ks := req.FormValue("kinds"); ks != "" { kinds = toTextArray(ks, imports.ImportKindNames()) } if idss := req.FormValue("ids"); idss != "" { ids = toInt8Array(idss) } stmt.WriteString(selectImportsSQL) if states != nil || kinds != nil || ids != nil { stmt.WriteString(" WHERE ") } if states != nil { arg(" state = ANY($%d) ", states) } if states != nil && (kinds != nil || ids != nil) { stmt.WriteString("AND") } if kinds != nil { arg(" kind = ANY($%d) ", kinds) } if (states != nil || kinds != nil) && ids != nil { stmt.WriteString("AND") } if ids != nil { arg(" id = ANY($%d) ", ids) } stmt.WriteString(" ORDER BY enqueued DESC ") if lim := req.FormValue("limit"); lim != "" { limit, err := strconv.ParseInt(lim, 10, 64) if err != nil { return nil, err } arg(" LIMIT $%d ", limit) } if ofs := req.FormValue("offset"); ofs != "" { offset, err := strconv.ParseInt(ofs, 10, 64) if err != nil { return nil, err } arg(" OFFSET $%d ", offset) } return conn.QueryContext(req.Context(), stmt.String(), args...) } func listImports( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { var rows *sql.Rows rows, err = queryImportListStmt(conn, req) if err != nil { return } defer rows.Close() imports := make([]*models.Import, 0, 20) var signer, summary sql.NullString for rows.Next() { var it models.Import if err = rows.Scan( &it.ID, &it.State, &it.Enqueued, &it.Kind, &it.User, &signer, &summary, &it.Warnings, ); err != nil { return } if signer.Valid { it.Signer = signer.String } if summary.Valid { if err = json.NewDecoder( strings.NewReader(summary.String)).Decode(&it.Summary); err != nil { return } } imports = append(imports, &it) } if err = rows.Err(); err != nil { return } jr = JSONResult{ Result: struct { Imports []*models.Import `json:"imports"` }{ Imports: imports, }, } return } func importLogs( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { ctx := req.Context() id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) // Check if he have such a import job first. var dummy bool err = conn.QueryRowContext(ctx, selectHasImportSQL, id).Scan(&dummy) switch { case err == sql.ErrNoRows: err = JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("Cannot find import #%d.", id), } return case err != nil: return } // We have it -> generate log entries. var rows *sql.Rows rows, err = conn.QueryContext(ctx, selectImportLogsSQL, id) if err != nil { return } defer rows.Close() entries := make([]*models.ImportLogEntry, 0, 10) for rows.Next() { var entry models.ImportLogEntry if err = rows.Scan(&entry.Time, &entry.Kind, &entry.Message); err != nil { return } entries = append(entries, &entry) } if err = rows.Err(); err != nil { return } jr = JSONResult{ Result: struct { Entries []*models.ImportLogEntry `json:"entries"` }{ Entries: entries, }, } return } func deleteImport( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { ctx := req.Context() id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) var tx *sql.Tx tx, err = conn.BeginTx(ctx, nil) if err != nil { return } defer tx.Rollback() // Check if he have such a import job first. var dummy bool err = tx.QueryRowContext(ctx, selectHasNoRunningImportSQL, id).Scan(&dummy) switch { case err == sql.ErrNoRows: err = JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("Cannot find import #%d.", id), } return case err != nil: return } if _, err = tx.ExecContext(ctx, deleteLogsSQL, id); err != nil { return } if _, err = tx.ExecContext(ctx, deleteImportSQL, id); err != nil { return } if err = tx.Commit(); err != nil { return } jr = JSONResult{Code: http.StatusNoContent} return } const ( isPendingSQL = ` SELECT state = 'pending'::import_state, kind FROM import.imports WHERE id = $1` reviewSQL = ` UPDATE import.imports SET state = $1::import_state, signer = $2 WHERE id = $3` deleteImportDataSQL = `SELECT import.del_import($1)` deleteImportTrackSQL = ` DELETE FROM import.track_imports WHERE import_id = $1` logDecisionSQL = ` INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)` ) func reviewImports( reviews interface{}, req *http.Request, conn *sql.Conn, ) (JSONResult, error) { rs := *reviews.(*[]models.Review) type reviewResult struct { ID int64 `json:"id"` Message string `json:"message,omitempty"` Error string `json:"error,omitempty"` } results := make([]reviewResult, len(rs)) for i := range rs { rev := &rs[i] msg, err := decideImport(req, conn, rev.ID, string(rev.State)) var errString string if err != nil { errString = err.Error() } results[i] = reviewResult{ ID: rev.ID, Message: msg, Error: errString, } } return JSONResult{Result: results}, nil } func reviewImport( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { vars := mux.Vars(req) id, _ := strconv.ParseInt(vars["id"], 10, 64) state := vars["state"] var msg string if msg, err = decideImport(req, conn, id, state); err != nil { return } result := struct { Message string `json:"message"` }{ Message: msg, } jr = JSONResult{Result: &result} return } func decideImport( req *http.Request, conn *sql.Conn, id int64, state string, ) (message string, err error) { ctx := req.Context() var tx *sql.Tx if tx, err = conn.BeginTx(ctx, nil); err != nil { return } defer tx.Rollback() var pending bool var kind string err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind) switch { case err == sql.ErrNoRows: err = fmt.Errorf("cannot find import #%d", id) return case err != nil: return case !pending: err = fmt.Errorf("import %d is not pending", id) return } if state == "accepted" { if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil { if err = jc.StageDone(ctx, tx, id); err != nil { return } } } else { if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil { return } } // Remove the import track if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { return } // Log the decision and set the final state. session, _ := auth.GetSession(req) who := session.User if _, err = tx.ExecContext(ctx, logDecisionSQL, id, fmt.Sprintf("User '%s' %s import %d.", who, state, id)); err != nil { return } if _, err = tx.ExecContext(ctx, reviewSQL, state, who, id); err != nil { return } if err = tx.Commit(); err != nil { return } message = fmt.Sprintf( "Import #%d successfully changed to state '%s'.", id, state) return }