Mercurial > gemma
diff pkg/imports/scheduled.go @ 2042:d29ac997eb34 unify_imports
This breaks this branch!!!! Starting to remove the old persistent layer for configured imports.
author | Sascha L. Teichmann <sascha.teichmann@intevation.de> |
---|---|
date | Fri, 25 Jan 2019 16:07:09 +0100 |
parents | 8eeb0b5eb340 |
children | 58b77f6b8764 |
line wrap: on
line diff
--- a/pkg/imports/scheduled.go Fri Jan 25 14:33:03 2019 +0100 +++ b/pkg/imports/scheduled.go Fri Jan 25 16:07:09 2019 +0100 @@ -16,165 +16,13 @@ import ( "context" "database/sql" - "errors" "fmt" "log" - "time" - "gemma.intevation.de/gemma/pkg/common" + "gemma.intevation.de/gemma/pkg/models" "gemma.intevation.de/gemma/pkg/scheduler" ) -// JobKindSetups maps JobKinds to special setup functions. -var JobKindSetups = map[JobKind]func(*IDConfig) (interface{}, error){ - - GMJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'gm' import") - insecure := cfg.Attributes.Bool("insecure") - return &GaugeMeasurement{ - URL: *cfg.URL, - Insecure: insecure, - }, nil - }, - - FAJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'fa' import") - insecure := cfg.Attributes.Bool("insecure") - return &FairwayAvailability{ - URL: *cfg.URL, - Insecure: insecure, - }, nil - }, - - BNJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'bn' import") - insecure := cfg.Attributes.Bool("insecure") - return &Bottleneck{ - URL: *cfg.URL, - Insecure: insecure, - }, nil - }, - - WXJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'wx' import") - ft, found := cfg.Attributes.Get("feature-type") - if !found { - return nil, errors.New("cannot find 'feature-type' attribute") - } - sb, found := cfg.Attributes.Get("sort-by") - if !found { - return nil, errors.New("cannot find 'sort-by' attribute") - } - return &WaterwayAxis{ - URL: *cfg.URL, - FeatureType: ft, - SortBy: sb, - }, nil - }, - - WAJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'wa' import") - ft, found := cfg.Attributes.Get("feature-type") - if !found { - return nil, errors.New("cannot find 'feature-type' attribute") - } - sb, found := cfg.Attributes.Get("sort-by") - if !found { - return nil, errors.New("cannot find 'sort-by' attribute") - } - return &WaterwayArea{ - URL: *cfg.URL, - FeatureType: ft, - SortBy: sb, - }, nil - }, - - FDJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'fd' import") - ft, found := cfg.Attributes.Get("feature-type") - if !found { - return nil, errors.New("cannot find 'feature-type' attribute") - } - sb, found := cfg.Attributes.Get("sort-by") - if !found { - return nil, errors.New("cannot find 'sort-by' attribute") - } - los, found := cfg.Attributes.Int("los") - if !found { - return nil, errors.New("cannot find 'los' attribute") - } - minWidth, found := cfg.Attributes.Int("min-width") - if !found { - return nil, errors.New("cannot find 'min-width' attribute") - } - maxWidth, found := cfg.Attributes.Int("max-width") - if !found { - return nil, errors.New("cannot find 'max-width' attribute") - } - depth, found := cfg.Attributes.Int("depth") - if !found { - return nil, errors.New("cannot find 'depth' attribute") - } - sourceOrganization, found := cfg.Attributes.Get("source-organization") - if !found { - return nil, errors.New("cannot find 'source-organization' attribute") - } - - return &FairwayDimension{ - URL: *cfg.URL, - FeatureType: ft, - SortBy: sb, - LOS: los, - MinWidth: minWidth, - MaxWidth: maxWidth, - Depth: depth, - SourceOrganization: sourceOrganization, - }, nil - }, - - WGJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'wg' import") - username, _ := cfg.Attributes.Get("username") - password, _ := cfg.Attributes.Get("password") - insecure := cfg.Attributes.Bool("insecure") - return &WaterwayGauge{ - URL: *cfg.URL, - Username: username, - Password: password, - Insecure: insecure, - }, nil - }, - - DMVJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'dvm' import") - username, _ := cfg.Attributes.Get("username") - password, _ := cfg.Attributes.Get("password") - insecure := cfg.Attributes.Bool("insecure") - return &DistanceMarksVirtual{ - URL: *cfg.URL, - Username: username, - Password: password, - Insecure: insecure, - }, nil - }, - DMAJobKind: func(cfg *IDConfig) (interface{}, error) { - log.Println("info: schedule 'dma' import") - ft, found := cfg.Attributes.Get("feature-type") - if !found { - return nil, errors.New("cannot find 'feature-type' attribute") - } - sb, found := cfg.Attributes.Get("sort-by") - if !found { - return nil, errors.New("cannot find 'sort-by' attribute") - } - return &DistanceMarksAshore{ - URL: *cfg.URL, - FeatureType: ft, - SortBy: sb, - }, nil - }, -} - func init() { run := func(cfgID int64) { jobID, err := RunConfiguredImport(cfgID) @@ -185,78 +33,98 @@ log.Printf("info: added import #%d to queue\n", jobID) } - for kind := range JobKindSetups { + for kind := range kindToImportModel { scheduler.RegisterAction(string(kind), run) } } // RunConfiguredImportContext runs an import configured from the database. func RunConfiguredImportContext(ctx context.Context, conn *sql.Conn, id int64) (int64, error) { - cfg, err := LoadIDConfigContext(ctx, conn, id) + cfg, err := LoadPersistentConfigContext(ctx, conn, id) return runConfiguredImport(id, cfg, err) } // RunConfiguredImport runs an import configured from the database. func RunConfiguredImport(id int64) (int64, error) { - cfg, err := loadIDConfig(id) + cfg, err := loadPersistentConfig(id) return runConfiguredImport(id, cfg, err) } -func runConfiguredImport(id int64, cfg *IDConfig, err error) (int64, error) { +func runConfiguredImport(id int64, cfg *PersistentConfig, err error) (int64, error) { if err != nil { return 0, err } if cfg == nil { - return 0, fmt.Errorf("no config found for id %d.\n", id) - } - if cfg.URL == nil { - return 0, errors.New("error: No URL specified") + return 0, fmt.Errorf("no config found for id %d.", id) } kind := JobKind(cfg.Kind) - setup := JobKindSetups[kind] - if setup == nil { - return 0, fmt.Errorf("unknown job kind: %s", cfg.Kind) + ctor := ImportModelForJobKind(kind) + if ctor == nil { + return 0, fmt.Errorf("no constructor for kind '%s'.", cfg.Kind) } - what, err := setup(cfg) - if err != nil { - return 0, err + what := ctor() + + if gqc, ok := what.(models.QueueConfigurationGetter); ok { + qc := gqc.GetQueueConfiguration() + _ = qc + // TODO: More. } - var serialized string - if serialized, err = common.ToJSONString(what); err != nil { - return 0, err + if ge, ok := what.(models.EmailTypeGetter); ok { + e := ge.GetEmailType() + _ = e + // TODO: More. } - due, _ := cfg.Attributes.Time("due") + /* + + setup := JobKindSetups[kind] + if setup == nil { + return 0, fmt.Errorf("unknown job kind: %s", cfg.Kind) + } - ret, found := cfg.Attributes.Int("retries") - var retries *int - if found { - retries = &ret - } + what, err := setup(cfg) + if err != nil { + return 0, err + } - dur, found := cfg.Attributes.Duration("wait-retry") - var waitRetry *time.Duration - if found { - waitRetry = &dur - } + var serialized string + if serialized, err = common.ToJSONString(what); err != nil { + return 0, err + } + + due, _ := cfg.Attributes.Time("due") - var jobID int64 - if jobID, err = AddJob( - kind, - due, - retries, - waitRetry, - cfg.User, - cfg.SendEMail, - serialized, - ); err != nil { - return 0, err - } + ret, found := cfg.Attributes.Int("retries") + var retries *int + if found { + retries = &ret + } + + dur, found := cfg.Attributes.Duration("wait-retry") + var waitRetry *time.Duration + if found { + waitRetry = &dur + } - return jobID, nil + var jobID int64 + if jobID, err = AddJob( + kind, + due, + retries, + waitRetry, + cfg.User, + cfg.SendEMail, + serialized, + ); err != nil { + return 0, err + } + + return jobID, nil + */ + return 0, nil }