Mercurial > gemma
changeset 1627:b10aa02d7819
Refactored: Moved REST /api/imports/scheduler to /api/imports/config
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Wed, 19 Dec 2018 15:58:44 +0100 |
parents | 92da44ba610c |
children | 8ad51ad5a9ee |
files | pkg/controllers/importconfig.go pkg/controllers/routes.go pkg/controllers/scheduler.go pkg/imports/config.go pkg/imports/queue.go pkg/models/scheduler.go |
diffstat | 6 files changed, 494 insertions(+), 482 deletions(-) [+] |
line wrap: on
line diff
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/controllers/importconfig.go Wed Dec 19 15:58:44 2018 +0100 @@ -0,0 +1,399 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann <sascha.teichmann@intevation.de> + +package controllers + +import ( + "database/sql" + "errors" + "fmt" + "net/http" + "strconv" + + "github.com/gorilla/mux" + + "gemma.intevation.de/gemma/pkg/auth" + "gemma.intevation.de/gemma/pkg/imports" + "gemma.intevation.de/gemma/pkg/scheduler" +) + +const ( + selectImportConfigurationPrefix = ` +SELECT + id, + username, + kind, + send_email, + auto_accept, + cron, + url +FROM waterway.import_configuration` + + selectImportConfigurationSQL = selectImportConfigurationPrefix + ` +ORDER by id` + + selectImportConfigurationIDSQL = selectImportConfigurationPrefix + ` +WHERE id = $1` + + insertImportConfigurationSQL = ` +INSERT INTO waterway.import_configuration +(username, kind, cron, send_email, auto_accept, url) +VALUES ($1, $2, $3, $4, $5, $6) +RETURNING id` + + hasImportConfigurationSQL = ` +SELECT true FROM waterway.import_configuration +WHERE id = $1` + + deleteImportConfiguationSQL = ` +DELETE FROM waterway.import_configuration +WHERE id = $1` + + updateImportConfigurationSQL = ` +UPDATE waterway.import_configuration SET + username = $2 + kind = $3, + cron = $4, + url = $5, + send_email = $6, + auto_accept = $7 +WHERE id = $1 +` +) + +func modifyImportConfig( + input interface{}, + req *http.Request, + conn *sql.Conn, +) (jr JSONResult, err error) { + + ctx := req.Context() + + importConfig := input.(*imports.Config) + + id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) + + var tx *sql.Tx + + if tx, err = conn.BeginTx(ctx, nil); err != nil { + return + } + defer tx.Rollback() + + var ( + entry imports.IDConfig + kind string + cron sql.NullString + url sql.NullString + ) + + err = conn.QueryRowContext(ctx, selectImportConfigurationIDSQL, id).Scan( + &entry.ID, + &entry.User, + &kind, + &entry.SendEMail, + &entry.AutoAccept, + &cron, + &url, + ) + + switch { + case err == sql.ErrNoRows: + err = JSONError{ + Code: http.StatusNotFound, + Message: fmt.Sprintf("No schedule %d found", id), + } + return + case err != nil: + return + } + + session, _ := auth.GetSession(req) + + entry.SendEMail = importConfig.SendEMail + entry.AutoAccept = importConfig.AutoAccept + + if importConfig.Cron != nil { + cron = sql.NullString{String: string(*importConfig.Cron), Valid: true} + } + + if importConfig.URL != nil { + url = sql.NullString{String: *importConfig.URL, Valid: true} + } + + if _, err = tx.ExecContext(ctx, updateImportConfigurationSQL, + id, + session.User, + string(importConfig.Kind), + cron, + url, + importConfig.SendEMail, + importConfig.AutoAccept, + ); err != nil { + return + } + + scheduler.UnbindByID(id) + + if cron.Valid { + if err = scheduler.BindAction( + string(importConfig.Kind), + cron.String, + session.User, + &id, + ); err != nil { + return + } + } + + if err = tx.Commit(); err != nil { + return + } + + var result = struct { + ID int64 `json:"id"` + }{ + ID: id, + } + + jr = JSONResult{Result: &result} + return +} + +func infoImportConfig( + _ interface{}, + req *http.Request, + conn *sql.Conn, +) (jr JSONResult, err error) { + + ctx := req.Context() + + id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) + + var ( + entry imports.IDConfig + kind string + cron sql.NullString + url sql.NullString + ) + + err = conn.QueryRowContext(ctx, selectImportConfigurationIDSQL, id).Scan( + &entry.ID, + &entry.User, + &kind, + &entry.SendEMail, + &entry.AutoAccept, + &cron, + &url, + ) + + switch { + case err == sql.ErrNoRows: + err = JSONError{ + Code: http.StatusNotFound, + Message: fmt.Sprintf("No schedule %d found", id), + } + return + case err != nil: + return + } + + entry.Kind = imports.ImportKind(kind) + if cron.Valid { + cs := imports.CronSpec(cron.String) + entry.Cron = &cs + } + if url.Valid { + entry.URL = &url.String + } + + jr = JSONResult{Result: &entry} + return +} + +func deleteImportConfig( + _ interface{}, + req *http.Request, + conn *sql.Conn, +) (jr JSONResult, err error) { + + ctx := req.Context() + + id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) + + var tx *sql.Tx + if tx, err = conn.BeginTx(ctx, nil); err != nil { + return + } + defer tx.Rollback() + + var found bool + err = tx.QueryRowContext(ctx, hasImportConfigurationSQL, id).Scan(&found) + switch { + case err == sql.ErrNoRows: + err = JSONError{ + Code: http.StatusNotFound, + Message: fmt.Sprintf("No schedule %d found", id), + } + return + case err != nil: + return + case !found: + err = errors.New("Unexpected result") + return + } + + if _, err = tx.ExecContext(ctx, deleteImportConfiguationSQL, id); err != nil { + return + } + + // Remove from running scheduler. + scheduler.UnbindByID(id) + + if err = tx.Commit(); err != nil { + return + } + + var result = struct { + ID int64 `json:"id"` + }{ + ID: id, + } + + jr = JSONResult{Result: &result} + return +} + +func addImportConfig( + input interface{}, + req *http.Request, + conn *sql.Conn, +) (jr JSONResult, err error) { + + importConfig := input.(*imports.Config) + + session, _ := auth.GetSession(req) + + var cron, url sql.NullString + + if importConfig.Cron != nil { + cron = sql.NullString{String: string(*importConfig.Cron), Valid: true} + } + if importConfig.URL != nil { + url = sql.NullString{String: *importConfig.URL, Valid: true} + } + + ctx := req.Context() + + var tx *sql.Tx + + if tx, err = conn.BeginTx(ctx, nil); err != nil { + return + } + defer tx.Rollback() + + var id int64 + if err = tx.QueryRowContext( + ctx, + insertImportConfigurationSQL, + session.User, + string(importConfig.Kind), + cron, + importConfig.SendEMail, + importConfig.AutoAccept, + url, + ).Scan(&id); err != nil { + return + } + + // Need to start a scheduler job right away? + if importConfig.Cron != nil { + if err = scheduler.BindAction( + string(importConfig.Kind), + string(*importConfig.Cron), + session.User, + &id, + ); err != nil { + return + } + } + + if err = tx.Commit(); err != nil { + scheduler.UnbindByID(id) + } + + var result = struct { + ID int64 `json:"id"` + }{ + ID: id, + } + + jr = JSONResult{ + Code: http.StatusCreated, + Result: &result, + } + return +} + +func listImportConfigs( + _ interface{}, + req *http.Request, + conn *sql.Conn, +) (jr JSONResult, err error) { + + ctx := req.Context() + var rows *sql.Rows + + if rows, err = conn.QueryContext(ctx, selectImportConfigurationSQL); err != nil { + return + } + defer rows.Close() + + list := []*imports.IDConfig{} + + for rows.Next() { + var ( + entry imports.IDConfig + kind string + cron sql.NullString + url sql.NullString + ) + if err = rows.Scan( + &entry.ID, + &entry.User, + &kind, + &entry.SendEMail, + &entry.AutoAccept, + &cron, + &url, + ); err != nil { + return + } + entry.Kind = imports.ImportKind(kind) + if cron.Valid { + cs := imports.CronSpec(cron.String) + entry.Cron = &cs + } + if url.Valid { + entry.URL = &url.String + } + list = append(list, &entry) + } + + if err = rows.Err(); err != nil { + return + } + + jr = JSONResult{Result: list} + return +}
--- a/pkg/controllers/routes.go Wed Dec 19 15:07:14 2018 +0100 +++ b/pkg/controllers/routes.go Wed Dec 19 15:58:44 2018 +0100 @@ -21,6 +21,7 @@ "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/auth" + "gemma.intevation.de/gemma/pkg/imports" "gemma.intevation.de/gemma/pkg/middleware" "gemma.intevation.de/gemma/pkg/models" ) @@ -176,30 +177,30 @@ })).Methods(http.MethodPost) // Import scheduler configuration - api.Handle("/imports/scheduler/{id:[0-9]+}", + api.Handle("/imports/config/{id:[0-9]+}", waterwayAdmin(&JSONHandler{ - Handle: modifySchedule, + Handle: modifyImportConfig, })).Methods(http.MethodPatch) - api.Handle("/imports/scheduler/{id:[0-9]+}", + api.Handle("/imports/config/{id:[0-9]+}", waterwayAdmin(&JSONHandler{ - Handle: deleteSchedule, + Handle: deleteImportConfig, })).Methods(http.MethodDelete) - api.Handle("/imports/scheduler/{id:[0-9]+}", + api.Handle("/imports/config/{id:[0-9]+}", waterwayAdmin(&JSONHandler{ - Handle: infoSchedule, + Handle: infoImportConfig, })).Methods(http.MethodGet) - api.Handle("/imports/scheduler", + api.Handle("/imports/config", waterwayAdmin(&JSONHandler{ - Input: func() interface{} { return new(models.ImportConfig) }, - Handle: addSchedule, + Input: func() interface{} { return new(imports.Config) }, + Handle: addImportConfig, })).Methods(http.MethodPost) - api.Handle("/imports/scheduler", + api.Handle("/imports/config", waterwayAdmin(&JSONHandler{ - Handle: listScheduler, + Handle: listImportConfigs, })).Methods(http.MethodGet) // Import queue
--- a/pkg/controllers/scheduler.go Wed Dec 19 15:07:14 2018 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,398 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2018 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Sascha L. Teichmann <sascha.teichmann@intevation.de> - -package controllers - -import ( - "database/sql" - "errors" - "fmt" - "net/http" - "strconv" - - "gemma.intevation.de/gemma/pkg/auth" - "gemma.intevation.de/gemma/pkg/models" - "gemma.intevation.de/gemma/pkg/scheduler" - "github.com/gorilla/mux" -) - -const ( - selectImportConfigurationPrefix = ` -SELECT - id, - username, - kind, - send_email, - auto_accept, - cron, - url -FROM waterway.import_configuration` - - selectImportConfigurationSQL = selectImportConfigurationPrefix + ` -ORDER by id` - - selectImportConfigurationIDSQL = selectImportConfigurationPrefix + ` -WHERE id = $1` - - insertImportConfigurationSQL = ` -INSERT INTO waterway.import_configuration -(username, kind, cron, send_email, auto_accept, url) -VALUES ($1, $2, $3, $4, $5, $6) -RETURNING id` - - hasImportConfigurationSQL = ` -SELECT true FROM waterway.import_configuration -WHERE id = $1` - - deleteImportConfiguationSQL = ` -DELETE FROM waterway.import_configuration -WHERE id = $1` - - updateImportConfigurationSQL = ` -UPDATE waterway.import_configuration SET - username = $2 - kind = $3, - cron = $4, - url = $5, - send_email = $6, - auto_accept = $7 -WHERE id = $1 -` -) - -func modifySchedule( - input interface{}, - req *http.Request, - conn *sql.Conn, -) (jr JSONResult, err error) { - - ctx := req.Context() - - importConfig := input.(*models.ImportConfig) - - id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) - - var tx *sql.Tx - - if tx, err = conn.BeginTx(ctx, nil); err != nil { - return - } - defer tx.Rollback() - - var ( - entry models.IDImportConfig - kind string - cron sql.NullString - url sql.NullString - ) - - err = conn.QueryRowContext(ctx, selectImportConfigurationIDSQL, id).Scan( - &entry.ID, - &entry.User, - &kind, - &entry.SendEMail, - &entry.AutoAccept, - &cron, - &url, - ) - - switch { - case err == sql.ErrNoRows: - err = JSONError{ - Code: http.StatusNotFound, - Message: fmt.Sprintf("No schedule %d found", id), - } - return - case err != nil: - return - } - - session, _ := auth.GetSession(req) - - entry.SendEMail = importConfig.SendEMail - entry.AutoAccept = importConfig.AutoAccept - - if importConfig.Cron != nil { - cron = sql.NullString{String: string(*importConfig.Cron), Valid: true} - } - - if importConfig.URL != nil { - url = sql.NullString{String: *importConfig.URL, Valid: true} - } - - if _, err = tx.ExecContext(ctx, updateImportConfigurationSQL, - id, - session.User, - string(importConfig.Kind), - cron, - url, - importConfig.SendEMail, - importConfig.AutoAccept, - ); err != nil { - return - } - - scheduler.UnbindByID(id) - - if cron.Valid { - if err = scheduler.BindAction( - string(importConfig.Kind), - cron.String, - session.User, - &id, - ); err != nil { - return - } - } - - if err = tx.Commit(); err != nil { - return - } - - var result = struct { - ID int64 `json:"id"` - }{ - ID: id, - } - - jr = JSONResult{Result: &result} - return -} - -func infoSchedule( - _ interface{}, - req *http.Request, - conn *sql.Conn, -) (jr JSONResult, err error) { - - ctx := req.Context() - - id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) - - var ( - entry models.IDImportConfig - kind string - cron sql.NullString - url sql.NullString - ) - - err = conn.QueryRowContext(ctx, selectImportConfigurationIDSQL, id).Scan( - &entry.ID, - &entry.User, - &kind, - &entry.SendEMail, - &entry.AutoAccept, - &cron, - &url, - ) - - switch { - case err == sql.ErrNoRows: - err = JSONError{ - Code: http.StatusNotFound, - Message: fmt.Sprintf("No schedule %d found", id), - } - return - case err != nil: - return - } - - entry.Kind = models.ImportKind(kind) - if cron.Valid { - cs := models.CronSpec(cron.String) - entry.Cron = &cs - } - if url.Valid { - entry.URL = &url.String - } - - jr = JSONResult{Result: &entry} - return -} - -func deleteSchedule( - _ interface{}, - req *http.Request, - conn *sql.Conn, -) (jr JSONResult, err error) { - - ctx := req.Context() - - id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) - - var tx *sql.Tx - if tx, err = conn.BeginTx(ctx, nil); err != nil { - return - } - defer tx.Rollback() - - var found bool - err = tx.QueryRowContext(ctx, hasImportConfigurationSQL, id).Scan(&found) - switch { - case err == sql.ErrNoRows: - err = JSONError{ - Code: http.StatusNotFound, - Message: fmt.Sprintf("No schedule %d found", id), - } - return - case err != nil: - return - case !found: - err = errors.New("Unexpected result") - return - } - - if _, err = tx.ExecContext(ctx, deleteImportConfiguationSQL, id); err != nil { - return - } - - // Remove from running scheduler. - scheduler.UnbindByID(id) - - if err = tx.Commit(); err != nil { - return - } - - var result = struct { - ID int64 `json:"id"` - }{ - ID: id, - } - - jr = JSONResult{Result: &result} - return -} - -func addSchedule( - input interface{}, - req *http.Request, - conn *sql.Conn, -) (jr JSONResult, err error) { - - importConfig := input.(*models.ImportConfig) - - session, _ := auth.GetSession(req) - - var cron, url sql.NullString - - if importConfig.Cron != nil { - cron = sql.NullString{String: string(*importConfig.Cron), Valid: true} - } - if importConfig.URL != nil { - url = sql.NullString{String: *importConfig.URL, Valid: true} - } - - ctx := req.Context() - - var tx *sql.Tx - - if tx, err = conn.BeginTx(ctx, nil); err != nil { - return - } - defer tx.Rollback() - - var id int64 - if err = tx.QueryRowContext( - ctx, - insertImportConfigurationSQL, - session.User, - string(importConfig.Kind), - cron, - importConfig.SendEMail, - importConfig.AutoAccept, - url, - ).Scan(&id); err != nil { - return - } - - // Need to start a scheduler job right away? - if importConfig.Cron != nil { - if err = scheduler.BindAction( - string(importConfig.Kind), - string(*importConfig.Cron), - session.User, - &id, - ); err != nil { - return - } - } - - if err = tx.Commit(); err != nil { - scheduler.UnbindByID(id) - } - - var result = struct { - ID int64 `json:"id"` - }{ - ID: id, - } - - jr = JSONResult{ - Code: http.StatusCreated, - Result: &result, - } - return -} - -func listScheduler( - _ interface{}, - req *http.Request, - conn *sql.Conn, -) (jr JSONResult, err error) { - - ctx := req.Context() - var rows *sql.Rows - - if rows, err = conn.QueryContext(ctx, selectImportConfigurationSQL); err != nil { - return - } - defer rows.Close() - - list := []*models.IDImportConfig{} - - for rows.Next() { - var ( - entry models.IDImportConfig - kind string - cron sql.NullString - url sql.NullString - ) - if err = rows.Scan( - &entry.ID, - &entry.User, - &kind, - &entry.SendEMail, - &entry.AutoAccept, - &cron, - &url, - ); err != nil { - return - } - entry.Kind = models.ImportKind(kind) - if cron.Valid { - cs := models.CronSpec(cron.String) - entry.Cron = &cs - } - if url.Valid { - entry.URL = &url.String - } - list = append(list, &entry) - } - - if err = rows.Err(); err != nil { - return - } - - jr = JSONResult{Result: list} - return -}
--- /dev/null Thu Jan 01 00:00:00 1970 +0000 +++ b/pkg/imports/config.go Wed Dec 19 15:58:44 2018 +0100 @@ -0,0 +1,71 @@ +// This is Free Software under GNU Affero General Public License v >= 3.0 +// without warranty, see README.md and license for details. +// +// SPDX-License-Identifier: AGPL-3.0-or-later +// License-Filename: LICENSES/AGPL-3.0.txt +// +// Copyright (C) 2018 by via donau +// – Österreichische Wasserstraßen-Gesellschaft mbH +// Software engineering by Intevation GmbH +// +// Author(s): +// * Sascha L. Teichmann <sascha.teichmann@intevation.de> + +package imports + +import ( + "encoding/json" + "fmt" + + "github.com/robfig/cron" +) + +type ( + CronSpec string + ImportKind string + + Config struct { + Kind ImportKind `json:"kind"` + SendEMail bool `json:"send-email"` + AutoAccept bool `json:"auto-accept"` + Cron *CronSpec `json:"cron"` + URL *string `json:"url"` + } + + IDConfig struct { + ID int64 `json:"id"` + User string `json:"user"` + Kind ImportKind `json:"kind"` + SendEMail bool `json:"send-email"` + AutoAccept bool `json:"auto-accept"` + Cron *CronSpec `json:"cron,omitempty"` + URL *string `json:"url,omitempty"` + } +) + +func (ik *ImportKind) UnmarshalJSON(data []byte) error { + var s string + if err := json.Unmarshal(data, &s); err != nil { + return err + } + + if !HasImportKindName(s) { + return fmt.Errorf("Unknown kind '%s'", s) + } + + *ik = ImportKind(s) + + return nil +} + +func (cs *CronSpec) UnmarshalJSON(data []byte) error { + var spec string + if err := json.Unmarshal(data, &spec); err != nil { + return err + } + if _, err := cron.Parse(spec); err != nil { + return err + } + *cs = CronSpec(spec) + return nil +}
--- a/pkg/imports/queue.go Wed Dec 19 15:07:14 2018 +0100 +++ b/pkg/imports/queue.go Wed Dec 19 15:58:44 2018 +0100 @@ -193,6 +193,18 @@ return iqueue.importKindNames() } +// HasImportKind checks if the import queue supports a given kind. +func HasImportKindName(kind string) bool { + return iqueue.hasImportKindName(kind) +} + +// +func (q *importQueue) hasImportKindName(kind string) bool { + q.creatorsMu.Lock() + defer q.creatorsMu.Unlock() + return q.creators[JobKind(kind)] != nil +} + // RegisterJobCreator adds a JobCreator to the global import queue. // This a good candidate to be called in a init function for // a particular JobCreator.
--- a/pkg/models/scheduler.go Wed Dec 19 15:07:14 2018 +0100 +++ /dev/null Thu Jan 01 00:00:00 1970 +0000 @@ -1,73 +0,0 @@ -// This is Free Software under GNU Affero General Public License v >= 3.0 -// without warranty, see README.md and license for details. -// -// SPDX-License-Identifier: AGPL-3.0-or-later -// License-Filename: LICENSES/AGPL-3.0.txt -// -// Copyright (C) 2018 by via donau -// – Österreichische Wasserstraßen-Gesellschaft mbH -// Software engineering by Intevation GmbH -// -// Author(s): -// * Sascha L. Teichmann <sascha.teichmann@intevation.de> - -package models - -import ( - "encoding/json" - "fmt" - - "github.com/robfig/cron" - - "gemma.intevation.de/gemma/pkg/scheduler" -) - -type ( - CronSpec string - ImportKind string - - ImportConfig struct { - Kind ImportKind `json:"kind"` - SendEMail bool `json:"send-email"` - AutoAccept bool `json:"auto-accept"` - Cron *CronSpec `json:"cron"` - URL *string `json:"url"` - } - - IDImportConfig struct { - ID int64 `json:"id"` - User string `json:"user"` - Kind ImportKind `json:"kind"` - SendEMail bool `json:"send-email"` - AutoAccept bool `json:"auto-accept"` - Cron *CronSpec `json:"cron,omitempty"` - URL *string `json:"url,omitempty"` - } -) - -func (ik *ImportKind) UnmarshalJSON(data []byte) error { - var s string - if err := json.Unmarshal(data, &s); err != nil { - return err - } - - if !scheduler.HasAction(s) { - return fmt.Errorf("Unknown kind '%s'", s) - } - - *ik = ImportKind(s) - - return nil -} - -func (cs *CronSpec) UnmarshalJSON(data []byte) error { - var spec string - if err := json.Unmarshal(data, &spec); err != nil { - return err - } - if _, err := cron.Parse(spec); err != nil { - return err - } - *cs = CronSpec(spec) - return nil -}