// Package routines provides utilities for managing long-running goroutines. package routines import "io" import "fmt" import "log" import "time" import "sync" import "errors" import "context" // Func is a routine created from a run function. type Func func (context.Context) error // Run runs the routine. func (fun Func) Run (ctx context.Context) error { return fun(ctx) } // Routine is an object that can be run and stopped. type Routine interface { // Run is a long-running function that does not return until it is // finished, or its context is cancelled. If the context is cancelled, // the function must perform necessary cleanup/shutdown operations and // exit. An error is returned if the routine exited due to an error. Run (context.Context) error } // InitRoutine is a routine that has to be initialized before it can be run. type InitRoutine interface { Routine // Init is an initialization function that is called before any routines // are run, including this one. Routines must not depend on eachother in // this function. Init (context.Context) error } // Manager is a system capable of managing multiple routines, and restarting // them if they fail. type Manager struct { // Routines specifies a list of routines to manage. These are started // when Run() is called. Routines []Routine // RestartDeadline specifies the amount of time a routine has to be // running before failing to be restarted. This is to prevent routines // that immediately fail from just being restarted over and over again. // Defaults to 32 seconds if not set. RestartDeadline time.Duration // Logger, if non-nil, is where log messages will be written to. If it // is nil, messages will be written to the standard logger. To disable // logging altogether, this can be set to io.Discard. Logger io.Writer loggerLock sync.Mutex ctx context.Context failed map[Routine] struct { } failedLock sync.Mutex } // Run spawns all routines in the Routines slice. If a routine exits with an // error and it was running for longer than RestartDeadline, it is restarted. // Run returns only when all routines have exited. func (this *Manager) Run (ctx context.Context) error { ctx, done := context.WithCancel(ctx) this.ctx = ctx this.failed = make(map[Routine] struct { }) var waitGroup sync.WaitGroup for _, routine := range this.Routines { if routine != nil { waitGroup.Add(1) go this.initRoutine(routine, &waitGroup) } } waitGroup.Wait() for _, routine := range this.Routines { if _, failed := this.failed[routine]; !failed { waitGroup.Add(1) go this.runRoutine(routine, &waitGroup) } } this.failed = nil waitGroup.Wait() done() return nil } // Append adds one or more routines to the Routines slice. This has no effect if // the manager is already running. func (this *Manager) Append (routines ...Routine) { this.Routines = append(this.Routines, routines...) } func (this *Manager) log (message ...any) { this.loggerLock.Lock() defer this.loggerLock.Unlock() if this.Logger == nil { log.Println(message...) } else { fmt.Fprintln(this.Logger, message...) } } func (this *Manager) initRoutine (routine Routine, group *sync.WaitGroup) { defer group.Done() if initRoutine, ok := routine.(InitRoutine); ok { err := initRoutine.Init(this.ctx) if err != nil { this.log("XXX routine failed to initialize:", err) this.failedLock.Lock() defer this.failedLock.Unlock() this.failed[routine] = struct { } { } } } } func (this *Manager) runRoutine (routine Routine, group *sync.WaitGroup) { defer group.Done() restartDeadline := this.RestartDeadline if restartDeadline == 0 { restartDeadline = 32 * time.Second } for { lastStart := time.Now() err := panicWrap(routine.Run, this.ctx) if ctxErr := this.ctx.Err(); ctxErr != nil { if err == nil { this.log("(i) stopped routine") } else { this.log("!!! stopped routine, with error:", err) } break } if err == nil { this.log("(i) routine exited") break } else { this.log("XXX routine failed:", err) } if time.Since(lastStart) < this.RestartDeadline { this.log("!!! not restarting routine, failed too soon") break } else { this.log("(i) routine is being restarted") } } } func panicWrap (f func (context.Context) error, ctx context.Context) (err error) { defer func () { if pan := recover(); pan != nil { err = errors.New(fmt.Sprint(pan)) } } () err = f(ctx) return }