Mercurial > gemma
view pkg/controllers/importqueue.go @ 4848:8584197232ec
Handle unknown units gracefully
author | Tom Gottfried <tom@intevation.de> |
---|---|
date | Mon, 18 Nov 2019 16:38:28 +0100 |
parents | 1fef4679b07a |
children | d727641911a5 |
line wrap: on
line source
// This is Free Software under GNU Affero General Public License v >= 3.0 // without warranty, see README.md and license for details. // // SPDX-License-Identifier: AGPL-3.0-or-later // License-Filename: LICENSES/AGPL-3.0.txt // // Copyright (C) 2018 by via donau // – Österreichische Wasserstraßen-Gesellschaft mbH // Software engineering by Intevation GmbH // // Author(s): // * Sascha L. Teichmann <sascha.teichmann@intevation.de> package controllers import ( "context" "database/sql" "encoding/csv" "encoding/json" "fmt" "log" "net/http" "strconv" "strings" "time" "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" "gemma.intevation.de/gemma/pkg/imports" "gemma.intevation.de/gemma/pkg/models" mw "gemma.intevation.de/gemma/pkg/middleware" ) const ( selectImportsCountSQL = ` SELECT count(*) FROM import.imports WHERE ` selectImportsSQL = ` SELECT imports.id AS id, state::varchar, enqueued, changed, kind, username, signer, summary IS NOT NULL AS has_summary, EXISTS(SELECT 1 FROM import.import_logs WHERE kind = 'error'::log_type and import_id = imports.id) AS has_errors, EXISTS(SELECT 1 FROM import.import_logs WHERE kind = 'warn'::log_type and import_id = imports.id) AS has_warnings FROM import.imports WHERE ` selectExportSQL = ` SELECT imports.id AS id, state::varchar, enqueued, changed, kind, username, (SELECT country FROM users.list_users lu WHERE lu.username = import.imports.username) AS country, signer, EXISTS(SELECT 1 FROM import.import_logs WHERE kind = 'warn'::log_type and import_id = imports.id) AS has_warnings, data FROM import.imports WHERE ` selectEnqueuedSQL = ` SELECT enqueued FROM import.imports WHERE ` selectImportSummarySQL = ` SELECT summary, enqueued FROM import.imports WHERE id = $1` selectHasNoRunningImportSQL = ` SELECT true FROM import.imports WHERE id = $1 AND state <> 'running'::import_state` selectImportLogsSQL = ` SELECT time, kind::varchar, msg FROM import.import_logs WHERE import_id = $1 ORDER BY time` deleteLogsSQL = ` DELETE FROM import.import_logs WHERE import_id = $1` deleteImportSQL = ` DELETE FROM import.imports WHERE id = $1` ) type filledStmt struct { stmt strings.Builder args []interface{} } func buildFilters(projection string, req *http.Request) (*filledStmt, *filledStmt, *filledStmt, error) { var l, a, b filterAnd var noBefore, noAfter bool cond := func(format string, args ...interface{}) { term := &filterTerm{format: format, args: args} l = append(l, term) a = append(a, term) b = append(b, term) } if query := req.FormValue("query"); query != "" { query = "%" + query + "%" cond(` (kind ILIKE $%d OR username ILIKE $%d OR signer ILIKE $%d OR `+ `id IN (SELECT import_id FROM import.import_logs WHERE msg ILIKE $%d)) `, query, query, query, query) } if cc := req.FormValue("cc"); cc != "" { codes := sliceToTextArray(splitUpper(cc)) cond(" username IN "+ "(SELECT username FROM internal.user_profiles "+ "WHERE country = ANY($%d)) ", codes) } if st := req.FormValue("states"); st != "" { states := toTextArray(st, imports.ImportStateNames) cond(" state = ANY($%d) ", states) } if ks := req.FormValue("kinds"); ks != "" { kinds := toTextArray(ks, imports.ImportKindNames()) cond(" kind = ANY($%d) ", kinds) } if idss := req.FormValue("ids"); idss != "" { ids := toInt8Array(idss) cond(" id = ANY($%d) ", ids) } if from := req.FormValue("from"); from != "" { fromTime, err := common.ParseTime(from) if err != nil { return nil, nil, nil, err } l = append(l, buildFilterTerm("enqueued >= $%d", fromTime)) b = append(b, buildFilterTerm("enqueued < $%d", fromTime)) } else { noBefore = true } if to := req.FormValue("to"); to != "" { toTime, err := common.ParseTime(to) if err != nil { return nil, nil, nil, err } l = append(l, buildFilterTerm("enqueued <= $%d", toTime)) a = append(a, buildFilterTerm("enqueued > $%d", toTime)) } else { noAfter = true } switch warn := strings.ToLower(req.FormValue("warnings")); warn { case "1", "t", "true": cond(` EXISTS(SELECT 1 FROM import.import_logs WHERE kind = 'warn'::log_type and import_id = imports.id)`) } switch errors := strings.ToLower(req.FormValue("errors")); errors { case "1", "t", "true": cond(` EXISTS(SELECT 1 FROM import.import_logs WHERE kind = 'error'::log_type and import_id = imports.id)`) } fl := &filledStmt{} fa := &filledStmt{} fb := &filledStmt{} fa.stmt.WriteString(selectEnqueuedSQL) fb.stmt.WriteString(selectEnqueuedSQL) var counting bool if projection != "" { fl.stmt.WriteString(projection) } else { switch count := strings.ToLower(req.FormValue("count")); count { case "1", "t", "true": counting = true fl.stmt.WriteString(selectImportsCountSQL) default: fl.stmt.WriteString(selectImportsSQL) } } if len(l) == 0 { fl.stmt.WriteString(" TRUE ") } else { l.serialize(&fl.stmt, &fl.args) } if len(b) == 0 { fb.stmt.WriteString(" TRUE ") } else { b.serialize(&fb.stmt, &fb.args) } if len(a) == 0 { fa.stmt.WriteString(" TRUE ") } else { a.serialize(&fa.stmt, &fa.args) } if !counting { fl.stmt.WriteString(" ORDER BY enqueued DESC ") fa.stmt.WriteString(" ORDER BY enqueued LIMIT 1") fb.stmt.WriteString(" ORDER BY enqueued DESC LIMIT 1") } if noBefore { fb = nil } if noAfter { fa = nil } return fl, fb, fa, nil } func neighbored(ctx context.Context, conn *sql.Conn, fb *filledStmt) *models.ImportTime { var when time.Time err := conn.QueryRowContext(ctx, fb.stmt.String(), fb.args...).Scan(&when) switch { case err == sql.ErrNoRows: return nil case err != nil: log.Printf("warn: %v\n", err) return nil } return &models.ImportTime{Time: when.UTC()} } func exportImports(rw http.ResponseWriter, req *http.Request) { list, _, _, err := buildFilters(selectExportSQL, req) if err != nil { http.Error(rw, "error: "+err.Error(), http.StatusBadRequest) return } rw.Header().Add("Content-Type", "text/csv") out := csv.NewWriter(rw) record := []string{ "#id", "#kind", "#enqueued", "#changed", "#user", "#country", "#signer", "#state", "#warnings", "#source", } if err := out.Write(record); err != nil { // Too late for HTTP status message. log.Printf("error: %v\n", err) return } conn := mw.GetDBConn(req) ctx := req.Context() var rows *sql.Rows if rows, err = conn.QueryContext(ctx, list.stmt.String(), list.args...); err != nil { log.Printf("error: %v\n", err) return } defer rows.Close() stringString := func(s sql.NullString) string { if s.Valid { return s.String } return "" } // Extract some meta infos from the import. type Description interface { Description() (string, error) } for rows.Next() { var ( id int64 state string enqueued time.Time changed time.Time kind string user string country string signer sql.NullString warnings bool data string description string ) if err = rows.Scan( &id, &state, &enqueued, &changed, &kind, &user, &country, &signer, &warnings, &data, ); err != nil { return } // Do some introspection on the job to be more verbose. if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil { job := jc.Create() if err := common.FromJSONString(data, job); err != nil { log.Printf("error: %v\n", err) } else if desc, ok := job.(Description); ok { if description, err = desc.Description(); err != nil { log.Printf("error: %v\n", err) } } } record[0] = strconv.FormatInt(id, 10) record[1] = kind record[2] = enqueued.UTC().Format(common.TimeFormat) record[3] = changed.UTC().Format(common.TimeFormat) record[4] = user record[5] = country record[6] = stringString(signer) record[7] = state record[8] = strconv.FormatBool(warnings) record[9] = strings.Replace(description, ",", "|", -1) if err := out.Write(record); err != nil { log.Printf("error: %v\n", err) return } } out.Flush() if err := out.Error(); err != nil { log.Printf("error: %v\n", err) } if err = rows.Err(); err != nil { log.Printf("error: %v\n", err) return } } func listImports(req *http.Request) (jr mw.JSONResult, err error) { var list, before, after *filledStmt if list, before, after, err = buildFilters("", req); err != nil { return } ctx := req.Context() conn := mw.JSONConn(req) // Fast path for counting switch count := strings.ToLower(req.FormValue("count")); count { case "1", "t", "true": var count int64 err = conn.QueryRowContext(ctx, list.stmt.String(), list.args...).Scan(&count) switch { case err == sql.ErrNoRows: count, err = 0, nil case err != nil: return } jr = mw.JSONResult{Result: count} return } // Generate the list var rows *sql.Rows if rows, err = conn.QueryContext(ctx, list.stmt.String(), list.args...); err != nil { return } defer rows.Close() imports := make([]*models.Import, 0, 20) var signer sql.NullString for rows.Next() { var it models.Import var enqueued time.Time var changed time.Time if err = rows.Scan( &it.ID, &it.State, &enqueued, &changed, &it.Kind, &it.User, &signer, &it.Summary, &it.Errors, &it.Warnings, ); err != nil { return } if signer.Valid { it.Signer = signer.String } it.Enqueued = models.ImportTime{Time: enqueued.UTC()} it.Changed = models.ImportTime{Time: changed.UTC()} imports = append(imports, &it) } if err = rows.Err(); err != nil { return } var prev, next *models.ImportTime if before != nil { prev = neighbored(ctx, conn, before) } if after != nil { next = neighbored(ctx, conn, after) } jr = mw.JSONResult{ Result: struct { Prev *models.ImportTime `json:"prev,omitempty"` Next *models.ImportTime `json:"next,omitempty"` Imports []*models.Import `json:"imports"` }{ Imports: imports, Prev: prev, Next: next, }, } return } func importLogs(req *http.Request) (jr mw.JSONResult, err error) { ctx := req.Context() id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) conn := mw.JSONConn(req) // Check if he have such a import job first. var summary sql.NullString var enqueued time.Time err = conn.QueryRowContext(ctx, selectImportSummarySQL, id).Scan( &summary, &enqueued, ) switch { case err == sql.ErrNoRows: err = mw.JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("Cannot find import #%d.", id), } return case err != nil: return } enqueued = enqueued.UTC() var sum interface{} if summary.Valid { if err = json.NewDecoder( strings.NewReader(summary.String)).Decode(&sum); err != nil { return } } // We have it -> generate log entries. var rows *sql.Rows rows, err = conn.QueryContext(ctx, selectImportLogsSQL, id) if err != nil { return } defer rows.Close() entries := make([]*models.ImportLogEntry, 0, 10) for rows.Next() { var entry models.ImportLogEntry var t time.Time if err = rows.Scan(&t, &entry.Kind, &entry.Message); err != nil { return } entry.Time = models.ImportTime{Time: t.UTC()} entries = append(entries, &entry) } if err = rows.Err(); err != nil { return } jr = mw.JSONResult{ Result: struct { Enqueued models.ImportTime `json:"enqueued"` Summary interface{} `json:"summary,omitempty"` Entries []*models.ImportLogEntry `json:"entries"` }{ Enqueued: models.ImportTime{Time: enqueued}, Summary: sum, Entries: entries, }, } return } func deleteImport(req *http.Request) (jr mw.JSONResult, err error) { ctx := req.Context() id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) var tx *sql.Tx tx, err = mw.JSONConn(req).BeginTx(ctx, nil) if err != nil { return } defer tx.Rollback() // Check if he have such a import job first. var dummy bool err = tx.QueryRowContext(ctx, selectHasNoRunningImportSQL, id).Scan(&dummy) switch { case err == sql.ErrNoRows: err = mw.JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("Cannot find import #%d.", id), } return case err != nil: return } if _, err = tx.ExecContext(ctx, deleteLogsSQL, id); err != nil { return } if _, err = tx.ExecContext(ctx, deleteImportSQL, id); err != nil { return } if err = tx.Commit(); err != nil { return } jr = mw.JSONResult{Code: http.StatusNoContent} return } const ( isPendingSQL = ` SELECT state = 'pending'::import_state, kind FROM import.imports WHERE id = $1` reviewSQL = ` UPDATE import.imports SET state = $1::import_state, changed = CURRENT_TIMESTAMP, signer = $2 WHERE id = $3` deleteImportDataSQL = `SELECT import.del_import($1)` deleteImportTrackSQL = ` DELETE FROM import.track_imports WHERE import_id = $1` logDecisionSQL = ` INSERT INTO import.import_logs (import_id, msg) VALUES ($1, $2)` ) func reviewImports(req *http.Request) (mw.JSONResult, error) { rs := *mw.JSONInput(req).(*[]models.Review) type reviewResult struct { ID int64 `json:"id"` Message string `json:"message,omitempty"` Error string `json:"error,omitempty"` } results := make([]reviewResult, len(rs)) conn := mw.JSONConn(req) for i := range rs { rev := &rs[i] msg, err := decideImport(req, conn, rev.ID, string(rev.State)) var errString string if err != nil { errString = err.Error() } results[i] = reviewResult{ ID: rev.ID, Message: msg, Error: errString, } } return mw.JSONResult{Result: results}, nil } func reviewImport(req *http.Request) (jr mw.JSONResult, err error) { vars := mux.Vars(req) id, _ := strconv.ParseInt(vars["id"], 10, 64) state := vars["state"] var msg string if msg, err = decideImport(req, mw.JSONConn(req), id, state); err != nil { return } result := struct { Message string `json:"message"` }{ Message: msg, } jr = mw.JSONResult{Result: &result} return } func decideImport( req *http.Request, conn *sql.Conn, id int64, state string, ) (message string, err error) { ctx := req.Context() var tx *sql.Tx if tx, err = conn.BeginTx(ctx, nil); err != nil { return } defer tx.Rollback() var pending bool var kind string err = tx.QueryRowContext(ctx, isPendingSQL, id).Scan(&pending, &kind) switch { case err == sql.ErrNoRows: err = mw.JSONError{ Code: http.StatusNotFound, Message: fmt.Sprintf("cannot find import #%d", id), } return case err != nil: return case !pending: err = mw.JSONError{ Code: http.StatusConflict, Message: fmt.Sprintf("import #%d is not pending", id), } return } if state == "accepted" { if jc := imports.FindJobCreator(imports.JobKind(kind)); jc != nil { if err = jc.StageDone(ctx, tx, id); err != nil { return } } } else { if _, err = tx.ExecContext(ctx, deleteImportDataSQL, id); err != nil { return } } // Remove the import track if _, err = tx.ExecContext(ctx, deleteImportTrackSQL, id); err != nil { return } // Log the decision and set the final state. session, _ := auth.GetSession(req) who := session.User if _, err = tx.ExecContext(ctx, logDecisionSQL, id, fmt.Sprintf("User '%s' %s import %d.", who, state, id)); err != nil { return } if _, err = tx.ExecContext(ctx, reviewSQL, state, who, id); err != nil { return } if err = tx.Commit(); err != nil { return } message = fmt.Sprintf( "Import #%d successfully changed to state '%s'.", id, state) return }