Mercurial > gemma
changeset 2049:d1a680be7ae4 unify_imports
Imports. Re-enabled /imports/config GET.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Sun, 27 Jan 2019 14:27:37 +0100 |
parents | 3c4b8f4815af |
children | a3ccef8ec304 |
files | pkg/controllers/importconfig.go pkg/imports/config.go |
diffstat | 2 files changed, 156 insertions(+), 102 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/controllers/importconfig.go Fri Jan 25 18:27:04 2019 +0100 +++ b/pkg/controllers/importconfig.go Sun Jan 27 14:27:37 2019 +0100 @@ -15,7 +15,6 @@ import ( "database/sql" - "errors" "fmt" "net/http" "strconv" @@ -23,21 +22,15 @@ "github.com/gorilla/mux" "gemma.intevation.de/gemma/pkg/imports" - "gemma.intevation.de/gemma/pkg/scheduler" ) const ( - selectImportConfigurationPrefix = ` + selectImportConfigurationIDSQL = ` SELECT id, username, kind -FROM import.import_configuration` - - selectImportConfigurationSQL = selectImportConfigurationPrefix + ` -ORDER by id` - - selectImportConfigurationIDSQL = selectImportConfigurationPrefix + ` +FROM import.import_configuration WHERE id = $1` insertImportConfigurationSQL = ` @@ -55,19 +48,9 @@ SELECT true FROM import.import_configuration WHERE id = $1` - deleteImportConfiguationAttributesSQL = ` -DELETE FROM import.import_configuration_attributes -WHERE import_configuration_id = $1` - deleteImportConfiguationSQL = ` DELETE FROM import.import_configuration WHERE id = $1` - - updateImportConfigurationSQL = ` -UPDATE import.import_configuration SET - username = $2, - kind = $3 -WHERE id = $1` ) func runImportConfig( @@ -262,54 +245,57 @@ conn *sql.Conn, ) (jr JSONResult, err error) { - ctx := req.Context() + /* - id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) + ctx := req.Context() - var tx *sql.Tx - if tx, err = conn.BeginTx(ctx, nil); err != nil { - return - } - defer tx.Rollback() + id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) - var found bool - err = tx.QueryRowContext(ctx, hasImportConfigurationSQL, id).Scan(&found) - switch { - case err == sql.ErrNoRows: - err = JSONError{ - Code: http.StatusNotFound, - Message: fmt.Sprintf("No schedule %d found", id), + var tx *sql.Tx + if tx, err = conn.BeginTx(ctx, nil); err != nil { + return } - return - case err != nil: - return - case !found: - err = errors.New("Unexpected result") - return - } + defer tx.Rollback() - if _, err = tx.ExecContext(ctx, deleteImportConfiguationAttributesSQL, id); err != nil { - return - } + var found bool + err = tx.QueryRowContext(ctx, hasImportConfigurationSQL, id).Scan(&found) + switch { + case err == sql.ErrNoRows: + err = JSONError{ + Code: http.StatusNotFound, + Message: fmt.Sprintf("No schedule %d found", id), + } + return + case err != nil: + return + case !found: + err = errors.New("Unexpected result") + return + } - if _, err = tx.ExecContext(ctx, deleteImportConfiguationSQL, id); err != nil { - return - } + if _, err = tx.ExecContext(ctx, deleteImportConfiguationAttributesSQL, id); err != nil { + return + } - // Remove from running scheduler. - scheduler.UnbindByID(id) + if _, err = tx.ExecContext(ctx, deleteImportConfiguationSQL, id); err != nil { + return + } + + // Remove from running scheduler. + scheduler.UnbindByID(id) - if err = tx.Commit(); err != nil { - return - } + if err = tx.Commit(); err != nil { + return + } - var result = struct { - ID int64 `json:"id"` - }{ - ID: id, - } + var result = struct { + ID int64 `json:"id"` + }{ + ID: id, + } - jr = JSONResult{Result: &result} + jr = JSONResult{Result: &result} + */ return } @@ -432,51 +418,18 @@ conn *sql.Conn, ) (jr JSONResult, err error) { - /* - - ctx := req.Context() - var rows *sql.Rows - - if rows, err = conn.QueryContext(ctx, selectImportConfigurationSQL); err != nil { - return - } - defer rows.Close() - - list := []*imports.IDConfig{} + ctx := req.Context() + configs := []*imports.ImportConfigOut{} - for rows.Next() { - var ( - entry imports.IDConfig - kind string - cron sql.NullString - url sql.NullString - ) - if err = rows.Scan( - &entry.ID, - &entry.User, - &kind, - &entry.SendEMail, - &cron, - &url, - ); err != nil { - return - } - entry.Kind = imports.ImportKind(kind) - if cron.Valid { - cs := models.CronSpec(cron.String) - entry.Cron = &cs - } - if url.Valid { - entry.URL = &url.String - } - list = append(list, &entry) - } - - if err = rows.Err(); err != nil { - return - } - - jr = JSONResult{Result: list} - */ + if err = imports.ListAllPersistentConfigurationsContext( + ctx, conn, + func(config *imports.ImportConfigOut) error { + configs = append(configs, config) + return nil + }, + ); err != nil { + return + } + jr = JSONResult{Result: configs} return }
--- a/pkg/imports/config.go Fri Jan 25 18:27:04 2019 +0100 +++ b/pkg/imports/config.go Sun Jan 27 14:27:37 2019 +0100 @@ -36,6 +36,7 @@ ImportConfigOut struct { ID int64 `json:"id"` Kind ImportKind `json:"kind"` + User string `json:"user"` Config interface{} `json:"config,omitempty"` } @@ -78,8 +79,37 @@ SELECT k, v FROM import.import_configuration_attributes WHERE import_configuration_id = $1` + + deleteImportConfiguationAttributesSQL = ` +DELETE FROM import.import_configuration_attributes +WHERE import_configuration_id = $1` + + updateImportConfigurationSQL = ` +UPDATE import.import_configuration SET + username = $2, + kind = $3 +WHERE id = $1` + + selectImportConfigurationsByID = ` +SELECT + c.id AS id, + username, + kind, + a.k, + a.v +FROM import.import_configuration c JOIN + import.import_configuration_attributes a + ON c.id = a.import_configuration_id +ORDER by c.id` ) +func (pc *PersistentConfig) Update(ctx context.Context, tx *sql.Tx) error { + if _, err := tx.ExecContext(ctx, deleteImportConfiguationAttributesSQL, pc.ID); err != nil { + return err + } + return nil +} + func LoadPersistentConfigContext(ctx context.Context, conn *sql.Conn, id int64) (*PersistentConfig, error) { cfg := &PersistentConfig{ID: id} @@ -135,3 +165,74 @@ }) return cfg, err } + +func ListAllPersistentConfigurationsContext( + ctx context.Context, + conn *sql.Conn, + fn func(*ImportConfigOut) error, +) error { + + rows, err := conn.QueryContext(ctx, selectImportConfigurationsByID) + if err != nil { + return err + } + defer rows.Close() + + var ( + first = true + lastID int64 + pc PersistentConfig + k, v sql.NullString + ) + + send := func() error { + kind := JobKind(pc.Kind) + ctor := ImportModelForJobKind(kind) + if ctor == nil { + return fmt.Errorf("unable to deserialize kind '%s'", pc.Kind) + } + config := ctor() + pc.Attributes.Unmarshal(config) + return fn(&ImportConfigOut{ + ID: pc.ID, + Kind: ImportKind(pc.Kind), + User: pc.User, + Config: config, + }) + } + + for rows.Next() { + if err := rows.Scan(&pc.ID, &pc.User, &k, &v); err != nil { + return err + } + if !first { + if lastID != pc.ID { + if err := send(); err != nil { + return err + } + pc.Attributes = nil + } + } else { + first = false + } + + if k.Valid && v.Valid { + if pc.Attributes == nil { + pc.Attributes = common.Attributes{} + } + pc.Attributes.Set(k.String, v.String) + } + + lastID = pc.ID + } + + if err := rows.Err(); err != nil { + return err + } + + err = nil + if !first { + err = send() + } + return err +}