Initial commit

This commit is contained in:
Sasha Koshka 2024-10-30 18:09:43 -04:00
parent 9e2108838a
commit 57e6f63124

View File

@ -7,43 +7,23 @@ import "log"
import "time" import "time"
import "sync" import "sync"
import "errors" import "errors"
import "context"
type routine struct { // Func is a routine created from a run function.
run, shutdown func () error type Func func (context.Context) error
// Run runs the routine.
func (fun Func) Run (ctx context.Context) error {
return fun(ctx)
} }
func (routine routine) Run () error {
if routine.run == nil {
return nil
} else {
return routine.run()
}
}
func (routine routine) Shutdown () error {
if routine.shutdown == nil {
return nil
} else {
return routine.shutdown()
}
}
// From creates a routine from a separate run and shutdown function.
func From (run, shutdown func () error) Routine {
return routine {
run: run,
shutdown: shutdown,
}
}
// Routine is an object that can be run and stopped. // Routine is an object that can be run and stopped.
type Routine interface { type Routine interface {
// Run is a long-running function that does not return until it is // Run is a long-running function that does not return until it is
// finished. An error is returned if the routine exited due to an error. // finished, or its context is cancelled. If the context is cancelled,
Run () error // the function must perform necessary cleanup/shutdown operations and
// exit. An error is returned if the routine exited due to an error.
// Shutdown stops Run. Run (context.Context) error
Shutdown () error
} }
// Manager is a system capable of managing multiple routines, and restarting // Manager is a system capable of managing multiple routines, and restarting
@ -63,99 +43,82 @@ type Manager struct {
// logging altogether, this can be set to io.Discard. // logging altogether, this can be set to io.Discard.
Logger io.Writer Logger io.Writer
stoppingMutex sync.Mutex ctx context.Context
stopping bool
} }
// Run spawns all routines in the Routines slice. If a routine exits with an // Run spawns all routines in the Routines slice. If a routine exits with an
// error and it was running for longer than RestartDeadline, it is restarted. // error and it was running for longer than RestartDeadline, it is restarted.
// Run returns only when all routines have exited. // Run returns only when all routines have exited.
func (manager *Manager) Run () error { func (this *Manager) Run (ctx context.Context) error {
ctx, done := context.WithCancel(ctx)
this.ctx = ctx
var waitGroup sync.WaitGroup var waitGroup sync.WaitGroup
for _, routine := range this.Routines {
for _, routine := range manager.Routines {
if routine != nil { if routine != nil {
waitGroup.Add(1) waitGroup.Add(1)
go manager.runRoutine(routine, &waitGroup) go this.runRoutine(routine, &waitGroup)
} }
} }
waitGroup.Wait() waitGroup.Wait()
done()
return nil return nil
} }
// Shutdown shuts down all routines in the manager.
func (manager *Manager) Shutdown () (err error) {
manager.stoppingMutex.Lock()
manager.stopping = true
manager.stoppingMutex.Unlock()
for _, routine := range manager.Routines {
routineErr := routine.Shutdown()
if routineErr != nil {
err = routineErr
}
}
return
}
// Append adds one or more routines to the Routines slice. This has no effect if // Append adds one or more routines to the Routines slice. This has no effect if
// the manager is already running. // the manager is already running.
func (manager *Manager) Append (routines ...Routine) { func (this *Manager) Append (routines ...Routine) {
manager.Routines = append(manager.Routines, routines...) this.Routines = append(this.Routines, routines...)
} }
func (manager *Manager) log (message ...any) { func (this *Manager) log (message ...any) {
if manager.Logger == nil { if this.Logger == nil {
log.Println(message...) log.Println(message...)
} else { } else {
fmt.Fprintln(manager.Logger, message...) fmt.Fprintln(this.Logger, message...)
} }
} }
func (manager *Manager) runRoutine (routine Routine, group *sync.WaitGroup) { func (this *Manager) runRoutine (routine Routine, group *sync.WaitGroup) {
defer group.Done() defer group.Done()
for { for {
lastStart := time.Now() lastStart := time.Now()
err := panicWrap(routine.Run) err := panicWrap(routine.Run, this.ctx)
stopping := false if ctxErr := this.ctx.Err(); ctxErr != nil {
manager.stoppingMutex.Lock()
stopping = manager.stopping
manager.stoppingMutex.Unlock()
if stopping {
if err == nil { if err == nil {
manager.log("(i) stopped routine") this.log("(i) stopped routine")
} else { } else {
manager.log("!!! stopped routine, with error:", err) this.log("!!! stopped routine, with error:", err)
} }
break break
} }
if err == nil { if err == nil {
manager.log("(i) routine exited") this.log("(i) routine exited")
break break
} else { } else {
manager.log("XXX routine failed:", err) this.log("XXX routine failed:", err)
} }
if time.Since(lastStart) < manager.RestartDeadline { if time.Since(lastStart) < this.RestartDeadline {
manager.log("!!! not restarting routine, failed too soon") this.log("!!! not restarting routine, failed too soon")
break break
} else { } else {
manager.log("(i) routine is being restarted") this.log("(i) routine is being restarted")
} }
} }
} }
func panicWrap (f func () error) (err error) { func panicWrap (f func (context.Context) error, ctx context.Context) (err error) {
defer func () { defer func () {
if pan := recover(); pan != nil { if pan := recover(); pan != nil {
err = errors.New(fmt.Sprint(pan)) err = errors.New(fmt.Sprint(pan))
} }
} () } ()
err = f() err = f(ctx)
return return
} }