From 326db33ecc845dd3c8f0eaffab5d5ab2b7e03565 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Thu, 30 Jan 2025 19:29:23 -0500 Subject: [PATCH] Environment can now run RunShutdownable actors --- environment.go | 58 ++++++++++++++++++++++++++++++++++++++++++-------- 1 file changed, 49 insertions(+), 9 deletions(-) diff --git a/environment.go b/environment.go index 86f59b3..cb5688c 100644 --- a/environment.go +++ b/environment.go @@ -122,7 +122,9 @@ func (this *environment) Add(ctx context.Context, actors ...Actor) error { } } for _, actor := range actors { - if actor, ok := actor.(Runnable); ok { + _, isRunnable := actor.(Runnable) + _, isRunShutdownable := actor.(RunShutdownable) + if isRunnable || isRunShutdownable { this.start(actor) } } @@ -213,7 +215,7 @@ func (this *environment) info(actor Actor) actorInfo { // start increments the wait group by one and starts the given actor in the // background, restarting it if it fails. this function will exit immediately. // see the documentation for run for details. -func (this *environment) start(actor Runnable) { +func (this *environment) start(actor Actor) { this.group.Add(1) go this.run(actor) } @@ -223,14 +225,12 @@ func (this *environment) start(actor Runnable) { // environment once this function exits, and the environment's wait group // counter will be decremented. note that this function will never increment the // wait group counter, so start should usually be used instead. -func (this *environment) run(actor Runnable) { +func (this *environment) run(actor Actor) { // clean up when done defer this.group.Done() // logging - acto, ok := actor.(Actor) - if !ok { return } - typ := acto.Type() + typ := actor.Type() if this.Verb() { log.Printf("(i) [%s] running", typ) } var stopErr error var exited bool @@ -247,10 +247,26 @@ func (this *environment) run(actor Runnable) { }() // contains context information - info := this.info(acto) + info := this.info(actor) ctx := info.ctx defer close(info.stopped) + + switch actor := actor.(type) { + case Runnable: + stopErr, exited = this.runRunnable(ctx, actor) + case RunShutdownable: + stopErr, exited = this.runRunnable(ctx, &runShutdownableShim { + shutdownTimeout: defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout), + underlying: actor, + }) + default: + panic("actor was neither Runnable or RunShutdownable") + } +} +// runRunnable runs an actor implementing [Runnable]. this should only be called +// from within [environment.run]. +func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopErr error, exited bool) { // timing restartThreshold := defaul(this.timing.restartThreshold.Load(), defaultRestartThreshold) restartInitialInterval := defaul(this.timing.restartInitialInterval.Load(), defaultRestartInitialInterval) @@ -259,11 +275,14 @@ func (this *environment) run(actor Runnable) { resetTimeout := defaul(this.timing.resetTimeout.Load(), defaultResetTimeout) restartInterval := restartInitialInterval - // main loop + acto, ok := actor.(Actor) + if !ok { return } + typ := acto.Type() + for { // run actor lastStart := time.Now() - err := panicWrap(ctx, actor.Run) + err := panicWrapCtx(ctx, actor.Run) // detect context cancellation if ctxErr := ctx.Err(); ctxErr != nil { @@ -387,3 +406,24 @@ func (this *environment) applyConfig() error { if err != nil { return err } return nil } + +type runShutdownableShim struct { + underlying RunShutdownable + shutdownTimeout time.Duration +} + +func (this *runShutdownableShim) Type() string { + return this.underlying.(Actor).Type() +} + +func (this *runShutdownableShim) Run(ctx context.Context) error { + ctx, done := context.WithCancel(ctx) + defer done() + go func() { + <- ctx.Done() + shutdownCtx, done := context.WithTimeout(context.Background(), this.shutdownTimeout) + defer done() + this.underlying.Shutdown(shutdownCtx) + }() + return this.underlying.Run() +}