routines: Add system for routine initialization

This commit is contained in:
Sasha Koshka 2024-11-03 17:20:58 -05:00
parent 57e6f63124
commit 816c3e1fc3

View File

@ -26,6 +26,16 @@ type Routine interface {
Run (context.Context) error Run (context.Context) error
} }
// InitRoutine is a routine that has to be initialized before it can be run.
type InitRoutine interface {
Routine
// Init is an initialization function that is called before any routines
// are run, including this one. Routines must not depend on eachother in
// this function.
Init (context.Context) error
}
// Manager is a system capable of managing multiple routines, and restarting // Manager is a system capable of managing multiple routines, and restarting
// them if they fail. // them if they fail.
type Manager struct { type Manager struct {
@ -42,8 +52,12 @@ type Manager struct {
// is nil, messages will be written to the standard logger. To disable // is nil, messages will be written to the standard logger. To disable
// logging altogether, this can be set to io.Discard. // logging altogether, this can be set to io.Discard.
Logger io.Writer Logger io.Writer
loggerLock sync.Mutex
ctx context.Context ctx context.Context
failed map[Routine] struct { }
failedLock sync.Mutex
} }
// Run spawns all routines in the Routines slice. If a routine exits with an // Run spawns all routines in the Routines slice. If a routine exits with an
@ -52,14 +66,24 @@ type Manager struct {
func (this *Manager) Run (ctx context.Context) error { func (this *Manager) Run (ctx context.Context) error {
ctx, done := context.WithCancel(ctx) ctx, done := context.WithCancel(ctx)
this.ctx = ctx this.ctx = ctx
this.failed = make(map[Routine] struct { })
var waitGroup sync.WaitGroup var waitGroup sync.WaitGroup
for _, routine := range this.Routines { for _, routine := range this.Routines {
if routine != nil { if routine != nil {
waitGroup.Add(1)
go this.initRoutine(routine, &waitGroup)
}
}
waitGroup.Wait()
for _, routine := range this.Routines {
if _, failed := this.failed[routine]; !failed {
waitGroup.Add(1) waitGroup.Add(1)
go this.runRoutine(routine, &waitGroup) go this.runRoutine(routine, &waitGroup)
} }
} }
this.failed = nil
waitGroup.Wait() waitGroup.Wait()
done() done()
@ -73,6 +97,8 @@ func (this *Manager) Append (routines ...Routine) {
} }
func (this *Manager) log (message ...any) { func (this *Manager) log (message ...any) {
this.loggerLock.Lock()
defer this.loggerLock.Unlock()
if this.Logger == nil { if this.Logger == nil {
log.Println(message...) log.Println(message...)
} else { } else {
@ -80,6 +106,20 @@ func (this *Manager) log (message ...any) {
} }
} }
func (this *Manager) initRoutine (routine Routine, group *sync.WaitGroup) {
defer group.Done()
if initRoutine, ok := routine.(InitRoutine); ok {
err := initRoutine.Init(this.ctx)
if err != nil {
this.log("XXX routine failed to initialize:", err)
this.failedLock.Lock()
defer this.failedLock.Unlock()
this.failed[routine] = struct { } { }
}
}
}
func (this *Manager) runRoutine (routine Routine, group *sync.WaitGroup) { func (this *Manager) runRoutine (routine Routine, group *sync.WaitGroup) {
defer group.Done() defer group.Done()