Mercurial > gemma
changeset 2042:d29ac997eb34 unify_imports
This breaks this branch!!!! Starting to remove the old persistent layer for configured imports.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Fri, 25 Jan 2019 16:07:09 +0100 |
parents | d61ca2b3fc12 |
children | 58b77f6b8764 |
files | pkg/controllers/importconfig.go pkg/controllers/routes.go pkg/imports/config.go pkg/imports/modelconvert.go pkg/imports/scheduled.go pkg/models/importbase.go schema/gemma.sql |
diffstat | 7 files changed, 390 insertions(+), 512 deletions(-) [+] |
line wrap: on
line diff
--- a/pkg/controllers/importconfig.go Fri Jan 25 14:33:03 2019 +0100 +++ b/pkg/controllers/importconfig.go Fri Jan 25 16:07:09 2019 +0100 @@ -14,21 +14,8 @@ package controllers import ( - "context" "database/sql" - "errors" - "fmt" "net/http" - "sort" - "strconv" - - "github.com/gorilla/mux" - - "gemma.intevation.de/gemma/pkg/auth" - "gemma.intevation.de/gemma/pkg/common" - "gemma.intevation.de/gemma/pkg/imports" - "gemma.intevation.de/gemma/pkg/models" - "gemma.intevation.de/gemma/pkg/scheduler" ) const ( @@ -50,7 +37,7 @@ insertImportConfigurationSQL = ` INSERT INTO import.import_configuration -(username, kind, cron, send_email, url) +(username, kind, cron, send_email) VALUES ($1, $2, $3, $4, $5) RETURNING id` @@ -76,8 +63,7 @@ username = $2, kind = $3, cron = $4, - url = $5, - send_email = $6 + send_email = $5 WHERE id = $1 ` ) @@ -88,25 +74,28 @@ conn *sql.Conn, ) (jr JSONResult, err error) { - id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) + /* - ctx := req.Context() + id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) - var jobID int64 - if jobID, err = imports.RunConfiguredImportContext(ctx, conn, id); err != nil { - return - } + ctx := req.Context() + + var jobID int64 + if jobID, err = imports.RunConfiguredImportContext(ctx, conn, id); err != nil { + return + } - var result = struct { - ID int64 `json:"id"` - }{ - ID: jobID, - } + var result = struct { + ID int64 `json:"id"` + }{ + ID: jobID, + } - jr = JSONResult{ - Code: http.StatusCreated, - Result: &result, - } + jr = JSONResult{ + Code: http.StatusCreated, + Result: &result, + } + */ return } @@ -116,104 +105,107 @@ conn *sql.Conn, ) (jr JSONResult, err error) { - ctx := req.Context() - - importConfig := input.(*imports.Config) - - id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) - - var tx *sql.Tx - - if tx, err = conn.BeginTx(ctx, nil); err != nil { - return - } - defer tx.Rollback() + /* - var ( - entry imports.IDConfig - kind string - dummy sql.NullString - url sql.NullString - ) + ctx := req.Context() - err = conn.QueryRowContext(ctx, selectImportConfigurationIDSQL, id).Scan( - &entry.ID, - &entry.User, - &kind, - &entry.SendEMail, - &dummy, - &url, - ) + importConfig := input.(*imports.Config) - switch { - case err == sql.ErrNoRows: - err = JSONError{ - Code: http.StatusNotFound, - Message: fmt.Sprintf("No schedule %d found", id), - } - return - case err != nil: - return - } - - session, _ := auth.GetSession(req) - - entry.SendEMail = importConfig.SendEMail + id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) - // We always take the cron spec from the input. - // If there is no spec remove schedule. - var cron sql.NullString - if importConfig.Cron != nil { - cron = sql.NullString{String: string(*importConfig.Cron), Valid: true} - } - - if importConfig.URL != nil { - url = sql.NullString{String: *importConfig.URL, Valid: true} - } + var tx *sql.Tx - if _, err = tx.ExecContext(ctx, updateImportConfigurationSQL, - id, - session.User, - string(importConfig.Kind), - cron, - url, - importConfig.SendEMail, - ); err != nil { - return - } - - if importConfig.Attributes != nil { - if _, err = tx.ExecContext(ctx, deleteImportConfiguationAttributesSQL, id); err != nil { + if tx, err = conn.BeginTx(ctx, nil); err != nil { return } - if err = storeConfigAttributes(ctx, tx, id, importConfig.Attributes); err != nil { + defer tx.Rollback() + + var ( + entry imports.IDConfig + kind string + dummy sql.NullString + url sql.NullString + ) + + err = conn.QueryRowContext(ctx, selectImportConfigurationIDSQL, id).Scan( + &entry.ID, + &entry.User, + &kind, + &entry.SendEMail, + &dummy, + &url, + ) + + switch { + case err == sql.ErrNoRows: + err = JSONError{ + Code: http.StatusNotFound, + Message: fmt.Sprintf("No schedule %d found", id), + } + return + case err != nil: return } - } + + session, _ := auth.GetSession(req) + + entry.SendEMail = importConfig.SendEMail - scheduler.UnbindByID(id) + // We always take the cron spec from the input. + // If there is no spec remove schedule. + var cron sql.NullString + if importConfig.Cron != nil { + cron = sql.NullString{String: string(*importConfig.Cron), Valid: true} + } - if cron.Valid { - if err = scheduler.BindAction( + if importConfig.URL != nil { + url = sql.NullString{String: *importConfig.URL, Valid: true} + } + + if _, err = tx.ExecContext(ctx, updateImportConfigurationSQL, + id, + session.User, string(importConfig.Kind), - cron.String, - id, + cron, + url, + importConfig.SendEMail, ); err != nil { return } - } - if err = tx.Commit(); err != nil { - return - } + if importConfig.Attributes != nil { + if _, err = tx.ExecContext(ctx, deleteImportConfiguationAttributesSQL, id); err != nil { + return + } + if err = storeConfigAttributes(ctx, tx, id, importConfig.Attributes); err != nil { + return + } + } + + scheduler.UnbindByID(id) - var result = struct { - ID int64 `json:"id"` - }{ - ID: id, - } + if cron.Valid { + if err = scheduler.BindAction( + string(importConfig.Kind), + cron.String, + id, + ); err != nil { + return + } + } - jr = JSONResult{Result: &result} + if err = tx.Commit(); err != nil { + return + } + + var result = struct { + ID int64 `json:"id"` + }{ + ID: id, + } + + jr = JSONResult{Result: &result} + */ return } @@ -223,25 +215,28 @@ conn *sql.Conn, ) (jr JSONResult, err error) { - ctx := req.Context() + /* + + ctx := req.Context() - id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) + id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) - var entry *imports.IDConfig + var entry *imports.IDConfig - entry, err = imports.LoadIDConfigContext(ctx, conn, id) - switch { - case err != nil: - return - case entry == nil: - err = JSONError{ - Code: http.StatusNotFound, - Message: fmt.Sprintf("No schedule %d found", id), + entry, err = imports.LoadIDConfigContext(ctx, conn, id) + switch { + case err != nil: + return + case entry == nil: + err = JSONError{ + Code: http.StatusNotFound, + Message: fmt.Sprintf("No schedule %d found", id), + } + return } - return - } - jr = JSONResult{Result: &entry} + jr = JSONResult{Result: &entry} + */ return } @@ -251,57 +246,63 @@ conn *sql.Conn, ) (jr JSONResult, err error) { - ctx := req.Context() + /* - id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) + ctx := req.Context() - var tx *sql.Tx - if tx, err = conn.BeginTx(ctx, nil); err != nil { - return - } - defer tx.Rollback() + id, _ := strconv.ParseInt(mux.Vars(req)["id"], 10, 64) - var found bool - err = tx.QueryRowContext(ctx, hasImportConfigurationSQL, id).Scan(&found) - switch { - case err == sql.ErrNoRows: - err = JSONError{ - Code: http.StatusNotFound, - Message: fmt.Sprintf("No schedule %d found", id), + var tx *sql.Tx + if tx, err = conn.BeginTx(ctx, nil); err != nil { + return } - return - case err != nil: - return - case !found: - err = errors.New("Unexpected result") - return - } + defer tx.Rollback() - if _, err = tx.ExecContext(ctx, deleteImportConfiguationAttributesSQL, id); err != nil { - return - } + var found bool + err = tx.QueryRowContext(ctx, hasImportConfigurationSQL, id).Scan(&found) + switch { + case err == sql.ErrNoRows: + err = JSONError{ + Code: http.StatusNotFound, + Message: fmt.Sprintf("No schedule %d found", id), + } + return + case err != nil: + return + case !found: + err = errors.New("Unexpected result") + return + } - if _, err = tx.ExecContext(ctx, deleteImportConfiguationSQL, id); err != nil { - return - } + if _, err = tx.ExecContext(ctx, deleteImportConfiguationAttributesSQL, id); err != nil { + return + } - // Remove from running scheduler. - scheduler.UnbindByID(id) + if _, err = tx.ExecContext(ctx, deleteImportConfiguationSQL, id); err != nil { + return + } + + // Remove from running scheduler. + scheduler.UnbindByID(id) - if err = tx.Commit(); err != nil { - return - } + if err = tx.Commit(); err != nil { + return + } - var result = struct { - ID int64 `json:"id"` - }{ - ID: id, - } + var result = struct { + ID int64 `json:"id"` + }{ + ID: id, + } - jr = JSONResult{Result: &result} + jr = JSONResult{Result: &result} + + */ return } +/* + func storeConfigAttributes( ctx context.Context, tx *sql.Tx, @@ -332,78 +333,83 @@ return nil } +*/ + func addImportConfig( input interface{}, req *http.Request, conn *sql.Conn, ) (jr JSONResult, err error) { - importConfig := input.(*imports.Config) + /* - session, _ := auth.GetSession(req) - - var cron, url sql.NullString + importConfig := input.(*imports.Config) - if importConfig.Cron != nil { - cron = sql.NullString{String: string(*importConfig.Cron), Valid: true} - } - if importConfig.URL != nil { - url = sql.NullString{String: *importConfig.URL, Valid: true} - } + session, _ := auth.GetSession(req) - ctx := req.Context() - - var tx *sql.Tx - - if tx, err = conn.BeginTx(ctx, nil); err != nil { - return - } - defer tx.Rollback() + var cron, url sql.NullString - var id int64 - if err = tx.QueryRowContext( - ctx, - insertImportConfigurationSQL, - session.User, - string(importConfig.Kind), - cron, - importConfig.SendEMail, - url, - ).Scan(&id); err != nil { - return - } + if importConfig.Cron != nil { + cron = sql.NullString{String: string(*importConfig.Cron), Valid: true} + } + if importConfig.URL != nil { + url = sql.NullString{String: *importConfig.URL, Valid: true} + } - // Store extra attributes - if err = storeConfigAttributes(ctx, tx, id, importConfig.Attributes); err != nil { - return - } + ctx := req.Context() - // Need to start a scheduler job right away? - if importConfig.Cron != nil { - if err = scheduler.BindAction( - string(importConfig.Kind), - string(*importConfig.Cron), - id, - ); err != nil { + var tx *sql.Tx + + if tx, err = conn.BeginTx(ctx, nil); err != nil { return } - } + defer tx.Rollback() - if err = tx.Commit(); err != nil { - scheduler.UnbindByID(id) - return - } + var id int64 + if err = tx.QueryRowContext( + ctx, + insertImportConfigurationSQL, + session.User, + string(importConfig.Kind), + cron, + importConfig.SendEMail, + url, + ).Scan(&id); err != nil { + return + } + + // Store extra attributes + if err = storeConfigAttributes(ctx, tx, id, importConfig.Attributes); err != nil { + return + } - var result = struct { - ID int64 `json:"id"` - }{ - ID: id, - } + // Need to start a scheduler job right away? + if importConfig.Cron != nil { + if err = scheduler.BindAction( + string(importConfig.Kind), + string(*importConfig.Cron), + id, + ); err != nil { + return + } + } - jr = JSONResult{ - Code: http.StatusCreated, - Result: &result, - } + if err = tx.Commit(); err != nil { + scheduler.UnbindByID(id) + return + } + + var result = struct { + ID int64 `json:"id"` + }{ + ID: id, + } + + jr = JSONResult{ + Code: http.StatusCreated, + Result: &result, + } + */ return } @@ -413,48 +419,51 @@ conn *sql.Conn, ) (jr JSONResult, err error) { - ctx := req.Context() - var rows *sql.Rows - - if rows, err = conn.QueryContext(ctx, selectImportConfigurationSQL); err != nil { - return - } - defer rows.Close() - - list := []*imports.IDConfig{} + /* - for rows.Next() { - var ( - entry imports.IDConfig - kind string - cron sql.NullString - url sql.NullString - ) - if err = rows.Scan( - &entry.ID, - &entry.User, - &kind, - &entry.SendEMail, - &cron, - &url, - ); err != nil { + ctx := req.Context() + var rows *sql.Rows + + if rows, err = conn.QueryContext(ctx, selectImportConfigurationSQL); err != nil { return } - entry.Kind = imports.ImportKind(kind) - if cron.Valid { - cs := models.CronSpec(cron.String) - entry.Cron = &cs - } - if url.Valid { - entry.URL = &url.String + defer rows.Close() + + list := []*imports.IDConfig{} + + for rows.Next() { + var ( + entry imports.IDConfig + kind string + cron sql.NullString + url sql.NullString + ) + if err = rows.Scan( + &entry.ID, + &entry.User, + &kind, + &entry.SendEMail, + &cron, + &url, + ); err != nil { + return + } + entry.Kind = imports.ImportKind(kind) + if cron.Valid { + cs := models.CronSpec(cron.String) + entry.Cron = &cs + } + if url.Valid { + entry.URL = &url.String + } + list = append(list, &entry) } - list = append(list, &entry) - } - if err = rows.Err(); err != nil { - return - } + if err = rows.Err(); err != nil { + return + } - jr = JSONResult{Result: list} + jr = JSONResult{Result: list} + */ return }
--- a/pkg/controllers/routes.go Fri Jan 25 14:33:03 2019 +0100 +++ b/pkg/controllers/routes.go Fri Jan 25 16:07:09 2019 +0100 @@ -15,6 +15,7 @@ package controllers import ( + "encoding/json" "net/http" "net/http/httputil" @@ -243,7 +244,7 @@ api.Handle("/imports/config/{id:[0-9]+}", waterwayAdmin(&JSONHandler{ - Input: func() interface{} { return new(imports.Config) }, + Input: func() interface{} { return &json.RawMessage{} }, Handle: modifyImportConfig, })).Methods(http.MethodPatch) @@ -259,7 +260,7 @@ api.Handle("/imports/config", waterwayAdmin(&JSONHandler{ - Input: func() interface{} { return new(imports.Config) }, + Input: func() interface{} { return new(imports.ImportConfigIn) }, Handle: addImportConfig, })).Methods(http.MethodPost)
--- a/pkg/imports/config.go Fri Jan 25 14:33:03 2019 +0100 +++ b/pkg/imports/config.go Fri Jan 25 16:07:09 2019 +0100 @@ -21,7 +21,6 @@ "gemma.intevation.de/gemma/pkg/auth" "gemma.intevation.de/gemma/pkg/common" - "gemma.intevation.de/gemma/pkg/models" ) type ( @@ -29,34 +28,22 @@ // of the registered import types. ImportKind string - // Config is JSON serialized form of a import configuration. - Config struct { - // Kind is the import type. - Kind ImportKind `json:"kind"` - // SendEMail indicates if a mail should be be send - // when the import was changed to states - // 'pending' or 'failed'. - SendEMail bool `json:"send-email"` - // Cron is the cron schedule - // of this configuration if this value is not - // nil. If nil the import is not scheduled. - Cron *models.CronSpec `json:"cron"` - // URL is an optional URL used by the import. - URL *string `json:"url"` - // Attributes are optional key/value pairs for a configuration. - Attributes common.Attributes `json:"attributes,omitempty"` + ImportConfigIn struct { + Kind ImportKind `json:"kind"` + Config json.RawMessage `json:"config"` } - // IDConfig is the same as Config with an ID. - // Mainly used for server delivered configurations. - IDConfig struct { - ID int64 `json:"id"` - User string `json:"user"` - Kind ImportKind `json:"kind"` - SendEMail bool `json:"send-email"` - Cron *models.CronSpec `json:"cron,omitempty"` - URL *string `json:"url,omitempty"` - Attributes common.Attributes `json:"attributes,omitempty"` + ImportConfigOut struct { + ID int64 `json:"id"` + Kind ImportKind `json:"kind"` + Config interface{} `json:"config"` + } + + PersistentConfig struct { + ID int64 + User string + Kind string + Attributes common.Attributes } ) @@ -80,34 +67,26 @@ const ( configUser = "sys_admin" - loadConfigSQL = ` + loadPersistentConfigSQL = ` SELECT username, - kind, - send_email, - cron, - url + kind FROM import.import_configuration WHERE id = $1` - loadConfigAttributesSQL = ` + loadPersistentConfigAttributesSQL = ` SELECT k, v FROM import.import_configuration_attributes WHERE import_configuration_id = $1` ) -// LoadIDConfigContext loads an import configuration from database. -func LoadIDConfigContext(ctx context.Context, conn *sql.Conn, id int64) (*IDConfig, error) { +func LoadPersistentConfigContext(ctx context.Context, conn *sql.Conn, id int64) (*PersistentConfig, error) { + + cfg := &PersistentConfig{ID: id} - cfg := &IDConfig{ID: id} - var kind ImportKind - var cron, url sql.NullString - err := conn.QueryRowContext(ctx, loadConfigSQL, id).Scan( + err := conn.QueryRowContext(ctx, loadPersistentConfigSQL, id).Scan( &cfg.User, - &kind, - &cfg.SendEMail, - &cron, - &url, + &cfg.Kind, ) switch { @@ -117,16 +96,8 @@ return nil, err } - cfg.Kind = ImportKind(kind) - if cron.Valid { - c := models.CronSpec(cron.String) - cfg.Cron = &c - } - if url.Valid { - cfg.URL = &url.String - } // load the extra attributes. - rows, err := conn.QueryContext(ctx, loadConfigAttributesSQL, id) + rows, err := conn.QueryContext(ctx, loadPersistentConfigAttributesSQL, id) if err != nil { return nil, err } @@ -151,15 +122,15 @@ return cfg, nil } -func loadIDConfig(id int64) (*IDConfig, error) { - return loadIDConfigContext(context.Background(), id) +func loadPersistentConfig(id int64) (*PersistentConfig, error) { + return loadPersistentConfigContext(context.Background(), id) } -func loadIDConfigContext(ctx context.Context, id int64) (*IDConfig, error) { - var cfg *IDConfig +func loadPersistentConfigContext(ctx context.Context, id int64) (*PersistentConfig, error) { + var cfg *PersistentConfig err := auth.RunAs(ctx, configUser, func(conn *sql.Conn) error { var err error - cfg, err = LoadIDConfigContext(ctx, conn, id) + cfg, err = LoadPersistentConfigContext(ctx, conn, id) return err }) return cfg, err
--- a/pkg/imports/modelconvert.go Fri Jan 25 14:33:03 2019 +0100 +++ b/pkg/imports/modelconvert.go Fri Jan 25 16:07:09 2019 +0100 @@ -32,6 +32,10 @@ STJobKind: func() interface{} { return new(models.StretchImport) }, } +func ImportModelForJobKind(kind JobKind) func() interface{} { + return kindToImportModel[kind] +} + func MustImportModel(kind JobKind) func() interface{} { if ctor := kindToImportModel[kind]; ctor != nil { return ctor
--- a/pkg/imports/scheduled.go Fri Jan 25 14:33:03 2019 +0100 +++ b/pkg/imports/scheduled.go Fri Jan 25 16:07:09 2019 +0100 @@ -16,165 +16,13 @@ import ( "context" "database/sql" - "errors" "fmt" "log" - "time" - "gemma.intevation.de/gemma/pkg/common" + "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/scheduler" ) -// JobKindSetups maps JobKinds to special setup functions. -var JobKindSetups = map[JobKind]func(*IDConfig) (interface{}, error){ - - GMJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'gm' import") - insecure := cfg.Attributes.Bool("insecure") - return &GaugeMeasurement{ - URL: *cfg.URL, - Insecure: insecure, - }, nil - }, - - FAJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'fa' import") - insecure := cfg.Attributes.Bool("insecure") - return &FairwayAvailability{ - URL: *cfg.URL, - Insecure: insecure, - }, nil - }, - - BNJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'bn' import") - insecure := cfg.Attributes.Bool("insecure") - return &Bottleneck{ - URL: *cfg.URL, - Insecure: insecure, - }, nil - }, - - WXJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'wx' import") - ft, found := cfg.Attributes.Get("feature-type") - if !found { - return nil, errors.New("cannot find 'feature-type' attribute") - } - sb, found := cfg.Attributes.Get("sort-by") - if !found { - return nil, errors.New("cannot find 'sort-by' attribute") - } - return &WaterwayAxis{ - URL: *cfg.URL, - FeatureType: ft, - SortBy: sb, - }, nil - }, - - WAJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'wa' import") - ft, found := cfg.Attributes.Get("feature-type") - if !found { - return nil, errors.New("cannot find 'feature-type' attribute") - } - sb, found := cfg.Attributes.Get("sort-by") - if !found { - return nil, errors.New("cannot find 'sort-by' attribute") - } - return &WaterwayArea{ - URL: *cfg.URL, - FeatureType: ft, - SortBy: sb, - }, nil - }, - - FDJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'fd' import") - ft, found := cfg.Attributes.Get("feature-type") - if !found { - return nil, errors.New("cannot find 'feature-type' attribute") - } - sb, found := cfg.Attributes.Get("sort-by") - if !found { - return nil, errors.New("cannot find 'sort-by' attribute") - } - los, found := cfg.Attributes.Int("los") - if !found { - return nil, errors.New("cannot find 'los' attribute") - } - minWidth, found := cfg.Attributes.Int("min-width") - if !found { - return nil, errors.New("cannot find 'min-width' attribute") - } - maxWidth, found := cfg.Attributes.Int("max-width") - if !found { - return nil, errors.New("cannot find 'max-width' attribute") - } - depth, found := cfg.Attributes.Int("depth") - if !found { - return nil, errors.New("cannot find 'depth' attribute") - } - sourceOrganization, found := cfg.Attributes.Get("source-organization") - if !found { - return nil, errors.New("cannot find 'source-organization' attribute") - } - - return &FairwayDimension{ - URL: *cfg.URL, - FeatureType: ft, - SortBy: sb, - LOS: los, - MinWidth: minWidth, - MaxWidth: maxWidth, - Depth: depth, - SourceOrganization: sourceOrganization, - }, nil - }, - - WGJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'wg' import") - username, _ := cfg.Attributes.Get("username") - password, _ := cfg.Attributes.Get("password") - insecure := cfg.Attributes.Bool("insecure") - return &WaterwayGauge{ - URL: *cfg.URL, - Username: username, - Password: password, - Insecure: insecure, - }, nil - }, - - DMVJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'dvm' import") - username, _ := cfg.Attributes.Get("username") - password, _ := cfg.Attributes.Get("password") - insecure := cfg.Attributes.Bool("insecure") - return &DistanceMarksVirtual{ - URL: *cfg.URL, - Username: username, - Password: password, - Insecure: insecure, - }, nil - }, - DMAJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'dma' import") - ft, found := cfg.Attributes.Get("feature-type") - if !found { - return nil, errors.New("cannot find 'feature-type' attribute") - } - sb, found := cfg.Attributes.Get("sort-by") - if !found { - return nil, errors.New("cannot find 'sort-by' attribute") - } - return &DistanceMarksAshore{ - URL: *cfg.URL, - FeatureType: ft, - SortBy: sb, - }, nil - }, -} - func init() { run := func(cfgID int64) { jobID, err := RunConfiguredImport(cfgID) @@ -185,78 +33,98 @@ log.Printf("info: added import #%d to queue\n", jobID) } - for kind := range JobKindSetups { + for kind := range kindToImportModel { scheduler.RegisterAction(string(kind), run) } } // RunConfiguredImportContext runs an import configured from the database. func RunConfiguredImportContext(ctx context.Context, conn *sql.Conn, id int64) (int64, error) { - cfg, err := LoadIDConfigContext(ctx, conn, id) + cfg, err := LoadPersistentConfigContext(ctx, conn, id) return runConfiguredImport(id, cfg, err) } // RunConfiguredImport runs an import configured from the database. func RunConfiguredImport(id int64) (int64, error) { - cfg, err := loadIDConfig(id) + cfg, err := loadPersistentConfig(id) return runConfiguredImport(id, cfg, err) } -func runConfiguredImport(id int64, cfg *IDConfig, err error) (int64, error) { +func runConfiguredImport(id int64, cfg *PersistentConfig, err error) (int64, error) { if err != nil { return 0, err } if cfg == nil { - return 0, fmt.Errorf("no config found for id %d.\n", id) - } - if cfg.URL == nil { - return 0, errors.New("error: No URL specified") + return 0, fmt.Errorf("no config found for id %d.", id) } kind := JobKind(cfg.Kind) - setup := JobKindSetups[kind] - if setup == nil { - return 0, fmt.Errorf("unknown job kind: %s", cfg.Kind) + ctor := ImportModelForJobKind(kind) + if ctor == nil { + return 0, fmt.Errorf("no constructor for kind '%s'.", cfg.Kind) } - what, err := setup(cfg) - if err != nil { - return 0, err + what := ctor() + + if gqc, ok := what.(models.QueueConfigurationGetter); ok { + qc := gqc.GetQueueConfiguration() + _ = qc + // TODO: More. } - var serialized string - if serialized, err = common.ToJSONString(what); err != nil { - return 0, err + if ge, ok := what.(models.EmailTypeGetter); ok { + e := ge.GetEmailType() + _ = e + // TODO: More. } - due, _ := cfg.Attributes.Time("due") + /* + + setup := JobKindSetups[kind] + if setup == nil { + return 0, fmt.Errorf("unknown job kind: %s", cfg.Kind) + } - ret, found := cfg.Attributes.Int("retries") - var retries *int - if found { - retries = &ret - } + what, err := setup(cfg) + if err != nil { + return 0, err + } - dur, found := cfg.Attributes.Duration("wait-retry") - var waitRetry *time.Duration - if found { - waitRetry = &dur - } + var serialized string + if serialized, err = common.ToJSONString(what); err != nil { + return 0, err + } + + due, _ := cfg.Attributes.Time("due") - var jobID int64 - if jobID, err = AddJob( - kind, - due, - retries, - waitRetry, - cfg.User, - cfg.SendEMail, - serialized, - ); err != nil { - return 0, err - } + ret, found := cfg.Attributes.Int("retries") + var retries *int + if found { + retries = &ret + } + + dur, found := cfg.Attributes.Duration("wait-retry") + var waitRetry *time.Duration + if found { + waitRetry = &dur + } - return jobID, nil + var jobID int64 + if jobID, err = AddJob( + kind, + due, + retries, + waitRetry, + cfg.User, + cfg.SendEMail, + serialized, + ); err != nil { + return 0, err + } + + return jobID, nil + */ + return 0, nil }
--- a/pkg/models/importbase.go Fri Jan 25 14:33:03 2019 +0100 +++ b/pkg/models/importbase.go Fri Jan 25 16:07:09 2019 +0100 @@ -83,6 +83,34 @@ return nil } +func (et *EmailType) MarshalAttributes(attrs common.Attributes) error { + if et.Email { + attrs.SetBool("email", et.Email) + } + return nil +} + +func (et *EmailType) UnmarshalAttributes(attrs common.Attributes) error { + et.Email = attrs.Bool("email") + return nil +} + +func (qct *QueueConfigurationType) MarshalAttributes(attrs common.Attributes) error { + if err := qct.EmailType.MarshalAttributes(attrs); err != nil { + return err + } + if qct.Trys != nil { + attrs.SetInt("trys", *qct.Trys) + } + if qct.WaitRetry != nil { + attrs.SetDuration("wait-retry", *qct.WaitRetry) + } + if qct.Due != nil { + attrs.SetTime("due", *qct.Due) + } + return nil +} + func (ut *URLType) MarshalAttributes(attrs common.Attributes) error { attrs.Set("url", ut.URL) if ut.Insecure {
--- a/schema/gemma.sql Fri Jan 25 14:33:03 2019 +0100 +++ b/schema/gemma.sql Fri Jan 25 16:07:09 2019 +0100 @@ -630,10 +630,7 @@ REFERENCES internal.user_profiles(username) ON DELETE CASCADE ON UPDATE CASCADE, - kind varchar NOT NULL, - send_email boolean NOT NULL DEFAULT false, - cron varchar, - url varchar + kind varchar NOT NULL ) CREATE TABLE import_configuration_attributes (