diff --git a/actorset.go b/actorset.go index c43fdbc..f3f2bd5 100644 --- a/actorset.go +++ b/actorset.go @@ -35,6 +35,8 @@ type actorInfo struct { // incremented automatically for each actor that is added. It is never // less than one. order int + // initial is true if this actor was added as an argument to [Run]. + initial bool } // actorSetIface holds only the add/del/clear methods of actorSet. @@ -60,23 +62,12 @@ func (sets *actorSets) All() iter.Seq[actorSetIface] { // add adds an actor under the given parent context. This is a write operation. func (this *actorSets) add(ctx context.Context, actor Actor) { - if this.inf == nil { this.inf = make(map[Actor] actorInfo)} - actorCtx, done := context.WithCancel(ctx) - this.nextOrder ++ - info := actorInfo { - ctx: actorCtx, - done: done, - order: this.nextOrder, - } - _, isRunnable := actor.(Runnable) - _, isRunShutdownable := actor.(RunShutdownable) - if isRunnable || isRunShutdownable { - info.stopped = make(chan struct { }) - } - this.inf[actor] = info - for set := range this.All() { - set.add(actor) - } + this.addInternal(actorInfo { }, ctx, actor) +} + +// addInitial is like add, but marks the actor as initial. This is a write operation. +func (this *actorSets) addInitial(ctx context.Context, actor Actor) { + this.addInternal(actorInfo { initial: true }, ctx, actor) } // del removes an actor. This is a write operation. @@ -101,6 +92,24 @@ func (this *actorSets) info(actor Actor) actorInfo { return this.inf[actor] } +func (this *actorSets) addInternal(inf actorInfo, ctx context.Context, actor Actor) { + if this.inf == nil { this.inf = make(map[Actor] actorInfo)} + actorCtx, done := context.WithCancel(ctx) + this.nextOrder ++ + inf.ctx = actorCtx + inf.done = done + inf.order = this.nextOrder + _, isRunnable := actor.(Runnable) + _, isRunShutdownable := actor.(RunShutdownable) + if isRunnable || isRunShutdownable { + inf.stopped = make(chan struct { }) + } + this.inf[actor] = inf + for set := range this.All() { + set.add(actor) + } +} + // sortActors sorts actors according to the order in which they were added. func sortActors[T comparable] (sets *actorSets, actors []T) []T { slices.SortFunc(actors, func (left, right T) int { diff --git a/environment.go b/environment.go index 0f6db1a..c9ef5d2 100644 --- a/environment.go +++ b/environment.go @@ -8,6 +8,7 @@ import "sync" import "time" import "errors" import "context" +import "runtime" import "sync/atomic" import "golang.org/x/sync/errgroup" import "git.tebibyte.media/sashakoshka/go-util/sync" @@ -23,6 +24,7 @@ const defaultCleanupTimeout = 1 * time.Minute const defaultTrimInterval = 1 * time.Minute const defaultTrimTimeout = 1 * time.Minute const defaultShutdownTimeout = 8 * time.Minute +const defaultSigintTimeout = 16 * time.Minute // environment is an object which handles requests by package-level functions. // It is only a separate object for testing purposes. @@ -67,6 +69,7 @@ type environment struct { cleanupTimeout atomicDuration trimTimeout atomicDuration shutdownTimeout atomicDuration + sigintTimeout atomicDuration } } @@ -79,7 +82,7 @@ func (this *environment) Run(name, description string, actors ...Actor) { this.ctx, this.done = context.WithCancelCause(context.Background()) defer this.done(nil) - daemon.OnSigint(func() { this.done(ErrProcessKilled) }) + daemon.OnSigint(this.handleSigint) defer log.SetOutput(os.Stderr) this.name = name @@ -88,8 +91,8 @@ func (this *environment) Run(name, description string, actors ...Actor) { this.cron = &cron { trimFunc: this.phase70_5Trimming, } - this.addToSets(actors...) - this.addToSets(this.cron) + this.addToSetsInitial(actors...) + this.addToSetsInitial(this.cron) if !this.phase10FlagParsing() { os.Exit(2) } if !this.phase13PidFileCreation() { os.Exit(1) } @@ -218,6 +221,15 @@ func (this *environment) addToSets(actors ...Actor) { } } +// addToSetsInitial adds the actors to the actorSets in a thread-safe manner. +func (this *environment) addToSetsInitial(actors ...Actor) { + thisActors, done := this.actors.Borrow() + defer done() + for _, actor := range actors { + thisActors.addInitial(this.ctx, actor) + } +} + // delFromSets deletes the actors from the actorSets in a thread-safe manner. func (this *environment) delFromSets(actors ...Actor) { thisActors, done := this.actors.Borrow() @@ -482,14 +494,37 @@ func (this *environment) applyConfig() error { if err != nil { return err } err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout) if err != nil { return err } + err = parseDuration("sigint-timeout", &this.timing.sigintTimeout) + if err != nil { return err } if this.flags.fastTiming { this.timing.shutdownTimeout.Store(time.Second * 10) + this.timing.sigintTimeout.Store(time.Second * 12) } return nil } +// handleSigint is called when the program receives a SIGINT signal. +func (this *environment) handleSigint() { + this.done(ErrProcessKilled) + go func() { + time.Sleep(defaul(this.timing.sigintTimeout.Load(), defaultSigintTimeout)) + this.emergencyHalt("final SIGNINT deadline expired") + }() +} + +func (this *environment) emergencyHalt(reason string) { + log.Printf("XXX (80) %s, performing emergency halt", reason) + if Verb() || this.flags.crashOnError { + dumpBuffer := make([]byte, 8192) + runtime.Stack(dumpBuffer, true) + log.Printf("XXX (80) stack trace of all goroutines:\n%s", dumpBuffer) + } + log.Printf("====== [%s] END =======", this.name) + os.Exit(1) +} + type runShutdownableShim struct { underlying RunShutdownable shutdownTimeout time.Duration diff --git a/phases.go b/phases.go index 2091fe7..8e3ffa5 100644 --- a/phases.go +++ b/phases.go @@ -199,6 +199,8 @@ func (this *environment) phase50ConfigurationApplication() bool { actors, done := this.actors.RBorrow() defer done() for _, actor := range sortActors(actors, actors.configurable.all()) { + if !actors.info(actor.(Actor)).initial { continue } + if this.Verb() { log.Println ("... (50) applying configuration to %s", actor.(Actor).Type())} err := actor.Configure(this.conf) if err != nil { log.Printf ( @@ -213,13 +215,24 @@ func (this *environment) phase50ConfigurationApplication() bool { func (this *environment) phase60Initialization() bool { if this.Verb() { log.Println("... (60) initializing") } + // this fucking sucks in sorry var initializable []Initializable func() { actors, done := this.actors.RBorrow() defer done() initializable = actors.initializable.all() }() - if err := this.initializeActors(this.ctx, initializable...); err != nil { + // filter out non-initial actors + initializableInitial := initializable + index := 0 + for _, actor := range initializable { + if !this.info(actor.(Actor)).initial { continue } + initializableInitial = initializable[:index + 1] + initializableInitial[index] = actor + index ++ + } + + if err := this.initializeActors(this.ctx, initializableInitial...); err != nil { log.Println("XXX (60) failed to initialize:", err) return false } @@ -264,9 +277,11 @@ func (this *environment) phase70RunningBody() bool { actors, done := this.actors.RBorrow() defer done() for _, actor := range actors.runnable.all() { + if !actors.info(actor.(Actor)).initial { continue } this.start(actor.(Actor)) } for _, actor := range actors.runShutdownable.all() { + if !actors.info(actor.(Actor)).initial { continue } this.start(actor.(Actor)) } @@ -330,14 +345,7 @@ func (this *environment) phase80Shutdown() bool { go func() { <- ctx.Done() if errors.Is(context.Cause(ctx), context.DeadlineExceeded) { - log.Println("XXX (80) shutdown timeout expired, performing emergency halt") - if Verb() || this.flags.crashOnError { - dumpBuffer := make([]byte, 8192) - runtime.Stack(dumpBuffer, true) - log.Printf("XXX (80) stack trace of all goroutines:\n%s", dumpBuffer) - } - log.Printf("====== [%s] END =======", this.name) - os.Exit(1) + this.emergencyHalt("shutdown timeout expired") } }()