# HG changeset patch # User Sascha L. Teichmann # Date 1545327559 -3600 # Node ID 85386ad17d34e4409d592cc78ec856f5220a01f9 # Parent 06f08458d66689144802500c0c97d59b856d8c4c Scheduled imports: Don't track the user in the running scheduler. The user is stored in the database already. diff -r 06f08458d666 -r 85386ad17d34 pkg/controllers/importconfig.go --- a/pkg/controllers/importconfig.go Thu Dec 20 18:14:28 2018 +0100 +++ b/pkg/controllers/importconfig.go Thu Dec 20 18:39:19 2018 +0100 @@ -149,7 +149,6 @@ if err = scheduler.BindAction( string(importConfig.Kind), cron.String, - session.User, id, ); err != nil { return @@ -321,7 +320,6 @@ if err = scheduler.BindAction( string(importConfig.Kind), string(*importConfig.Cron), - session.User, id, ); err != nil { return diff -r 06f08458d666 -r 85386ad17d34 pkg/controllers/user.go --- a/pkg/controllers/user.go Thu Dec 20 18:14:28 2018 +0100 +++ b/pkg/controllers/user.go Thu Dec 20 18:39:19 2018 +0100 @@ -17,6 +17,7 @@ import ( "bytes" + "context" "database/sql" "fmt" "log" @@ -71,6 +72,10 @@ ST_XMax(map_extent), ST_YMax(map_extent) FROM users.list_users WHERE username = $1` + + scheduledIDsSQL = ` +SELECT id from waterway.import_configuration +WHERE username = $1` ) var ( @@ -96,6 +101,30 @@ (inkluding import errors) and details on the concerned import.`)) ) +func scheduledIDs( + ctx context.Context, + conn *sql.Conn, + user string, +) (map[int64]struct{}, error) { + ids := map[int64]struct{}{} + rows, err := conn.QueryContext(ctx, scheduledIDsSQL, user) + if err != nil { + return nil, nil + } + defer rows.Close() + for rows.Next() { + var id int64 + if err := rows.Scan(&id); err != nil { + return nil, err + } + ids[id] = struct{}{} + } + if err := rows.Err(); err != nil { + return nil, err + } + return ids, nil +} + func deleteUser( _ interface{}, req *http.Request, db *sql.Conn, @@ -113,6 +142,16 @@ return } + // Remove scheduled tasks. + ids, err2 := scheduledIDs(req.Context(), db, user) + if err2 == nil { + if len(ids) > 0 { + go func() { scheduler.UnbindByIDs(ids) }() + } + } else { + log.Printf("error: %v\n", err2) + } + var res sql.Result if res, err = db.ExecContext(req.Context(), deleteUserSQL, user); err != nil { @@ -129,7 +168,6 @@ // Running in a go routine should not be necessary. go func() { auth.Sessions.Logout(user) }() - go func() { scheduler.UnbindUser(user) }() jr = JSONResult{Code: http.StatusNoContent} return @@ -207,7 +245,6 @@ if user != newUser.User { // Running in a go routine should not be necessary. go func() { auth.Sessions.Logout(string(user)) }() - go func() { scheduler.UnbindUser(string(user)) }() } jr = JSONResult{ diff -r 06f08458d666 -r 85386ad17d34 pkg/imports/gmsched.go --- a/pkg/imports/gmsched.go Thu Dec 20 18:14:28 2018 +0100 +++ b/pkg/imports/gmsched.go Thu Dec 20 18:39:19 2018 +0100 @@ -23,7 +23,7 @@ scheduler.RegisterAction("gm", scheduledGM) } -func scheduledGM(user string, id int64) { +func scheduledGM(id int64) { log.Println("info: scheduled GM import") cfg, err := loadIDConfig(id) if err != nil { diff -r 06f08458d666 -r 85386ad17d34 pkg/scheduler/boot.go --- a/pkg/scheduler/boot.go Thu Dec 20 18:14:28 2018 +0100 +++ b/pkg/scheduler/boot.go Thu Dec 20 18:39:19 2018 +0100 @@ -57,7 +57,6 @@ var id int64 if err = rows.Scan( &id, - &ba.User, &ba.Name, &ba.Spec, ); err != nil { diff -r 06f08458d666 -r 85386ad17d34 pkg/scheduler/scheduler.go --- a/pkg/scheduler/scheduler.go Thu Dec 20 18:14:28 2018 +0100 +++ b/pkg/scheduler/scheduler.go Thu Dec 20 18:39:19 2018 +0100 @@ -24,12 +24,11 @@ // ErrNoSuchAction if no fitting action was found. var ErrNoSuchAction = errors.New("No such action") -// Action is called with a user and a configuration id. -type Action func(user string, cfgID int64) +// Action is called with a configuration id. +type Action func(cfgID int64) type userAction struct { scheduler *scheduler - user string name string cfgID int64 } @@ -43,7 +42,7 @@ // Run implements cron.Job. func (ua *userAction) Run() { if a := ua.scheduler.action(ua.name); a != nil { - a(ua.user, ua.cfgID) + a(ua.cfgID) } else { log.Printf("warn: scheduled action '%s' not found.", ua.name) } @@ -65,12 +64,11 @@ } // BoundAction is a complete set of infos for -// an action to be bound to a user, schedule and +// an action to be bound to a schedule and // configuration id. type BoundAction struct { Name string Spec string - User string CfgID int64 } @@ -99,8 +97,6 @@ } job := &userAction{ scheduler: s, - user: ba.User, - name: ba.Name, cfgID: ba.CfgID, } cr.Schedule(schedule, job) @@ -116,16 +112,16 @@ return nil } -// BindAction binds a named action to a user, a cron spec and +// BindAction binds a named action to a cron spec and // a configuration id. -func BindAction(name, spec, user string, cfgID int64) error { - return global.bindAction(name, spec, user, cfgID) +func BindAction(name, spec string, cfgID int64) error { + return global.bindAction(name, spec, cfgID) } // UnbindAction unbinds a named action from a user and // a configuration id. -func UnbindAction(name, user string, cfgID int64) { - global.unbindAction(name, user, cfgID) +func UnbindAction(name string, cfgID int64) { + global.unbindAction(name, cfgID) } // UnbindByID unbinds all schedules with a given id. @@ -134,8 +130,8 @@ } // UnbindUser unbinds all schedules for a given user. -func UnbindUser(user string) { - global.unbindUser(user) +func UnbindByIDs(ids map[int64]struct{}) { + global.unbindByIDs(ids) } // HasAction asks if there is an action with a given name. @@ -149,7 +145,7 @@ return s.actions[name] != nil } -func (s *scheduler) unbindUser(user string) { +func (s *scheduler) unbindByIDs(ids map[int64]struct{}) { s.mu.Lock() defer s.mu.Unlock() @@ -162,8 +158,7 @@ var found bool for _, entry := range entries { ua := entry.Job.(*userAction) - if ua.user == user { - found = true + if _, found = ids[ua.cfgID]; found { break } } @@ -175,7 +170,7 @@ s.cr = cron.New() for _, entry := range entries { ua := entry.Job.(*userAction) - if ua.user != user { + if _, found := ids[ua.cfgID]; !found { s.cr.Schedule(entry.Schedule, entry.Job) } } @@ -212,7 +207,7 @@ s.cr.Start() } -func (s *scheduler) unbindAction(name, user string, cfgID int64) { +func (s *scheduler) unbindAction(name string, cfgID int64) { s.mu.Lock() defer s.mu.Unlock() @@ -221,8 +216,8 @@ var found *userAction for _, entry := range entries { ua := entry.Job.(*userAction) - if ua.name == name && ua.user == user && cfgID == ua.cfgID { - // Already have such a user/action/cfg tuple -> re-schedule. + if ua.name == name && cfgID == ua.cfgID { + // Already have such a action/cfg tuple -> re-schedule. found = ua break } @@ -243,7 +238,7 @@ s.cr.Start() } -func (s *scheduler) bindAction(name, spec, user string, cfgID int64) error { +func (s *scheduler) bindAction(name, spec string, cfgID int64) error { schedule, err := cron.Parse(spec) if err != nil { @@ -258,7 +253,7 @@ var found *userAction for _, entry := range entries { ua := entry.Job.(*userAction) - if ua.name == name && ua.user == user && cfgID == ua.cfgID { + if ua.name == name && cfgID == ua.cfgID { // Already have such a user/action/cfg tuple -> re-schedule. found = ua break @@ -267,7 +262,7 @@ if found == nil { // Add to current plan. - job := &userAction{scheduler: s, user: user, name: name, cfgID: cfgID} + job := &userAction{scheduler: s, name: name, cfgID: cfgID} s.cr.Schedule(schedule, job) } else { // If found re-build all.