diff --git a/routines/routines.go b/routines/routines.go index 9d9431a..233ceb7 100644 --- a/routines/routines.go +++ b/routines/routines.go @@ -7,43 +7,23 @@ import "log" import "time" import "sync" import "errors" +import "context" -type routine struct { - run, shutdown func () error +// Func is a routine created from a run function. +type Func func (context.Context) error + +// Run runs the routine. +func (fun Func) Run (ctx context.Context) error { + return fun(ctx) } -func (routine routine) Run () error { - if routine.run == nil { - return nil - } else { - return routine.run() - } -} - -func (routine routine) Shutdown () error { - if routine.shutdown == nil { - return nil - } else { - return routine.shutdown() - } -} - -// From creates a routine from a separate run and shutdown function. -func From (run, shutdown func () error) Routine { - return routine { - run: run, - shutdown: shutdown, - } -} - // Routine is an object that can be run and stopped. type Routine interface { // Run is a long-running function that does not return until it is - // finished. An error is returned if the routine exited due to an error. - Run () error - - // Shutdown stops Run. - Shutdown () error + // finished, or its context is cancelled. If the context is cancelled, + // the function must perform necessary cleanup/shutdown operations and + // exit. An error is returned if the routine exited due to an error. + Run (context.Context) error } // Manager is a system capable of managing multiple routines, and restarting @@ -63,99 +43,82 @@ type Manager struct { // logging altogether, this can be set to io.Discard. Logger io.Writer - stoppingMutex sync.Mutex - stopping bool + ctx context.Context } // Run spawns all routines in the Routines slice. If a routine exits with an // error and it was running for longer than RestartDeadline, it is restarted. -// Run returns only when all routines have exited. -func (manager *Manager) Run () error { +// Run returns only when all routines have exited. +func (this *Manager) Run (ctx context.Context) error { + ctx, done := context.WithCancel(ctx) + this.ctx = ctx + var waitGroup sync.WaitGroup - - for _, routine := range manager.Routines { + for _, routine := range this.Routines { if routine != nil { waitGroup.Add(1) - go manager.runRoutine(routine, &waitGroup) + go this.runRoutine(routine, &waitGroup) } } - waitGroup.Wait() + + done() return nil } -// Shutdown shuts down all routines in the manager. -func (manager *Manager) Shutdown () (err error) { - manager.stoppingMutex.Lock() - manager.stopping = true - manager.stoppingMutex.Unlock() - - for _, routine := range manager.Routines { - routineErr := routine.Shutdown() - if routineErr != nil { - err = routineErr - } - } - return -} - // Append adds one or more routines to the Routines slice. This has no effect if // the manager is already running. -func (manager *Manager) Append (routines ...Routine) { - manager.Routines = append(manager.Routines, routines...) +func (this *Manager) Append (routines ...Routine) { + this.Routines = append(this.Routines, routines...) } -func (manager *Manager) log (message ...any) { - if manager.Logger == nil { +func (this *Manager) log (message ...any) { + if this.Logger == nil { log.Println(message...) } else { - fmt.Fprintln(manager.Logger, message...) + fmt.Fprintln(this.Logger, message...) } } -func (manager *Manager) runRoutine (routine Routine, group *sync.WaitGroup) { +func (this *Manager) runRoutine (routine Routine, group *sync.WaitGroup) { defer group.Done() for { lastStart := time.Now() - err := panicWrap(routine.Run) - - stopping := false - manager.stoppingMutex.Lock() - stopping = manager.stopping - manager.stoppingMutex.Unlock() - if stopping { + err := panicWrap(routine.Run, this.ctx) + + if ctxErr := this.ctx.Err(); ctxErr != nil { if err == nil { - manager.log("(i) stopped routine") + this.log("(i) stopped routine") } else { - manager.log("!!! stopped routine, with error:", err) + this.log("!!! stopped routine, with error:", err) } break } if err == nil { - manager.log("(i) routine exited") + this.log("(i) routine exited") break } else { - manager.log("XXX routine failed:", err) + this.log("XXX routine failed:", err) } - if time.Since(lastStart) < manager.RestartDeadline { - manager.log("!!! not restarting routine, failed too soon") + if time.Since(lastStart) < this.RestartDeadline { + this.log("!!! not restarting routine, failed too soon") break } else { - manager.log("(i) routine is being restarted") + this.log("(i) routine is being restarted") } } } -func panicWrap (f func () error) (err error) { +func panicWrap (f func (context.Context) error, ctx context.Context) (err error) { defer func () { if pan := recover(); pan != nil { err = errors.New(fmt.Sprint(pan)) } } () - err = f() + err = f(ctx) return }