Fix actors added during initialization process being run twice

Also broke out the emergency halt process into a new function,
which is also now ALWAYS called 16 minutes after sigint has been
pressed, regardless if the shutdown even began in the first place.
This commit is contained in:
Sasha Koshka 2025-11-26 11:46:53 -05:00
parent 5d25b3fb9a
commit a58b5932e7
3 changed files with 81 additions and 29 deletions

View File

@ -35,6 +35,8 @@ type actorInfo struct {
// incremented automatically for each actor that is added. It is never
// less than one.
order int
// initial is true if this actor was added as an argument to [Run].
initial bool
}
// actorSetIface holds only the add/del/clear methods of actorSet.
@ -60,23 +62,12 @@ func (sets *actorSets) All() iter.Seq[actorSetIface] {
// add adds an actor under the given parent context. This is a write operation.
func (this *actorSets) add(ctx context.Context, actor Actor) {
if this.inf == nil { this.inf = make(map[Actor] actorInfo)}
actorCtx, done := context.WithCancel(ctx)
this.nextOrder ++
info := actorInfo {
ctx: actorCtx,
done: done,
order: this.nextOrder,
}
_, isRunnable := actor.(Runnable)
_, isRunShutdownable := actor.(RunShutdownable)
if isRunnable || isRunShutdownable {
info.stopped = make(chan struct { })
}
this.inf[actor] = info
for set := range this.All() {
set.add(actor)
}
this.addInternal(actorInfo { }, ctx, actor)
}
// addInitial is like add, but marks the actor as initial. This is a write operation.
func (this *actorSets) addInitial(ctx context.Context, actor Actor) {
this.addInternal(actorInfo { initial: true }, ctx, actor)
}
// del removes an actor. This is a write operation.
@ -101,6 +92,24 @@ func (this *actorSets) info(actor Actor) actorInfo {
return this.inf[actor]
}
func (this *actorSets) addInternal(inf actorInfo, ctx context.Context, actor Actor) {
if this.inf == nil { this.inf = make(map[Actor] actorInfo)}
actorCtx, done := context.WithCancel(ctx)
this.nextOrder ++
inf.ctx = actorCtx
inf.done = done
inf.order = this.nextOrder
_, isRunnable := actor.(Runnable)
_, isRunShutdownable := actor.(RunShutdownable)
if isRunnable || isRunShutdownable {
inf.stopped = make(chan struct { })
}
this.inf[actor] = inf
for set := range this.All() {
set.add(actor)
}
}
// sortActors sorts actors according to the order in which they were added.
func sortActors[T comparable] (sets *actorSets, actors []T) []T {
slices.SortFunc(actors, func (left, right T) int {

View File

@ -8,6 +8,7 @@ import "sync"
import "time"
import "errors"
import "context"
import "runtime"
import "sync/atomic"
import "golang.org/x/sync/errgroup"
import "git.tebibyte.media/sashakoshka/go-util/sync"
@ -23,6 +24,7 @@ const defaultCleanupTimeout = 1 * time.Minute
const defaultTrimInterval = 1 * time.Minute
const defaultTrimTimeout = 1 * time.Minute
const defaultShutdownTimeout = 8 * time.Minute
const defaultSigintTimeout = 16 * time.Minute
// environment is an object which handles requests by package-level functions.
// It is only a separate object for testing purposes.
@ -67,6 +69,7 @@ type environment struct {
cleanupTimeout atomicDuration
trimTimeout atomicDuration
shutdownTimeout atomicDuration
sigintTimeout atomicDuration
}
}
@ -79,7 +82,7 @@ func (this *environment) Run(name, description string, actors ...Actor) {
this.ctx, this.done = context.WithCancelCause(context.Background())
defer this.done(nil)
daemon.OnSigint(func() { this.done(ErrProcessKilled) })
daemon.OnSigint(this.handleSigint)
defer log.SetOutput(os.Stderr)
this.name = name
@ -88,8 +91,8 @@ func (this *environment) Run(name, description string, actors ...Actor) {
this.cron = &cron {
trimFunc: this.phase70_5Trimming,
}
this.addToSets(actors...)
this.addToSets(this.cron)
this.addToSetsInitial(actors...)
this.addToSetsInitial(this.cron)
if !this.phase10FlagParsing() { os.Exit(2) }
if !this.phase13PidFileCreation() { os.Exit(1) }
@ -218,6 +221,15 @@ func (this *environment) addToSets(actors ...Actor) {
}
}
// addToSetsInitial adds the actors to the actorSets in a thread-safe manner.
func (this *environment) addToSetsInitial(actors ...Actor) {
thisActors, done := this.actors.Borrow()
defer done()
for _, actor := range actors {
thisActors.addInitial(this.ctx, actor)
}
}
// delFromSets deletes the actors from the actorSets in a thread-safe manner.
func (this *environment) delFromSets(actors ...Actor) {
thisActors, done := this.actors.Borrow()
@ -482,14 +494,37 @@ func (this *environment) applyConfig() error {
if err != nil { return err }
err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout)
if err != nil { return err }
err = parseDuration("sigint-timeout", &this.timing.sigintTimeout)
if err != nil { return err }
if this.flags.fastTiming {
this.timing.shutdownTimeout.Store(time.Second * 10)
this.timing.sigintTimeout.Store(time.Second * 12)
}
return nil
}
// handleSigint is called when the program receives a SIGINT signal.
func (this *environment) handleSigint() {
this.done(ErrProcessKilled)
go func() {
time.Sleep(defaul(this.timing.sigintTimeout.Load(), defaultSigintTimeout))
this.emergencyHalt("final SIGNINT deadline expired")
}()
}
func (this *environment) emergencyHalt(reason string) {
log.Printf("XXX (80) %s, performing emergency halt", reason)
if Verb() || this.flags.crashOnError {
dumpBuffer := make([]byte, 8192)
runtime.Stack(dumpBuffer, true)
log.Printf("XXX (80) stack trace of all goroutines:\n%s", dumpBuffer)
}
log.Printf("====== [%s] END =======", this.name)
os.Exit(1)
}
type runShutdownableShim struct {
underlying RunShutdownable
shutdownTimeout time.Duration

View File

@ -199,6 +199,8 @@ func (this *environment) phase50ConfigurationApplication() bool {
actors, done := this.actors.RBorrow()
defer done()
for _, actor := range sortActors(actors, actors.configurable.all()) {
if !actors.info(actor.(Actor)).initial { continue }
if this.Verb() { log.Println ("... (50) applying configuration to %s", actor.(Actor).Type())}
err := actor.Configure(this.conf)
if err != nil {
log.Printf (
@ -213,13 +215,24 @@ func (this *environment) phase50ConfigurationApplication() bool {
func (this *environment) phase60Initialization() bool {
if this.Verb() { log.Println("... (60) initializing") }
// this fucking sucks in sorry
var initializable []Initializable
func() {
actors, done := this.actors.RBorrow()
defer done()
initializable = actors.initializable.all()
}()
if err := this.initializeActors(this.ctx, initializable...); err != nil {
// filter out non-initial actors
initializableInitial := initializable
index := 0
for _, actor := range initializable {
if !this.info(actor.(Actor)).initial { continue }
initializableInitial = initializable[:index + 1]
initializableInitial[index] = actor
index ++
}
if err := this.initializeActors(this.ctx, initializableInitial...); err != nil {
log.Println("XXX (60) failed to initialize:", err)
return false
}
@ -264,9 +277,11 @@ func (this *environment) phase70RunningBody() bool {
actors, done := this.actors.RBorrow()
defer done()
for _, actor := range actors.runnable.all() {
if !actors.info(actor.(Actor)).initial { continue }
this.start(actor.(Actor))
}
for _, actor := range actors.runShutdownable.all() {
if !actors.info(actor.(Actor)).initial { continue }
this.start(actor.(Actor))
}
@ -330,14 +345,7 @@ func (this *environment) phase80Shutdown() bool {
go func() {
<- ctx.Done()
if errors.Is(context.Cause(ctx), context.DeadlineExceeded) {
log.Println("XXX (80) shutdown timeout expired, performing emergency halt")
if Verb() || this.flags.crashOnError {
dumpBuffer := make([]byte, 8192)
runtime.Stack(dumpBuffer, true)
log.Printf("XXX (80) stack trace of all goroutines:\n%s", dumpBuffer)
}
log.Printf("====== [%s] END =======", this.name)
os.Exit(1)
this.emergencyHalt("shutdown timeout expired")
}
}()