Mercurial > gemma
view pkg/controllers/importqueue.go @ 2670:d7b1dd25f91f import-overview-rework
Made filter building a bit more reusable.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Thu, 14 Mar 2019 16:20:16 +0100 |
parents | 0fcf80a413a2 |
children | 8f3facf902dd |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "encoding/json" "fmt" "log" "net/http" "strconv" "strings" "time" "github.com/gorilla/mux" "github.com/jackc/pgx/pgtype" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/imports" "gemma.intevation.de/gemma/pkg/models" ) const ( warningSQLPrefix = ` ` selectImportsSQL = ` WITH warned AS ( SELECT distinct(import_id) AS id FROM import.import_logs WHERE kind = 'warn'::log_type ) SELECT imports.id AS id, state::varchar, enqueued, kind, username, signer, summary IS NOT NULL AS has_summary, imports.id IN (SELECT id FROM warned) AS has_warnings FROM import.imports ` selectImportSummaySQL = ` SELECT summary FROM import.imports WHERE id = $1` selectHasNoRunningImportSQL = ` SELECT true FROM import.imports WHERE id = $1 AND state <> 'running'::import_state` selectImportLogsSQL = ` SELECT time, kind::varchar, msg FROM import.import_logs WHERE import_id = $1 ORDER BY time` deleteLogsSQL = ` DELETE FROM import.import_logs WHERE import_id = $1` deleteImportSQL = ` DELETE FROM import.imports WHERE id = $1` selectBeforeSQL = ` SELECT enqueued FROM import.imports WHERE enqueued < $1 ORDER BY enqueued LIMIT 1` selectAfterSQL = ` SELECT enqueued FROM import.imports WHERE enqueued > $1 ORDER BY enqueued LIMIT 1` ) func toInt8Array(txt string) *pgtype.Int8Array { parts := strings.Split(txt, ",") var ints []int64 for _, part := range parts { part = strings.TrimSpace(part) v, err := strconv.ParseInt(part, 10, 64) if err != nil { continue } ints = append(ints, v) } var ia pgtype.Int8Array if err := ia.Set(ints); err != nil { log.Printf("warn: %v\n", err) return nil } return &ia } func toTextArray(txt string, allowed []string) *pgtype.TextArray { parts := strings.Split(txt, ",") var accepted []string for _, part := range parts { if part = strings.ToLower(strings.TrimSpace(part)); len(part) == 0 { continue } for _, a := range allowed { if part == a { accepted = append(accepted, part) break } } } if len(accepted) == 0 { return nil } var ta pgtype.TextArray if err := ta.Set(accepted); err != nil { log.Printf("warn: %v\n", err) return nil } return &ta } type filterBuilder struct { stmt strings.Builder args []interface{} hasCond bool } func (fb *filterBuilder) arg(format string, v ...interface{}) { indices := make([]interface{}, len(v)) for i := range indices { indices[i] = len(fb.args) + i + 1 } fmt.Fprintf(&fb.stmt, format, indices...) fb.args = append(fb.args, v...) } func (fb *filterBuilder) cond(format string, v ...interface{}) { if fb.hasCond { fb.stmt.WriteString(" AND ") } else { fb.hasCond = true } fb.arg(format, v...) } func queryImportListStmt( conn *sql.Conn, req *http.Request, ) (*sql.Rows, error) { var fb filterBuilder fb.stmt.WriteString(selectImportsSQL) fb.stmt.WriteString(" WHERE ") if st := req.FormValue("states"); st != "" { states := toTextArray(st, imports.ImportStateNames) fb.cond(" state = ANY($%d) ", states) } if ks := req.FormValue("kinds"); ks != "" { kinds := toTextArray(ks, imports.ImportKindNames()) fb.cond(" kind = ANY($%d) ", kinds) } if idss := req.FormValue("ids"); idss != "" { ids := toInt8Array(idss) fb.cond(" id = ANY($%d) ", ids) } if from := req.FormValue("from"); from != "" { fromTime, err := time.Parse(models.ImportTimeFormat, from) if err != nil { return nil, err } fb.cond(" enqueued >= $%d ", fromTime) } if to := req.FormValue("to"); to != "" { toTime, err := time.Parse(models.ImportTimeFormat, to) if err != nil { return nil, err } fb.cond(" enqueued <= $%d ", toTime) } switch warn := strings.ToLower(req.FormValue("warnings")); warn { case "1", "t", "true": fb.cond(" id IN (SELECT id FROM warned) ") } if !fb.hasCond { fb.stmt.WriteString(" TRUE ") } fb.stmt.WriteString(" ORDER BY enqueued DESC ") return conn.QueryContext(req.Context(), fb.stmt.String(), fb.args...) } func enqueued(ctx context.Context, conn *sql.Conn, what, query string) *models.ImportTime { if what == "" { return nil } t, err := time.Parse(models.ImportTimeFormat, what) if err != nil { log.Printf("warn: %v\n", err) return nil } var when time.Time err = conn.QueryRowContext(ctx, query, t).Scan(&when) switch { case err == sql.ErrNoRows: return nil case err != nil: log.Printf("warn: %v\n", err) return nil } return &models.ImportTime{when} } func enqueuedBefore(conn *sql.Conn, req *http.Request) *models.ImportTime { return enqueued(req.Context(), conn, req.FormValue("from"), selectBeforeSQL) } func enqueuedAfter(conn *sql.Conn, req *http.Request) *models.ImportTime { return enqueued(req.Context(), conn, req.FormValue("to"), selectAfterSQL) } func listImports( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { var rows *sql.Rows rows, err = queryImportListStmt(conn, req) if err != nil { return } defer rows.Close() imports := make([]*models.Import, 0, 20) var signer sql.NullString for rows.Next() { var it models.Import if err = rows.Scan( &it.ID, &it.State, &it.Enqueued, &it.Kind, &it.User, &signer, &it.Summary, &it.Warnings, ); err != nil { return } if signer.Valid { it.Signer = signer.String } imports = append(imports, &it) } if err = rows.Err(); err != nil { return } jr = JSONResult{ Result: struct { Prev *models.ImportTime `json:"prev,omitempty"` Next *models.ImportTime `json:"next,omitempty"` Imports []*models.Import `json:"imports"` }{ Imports: imports, Prev: enqueuedBefore(conn, req), Next: enqueuedAfter(conn, req), }, } return } func importLogs( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { ctx := req.Context() id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) // Check if he have such a import job first. var summary sql.NullString err = conn.QueryRowContext(ctx, selectImportSummaySQL, id).Scan(&summary) switch { case err == sql.ErrNoRows: err = JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("Cannot find import #%d.", id), } return case err != nil: return } var sum interface{} if summary.Valid { if err = json.NewDecoder( strings.NewReader(summary.String)).Decode(&sum); err != nil { return } } // We have it -> generate log entries. var rows *sql.Rows rows, err = conn.QueryContext(ctx, selectImportLogsSQL, id) if err != nil { return } defer rows.Close() entries := make([]*models.ImportLogEntry, 0, 10) for rows.Next() { var entry models.ImportLogEntry if err = rows.Scan(&entry.Time, &entry.Kind, &entry.Message); err != nil { return } entries = append(entries, &entry) } if err = rows.Err(); err != nil { return } jr = JSONResult{ Result: struct { Summary interface{} `json:"summary,omitempty"` Entries []*models.ImportLogEntry `json:"entries"` }{ Summary: sum, Entries: entries, }, } return } func deleteImport( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { ctx := req.Context() id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) var tx *sql.Tx tx, err = conn.BeginTx(ctx, nil) if err != nil { return } defer tx.Rollback() // Check if he have such a import job first. var dummy bool err = tx.QueryRowContext(ctx, selectHasNoRunningImportSQL, id).Scan(&dummy) switch { case err == sql.ErrNoRows: err = JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("Cannot find import #%d.", id), } return case err != nil: return } if _, err = tx.ExecContext(ctx, deleteLogsSQL, id); err != nil { return } if _, err = tx.ExecContext(ctx, deleteImportSQL, id); err != nil { return } if err = tx.Commit(); err != nil { return } jr = JSONResult{Code: http.StatusNoContent} return } const ( isPendingSQL = ` SELECT state = 'pending'::import_state, kind FROM import.imports WHERE id = $1` reviewSQL = ` UPDATE import.imports SET state = $1::import_state, signer = $2 WHERE id = $3` deleteImportDataSQL = `SELECT import.del_import($1)` deleteImportTrackSQL = ` DELETE FROM import.track_imports WHERE import_id = $1` logDecisionSQL = ` INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)` ) func reviewImports( reviews interface{}, req *http.Request, conn *sql.Conn, ) (JSONResult, error) { rs := *reviews.(*[]models.Review) type reviewResult struct { ID int64 `json:"id"` Message string `json:"message,omitempty"` Error string `json:"error,omitempty"` } results := make([]reviewResult, len(rs)) for i := range rs { rev := &rs[i] msg, err := decideImport(req, conn, rev.ID, string(rev.State)) var errString string if err != nil { errString = err.Error() } results[i] = reviewResult{ ID: rev.ID, Message: msg, Error: errString, } } return JSONResult{Result: results}, nil } func reviewImport( _ interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { vars := mux.Vars(req) id, _ := strconv.ParseInt(vars["id"], 10, 64) state := vars["state"] var msg string if msg, err = decideImport(req, conn, id, state); err != nil { return } result := struct { Message string `json:"message"` }{ Message: msg, } jr = JSONResult{Result: &result} return } func decideImport( req *http.Request, conn *sql.Conn, id int64, state string, ) (message string, err error) { ctx := req.Context() var tx *sql.Tx if tx, err = conn.BeginTx(ctx, nil); err != nil { return } defer tx.Rollback() var pending bool var kind string err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind) switch { case err == sql.ErrNoRows: err = fmt.Errorf("cannot find import #%d", id) return case err != nil: return case !pending: err = fmt.Errorf("import %d is not pending", id) return } if state == "accepted" { if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil { if err = jc.StageDone(ctx, tx, id); err != nil { return } } } else { if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil { return } } // Remove the import track if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { return } // Log the decision and set the final state. session, _ := auth.GetSession(req) who := session.User if _, err = tx.ExecContext(ctx, logDecisionSQL, id, fmt.Sprintf("User '%s' %s import %d.", who, state, id)); err != nil { return } if _, err = tx.ExecContext(ctx, reviewSQL, state, who, id); err != nil { return } if err = tx.Commit(); err != nil { return } message = fmt.Sprintf( "Import #%d successfully changed to state '%s'.", id, state) return }