// Package routines provides utilities for managing long-running goroutines. package routines import "io" import "fmt" import "log" import "time" import "sync" import "errors" type routine struct { run, shutdown func () error } func (routine routine) Run () error { if routine.run == nil { return nil } else { return routine.run() } } func (routine routine) Shutdown () error { if routine.shutdown == nil { return nil } else { return routine.shutdown() } } // From creates a routine from a separate run and shutdown function. func From (run, shutdown func () error) Routine { return routine { run: run, shutdown: shutdown, } } // Routine is an object that can be run and stopped. type Routine interface { // Run is a long-running function that does not return until it is // finished. An error is returned if the routine exited due to an error. Run () error // Shutdown stops Run. Shutdown () error } // Manager is a system capable of managing multiple routines, and restarting // them if they fail. type Manager struct { // Routines specifies a list of routines to manage. These are started // when Run() is called. Routines []Routine // RestartDeadline specifies the amount of time a routine has to be // running before failing to be restarted. This is to prevent routines // that immediately fail from just being restarted over and over again. RestartDeadline time.Duration // Logger, if non-nil, is where log messages will be written to. If it // is nil, messages will be written to the standard logger. To disable // logging altogether, this can be set to io.Discard. Logger io.Writer stoppingMutex sync.Mutex stopping bool } // Run spawns all routines in the Routines slice. If a routine exits with an // error and it was running for longer than RestartDeadline, it is restarted. // Run returns only when all routines have exited. func (manager *Manager) Run () error { var waitGroup sync.WaitGroup for _, routine := range manager.Routines { if routine != nil { waitGroup.Add(1) go manager.runRoutine(routine, &waitGroup) } } waitGroup.Wait() return nil } // Shutdown shuts down all routines in the manager. func (manager *Manager) Shutdown () (err error) { manager.stoppingMutex.Lock() manager.stopping = true manager.stoppingMutex.Unlock() for _, routine := range manager.Routines { routineErr := routine.Shutdown() if routineErr != nil { err = routineErr } } return } // Append adds one or more routines to the Routines slice. This has no effect if // the manager is already running. func (manager *Manager) Append (routines ...Routine) { manager.Routines = append(manager.Routines, routines...) } func (manager *Manager) log (message ...any) { if manager.Logger == nil { log.Println(message...) } else { fmt.Fprintln(manager.Logger, message...) } } func (manager *Manager) runRoutine (routine Routine, group *sync.WaitGroup) { defer group.Done() for { lastStart := time.Now() err := panicWrap(routine.Run) stopping := false manager.stoppingMutex.Lock() stopping = manager.stopping manager.stoppingMutex.Unlock() if stopping { if err == nil { manager.log("(i) stopped routine") } else { manager.log("!!! stopped routine, with error:", err) } break } if err == nil { manager.log("(i) routine exited") break } else { manager.log("XXX routine failed:", err) } if time.Since(lastStart) < manager.RestartDeadline { manager.log("!!! not restarting routine, failed too soon") break } else { manager.log("(i) routine is being restarted") } } } func panicWrap (f func () error) (err error) { defer func () { if pan := recover(); pan != nil { err = errors.New(fmt.Sprint(pan)) } } () err = f() return }