18 Commits
v0.3.4 ... main

Author SHA1 Message Date
a03f6a70a4 actors: Add pre-made HTTP actors 2025-12-29 15:58:33 -05:00
0a4cc1143e Assorted bug fixes 2025-12-29 15:57:39 -05:00
a58b5932e7 Fix actors added during initialization process being run twice
Also broke out the emergency halt process into a new function,
which is also now ALWAYS called 16 minutes after sigint has been
pressed, regardless if the shutdown even began in the first place.
2025-11-26 11:46:53 -05:00
5d25b3fb9a Main-locked threads are panic wrapped 2025-11-24 19:43:25 -05:00
f97f5010e2 Remove actors added through Add if they fail to initialize/config 2025-11-24 19:41:48 -05:00
8f8d2e13b3 Change "stopped with error" symbol from warn to fatal 2025-11-24 10:40:41 -05:00
4cef5df83e Don't print duplicate stack traces/errors when things crash 2025-11-24 10:38:44 -05:00
32c8e7f7c3 Add MainRunnable interface for locking to the main thread 2025-11-24 10:21:41 -05:00
f0611e53ce examples/panic: Make the actor resettable 2025-11-23 13:41:05 -05:00
c0064323d8 Don't attempt to restart non-resettable actors 2025-11-23 13:38:54 -05:00
eca2b35057 Fix crash when deleting an actor 2025-09-17 20:55:33 -04:00
d34af2c4ee Add Cleanupable interface
The Cleanup method is called on actors when they exit
2025-09-17 20:45:01 -04:00
e21cd9ed11 The cron actor now respects --fast-timing 2025-09-17 17:03:27 -04:00
70dc9702bd Add a --fast-timing option to make things happen sooner
The smaller time scale is better for debugging
2025-09-17 14:54:12 -04:00
613e21597b Add a --crash-on-error flag to crash whenever an actor returns err 2025-09-17 11:36:12 -04:00
126374fbac Print stack trace of all goroutines when emergency halting if verbose 2025-09-15 20:40:08 -04:00
02f06d857e Fix deadlock when deleting actors 2025-09-14 19:22:42 -04:00
3d25441e7a Log config file paths after the --config one is added 2025-05-21 15:24:23 -04:00
9 changed files with 450 additions and 67 deletions

View File

@@ -127,3 +127,22 @@ type RunShutdownable interface {
// should be shut down.
Shutdown(ctx context.Context) error
}
// Cleanupable is any object that must be cleaned up after it has stopped for
// good. Actors which implement this interface will be cleaned up after they
// are deleted from the environment.
type Cleanupable interface {
Cleanup(ctx context.Context) error
}
// MainRunnable is any object with a function that must be bound to the main
// thread. Only one actor may implement this at a time, and it must have been
// added at the start of the environment as an argument to the run function.
type MainRunnable interface {
// RunMain is run in the main thread and must stop once ShutdownMain
// is called.
RunMain() error
// ShutdownMain is like [RunShutdownable.Shutdown], but unblocks RunMain
// instead of Run.
ShutdownMain(ctx context.Context) error
}

133
actors/http.go Normal file
View File

@@ -0,0 +1,133 @@
package http
import "fmt"
import "log"
import "errors"
import "context"
import "net/http"
import cf "git.tebibyte.media/sashakoshka/camfish"
var _ cf.Actor = new(HTTP)
var _ cf.RunShutdownable = new(HTTP)
var _ cf.Configurable = new(HTTP)
var _ cf.Initializable = new(HTTP)
// HTTP is an actor providing an HTTP server. Its configuration options are
// as follows:
//
// - http.address: The host:port to serve on
// - http.cert-file: The location of the public TLS/SSL certificate
// - http.key-file: The location of the private TLS/SSL key
//
// If neither cert-file nor key-file are specified, the actor will serve
// over HTTPS instead of HTTP.
type HTTP struct {
// Typ determines the actor's type. If empty, it defaults to "http".
// Once the actor has been used in any way at all, this field must
// never be modified.
Typ string
// Handler is used to handle HTTP requests. If nil, the server will
// respond with a "404 Not found" error.
// Once the actor has been used in any way at all, this field must
// never be modified.
Handler http.Handler
// DefaultAddr specifies the default address to serve on if none is
// specified. This itself defaults to localhost:8080.
DefaultAddr
server *http.Server
address string
certFile string
keyFile string
}
// Type returns "http", or this.Typ if specified.
func (this *HTTP) Type() string {
if this.Typ == "" {
return "http"
}
return this.Typ
}
// Configure configures the actor.
func (this *HTTP) Configure(conf cf.Config) error {
{
value := conf.Get(this.Type() + ".address")
if value == "" {
if this.DefaultAddr == "" {
value = "localhost:8080"
} else {
value = this.DefaultAddr
}
this.address = value
}
{
certFile := this.Type() + "cert-file"
keyFile := this.Type() + "key-file"
this.certFile = conf.Get(certFile)
this.keyFile = conf.Get(keyFile)
if this.certFile == "" && this.keyFile != "" ||
this.certFile != "" && this.keyFile == "" {
return fmt.Errorf(
"both %s and %s http.key-file must be specified, or neither",
this.certFile,
this.keyFile)
}
}
return nil
}
// Init initializes the actor.
func (this *HTTP) Init(ctx context.Context) error {
this.server = &http.Server {
Addr: this.address,
Handler: this.Handler,
}
return ctx.Err()
}
// Run runs the actor.
func (this *HTTP) Run() error {
log.Printf("(i) [%s] listening on %s", this.Type(), this.address)
err := this.server.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) { return nil }
return err
}
// Shutdown shuts down the actor.
func (this *HTTP) Shutdown(ctx context.Context) error {
return this.server.Shutdown(ctx)
}
var _ cf.Actor = new(HTTPServer)
var _ cf.RunShutdownable = new(HTTPServer)
var _ cf.Initializable = new(HTTPServer)
// HTTPServer lets you use a pre-existing [http.Server] as an actor. It has
// no configuration or anything.
type HTTPServer struct {
// Server must be non-nil.
*HTTP.Server
// Typ determines the actor's type. If empty, it defaults to "http".
// Once the actor has been used in any way at all, this field must
// never be modified.
Typ string
}
// Type returns "http", or this.Typ if specified.
func (this *HTTPServer) Type() string {
if this.Typ == "" {
return "http"
}
return this.Typ
}
// Run runs the actor.
func (this *HTTPServer) Run() error {
log.Printf("(i) [%s] listening on %s", this.Type(), this.Addr)
err := this.server.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) { return nil }
return err
}

View File

@@ -35,6 +35,8 @@ type actorInfo struct {
// incremented automatically for each actor that is added. It is never
// less than one.
order int
// initial is true if this actor was added as an argument to [Run].
initial bool
}
// actorSetIface holds only the add/del/clear methods of actorSet.
@@ -60,23 +62,12 @@ func (sets *actorSets) All() iter.Seq[actorSetIface] {
// add adds an actor under the given parent context. This is a write operation.
func (this *actorSets) add(ctx context.Context, actor Actor) {
if this.inf == nil { this.inf = make(map[Actor] actorInfo)}
actorCtx, done := context.WithCancel(ctx)
this.nextOrder ++
info := actorInfo {
ctx: actorCtx,
done: done,
order: this.nextOrder,
}
_, isRunnable := actor.(Runnable)
_, isRunShutdownable := actor.(RunShutdownable)
if isRunnable || isRunShutdownable {
info.stopped = make(chan struct { })
}
this.inf[actor] = info
for set := range this.All() {
set.add(actor)
}
this.addInternal(actorInfo { }, ctx, actor)
}
// addInitial is like add, but marks the actor as initial. This is a write operation.
func (this *actorSets) addInitial(ctx context.Context, actor Actor) {
this.addInternal(actorInfo { initial: true }, ctx, actor)
}
// del removes an actor. This is a write operation.
@@ -101,6 +92,24 @@ func (this *actorSets) info(actor Actor) actorInfo {
return this.inf[actor]
}
func (this *actorSets) addInternal(inf actorInfo, ctx context.Context, actor Actor) {
if this.inf == nil { this.inf = make(map[Actor] actorInfo)}
actorCtx, done := context.WithCancel(ctx)
this.nextOrder ++
inf.ctx = actorCtx
inf.done = done
inf.order = this.nextOrder
_, isRunnable := actor.(Runnable)
_, isRunShutdownable := actor.(RunShutdownable)
if isRunnable || isRunShutdownable {
inf.stopped = make(chan struct { })
}
this.inf[actor] = inf
for set := range this.All() {
set.add(actor)
}
}
// sortActors sorts actors according to the order in which they were added.
func sortActors[T comparable] (sets *actorSets, actors []T) []T {
slices.SortFunc(actors, func (left, right T) int {

View File

@@ -7,6 +7,7 @@ import "context"
// tasks on time intervals.
type cron struct {
trimFunc func() bool
fastTiming bool
timing struct {
trimInterval time.Duration
}
@@ -28,6 +29,9 @@ func (this *cron) Configure (config Config) error {
}
this.timing.trimInterval = value
}
if this.fastTiming {
this.timing.trimInterval = time.Second * 10
}
return nil
}

View File

@@ -8,6 +8,7 @@ import "sync"
import "time"
import "errors"
import "context"
import "runtime"
import "sync/atomic"
import "golang.org/x/sync/errgroup"
import "git.tebibyte.media/sashakoshka/go-util/sync"
@@ -19,9 +20,11 @@ const defaultRestartInitialInterval = 8 * time.Second
const defaultRestartInitialIncrement = 8 * time.Second
const defaultRestartInitialMaximum = 1 * time.Hour
const defaultResetTimeout = 8 * time.Minute
const defaultCleanupTimeout = 1 * time.Minute
const defaultTrimInterval = 1 * time.Minute
const defaultTrimTimeout = 1 * time.Minute
const defaultShutdownTimeout = 8 * time.Minute
const defaultSigintTimeout = 16 * time.Minute
// environment is an object which handles requests by package-level functions.
// It is only a separate object for testing purposes.
@@ -29,10 +32,13 @@ type environment struct {
name string
description string
actors usync.RWMonitor[*actorSets]
main MainRunnable
ctx context.Context
done context.CancelCauseFunc
group sync.WaitGroup
noneLeft chan struct { }
conf MutableConfig
cron *cron
// flags stores information from built-in flags.
flags struct {
@@ -42,6 +48,8 @@ type environment struct {
configFile string
verbose bool
crash bool
crashOnError bool
fastTiming bool
}
// running stores whether the environment is currently running.
@@ -59,8 +67,10 @@ type environment struct {
restartIntervalIncrement atomicDuration
restartIntervalMaximum atomicDuration
resetTimeout atomicDuration
cleanupTimeout atomicDuration
trimTimeout atomicDuration
shutdownTimeout atomicDuration
sigintTimeout atomicDuration
}
}
@@ -73,16 +83,19 @@ func (this *environment) Run(name, description string, actors ...Actor) {
this.ctx, this.done = context.WithCancelCause(context.Background())
defer this.done(nil)
daemon.OnSigint(func() { this.done(ErrProcessKilled) })
daemon.OnSigint(this.handleSigint)
defer log.SetOutput(os.Stderr)
this.name = name
this.description = description
this.actors = usync.NewRWMonitor(&actorSets { })
this.addToSets(actors...)
this.addToSets(&cron {
this.cron = &cron {
trimFunc: this.phase70_5Trimming,
})
}
this.addToSetsInitial(actors...)
this.addToSetsInitial(this.cron)
this.noneLeft = make(chan struct { })
if !this.phase10FlagParsing() { os.Exit(2) }
if !this.phase13PidFileCreation() { os.Exit(1) }
@@ -104,6 +117,21 @@ func (this *environment) Done(cause error) {
// Add implements the package-level function [Add].
func (this *environment) Add(ctx context.Context, actors ...Actor) error {
this.addToSets(actors...)
cleanUp := func() {
this.delFromSets(actors...)
}
for _, actor := range actors {
if actor, ok := actor.(Configurable); ok {
err := actor.Configure(this.conf)
if err != nil {
cleanUp()
return fmt.Errorf (
"could not apply configuration to %s: %w",
actor.(Actor).Type(), err)
}
}
}
initializable := make([]Initializable, 0, len(actors))
for _, actor := range actors {
if actor, ok := actor.(Initializable); ok {
@@ -111,16 +139,12 @@ func (this *environment) Add(ctx context.Context, actors ...Actor) error {
}
}
err := this.initializeActors(ctx, initializable...)
if err != nil { return err }
for _, actor := range actors {
if actor, ok := actor.(Configurable); ok {
err := actor.Configure(this.conf)
if err != nil {
return fmt.Errorf (
"could not apply configuration to %s: %w",
actor.(Actor).Type(), err)
}
if err != nil {
if this.flags.crashOnError {
panic(fmt.Sprint(err))
}
cleanUp()
return err
}
for _, actor := range actors {
_, isRunnable := actor.(Runnable)
@@ -137,11 +161,15 @@ func (this *environment) Del(ctx context.Context, actors ...Actor) error {
channels := []<- chan struct { } { }
for _, actor := range actors {
info := this.info(actor)
if info.done != nil {
if info.stopped != nil {
channels = append(channels, info.stopped)
}
if info.done != nil {
info.done()
}
}
for _, channel := range channels {
if channel == nil { continue }
select {
case <- channel:
case <- ctx.Done():
@@ -196,6 +224,15 @@ func (this *environment) addToSets(actors ...Actor) {
}
}
// addToSetsInitial adds the actors to the actorSets in a thread-safe manner.
func (this *environment) addToSetsInitial(actors ...Actor) {
thisActors, done := this.actors.Borrow()
defer done()
for _, actor := range actors {
thisActors.addInitial(this.ctx, actor)
}
}
// delFromSets deletes the actors from the actorSets in a thread-safe manner.
func (this *environment) delFromSets(actors ...Actor) {
thisActors, done := this.actors.Borrow()
@@ -221,17 +258,37 @@ func (this *environment) start(actor Actor) {
go this.run(actor)
}
// run runs the given actor, restarting it if it fails. This function will exit
// when the actor's context is canceled. The actor will be removed from the
// run runs the given actor. This function will exit
// when the actor's context is canceled, or the actor has stopped or exited.
// The actor will be removed from the
// environment once this function exits, and the environment's wait group
// counter will be decremented. note that this function will never increment the
// wait group counter, so start should usually be used instead.
func (this *environment) run(actor Actor) {
typ := actor.Type()
// clean up when done
defer this.group.Done()
defer func() {
this.group.Done()
this.delFromSets(actor)
if actor, ok := actor.(Cleanupable); ok {
ctx, done := context.WithTimeout(
context.Background(),
defaul(
this.timing.cleanupTimeout.Load(),
defaultCleanupTimeout))
defer done()
err := actor.Cleanup(ctx)
if err != nil {
log.Printf("XXX [%s] failed to cleanup: %v", typ, err)
if this.flags.crashOnError {
panic(fmt.Sprint(err))
}
}
}
}()
// logging
typ := actor.Type()
if this.Verb() { log.Printf("(i) [%s] running", typ) }
var stopErr error
var exited bool
@@ -243,14 +300,21 @@ func (this *environment) run(actor Actor) {
if this.Verb() { log.Printf("(i) [%s] stopped", typ) }
}
} else {
log.Printf("!!! [%s] stopped with error: %v", typ, stopErr)
log.Printf("XXX [%s] stopped with error: %v", typ, stopErr)
if this.flags.crashOnError {
panic(fmt.Sprint(stopErr))
}
}
}()
// contains context information
info := this.info(actor)
ctx := info.ctx
defer close(info.stopped)
defer func() {
if info.stopped != nil {
close(info.stopped)
}
}()
switch actor := actor.(type) {
case Runnable:
@@ -304,12 +368,28 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
return
} else {
// failure
if this.flags.crashOnError {
panic(fmt.Sprint(err))
}
// if an actor isn't resettable, don't reset
// or restart it
if _, ok := actor.(Resettable); !ok {
stopErr = err
return
}
// print the failure message here because we won't be
// returning to run().
log.Printf("XXX [%s] failed: %v", typ, err)
}
// restart logic
if time.Since(lastStart) < restartThreshold {
log.Printf("!!! [%s] failed too soon, restarting in %v", typ, restartInterval)
if this.flags.crashOnError {
panic("failed too soon")
}
timer := time.NewTimer(restartInterval)
select {
case <- timer.C:
@@ -326,7 +406,8 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
restartInterval = restartInitialInterval
}
// reset if needed
// reset if needed. this condition will always be true because
// this code path is restricted to Resettable anyway
if actor, ok := actor.(Resettable); ok {
if this.Verb() { log.Printf("... [%s] resetting", typ) }
func() {
@@ -335,6 +416,9 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
err := actor.Reset(ctx)
if err != nil {
log.Printf("XXX [%s] failed to reset", typ)
if this.flags.crashOnError {
panic("failed to reset")
}
}
}()
if this.Verb() { log.Printf(".// [%s] reset", typ) }
@@ -394,6 +478,7 @@ func (this *environment) applyConfig() error {
}
return nil
}
// TODO: trim interval
err := parseDuration("init-timeout", &this.timing.initTimeout)
if err != nil { return err }
err = parseDuration("restart-threshold", &this.timing.restartThreshold)
@@ -406,13 +491,43 @@ func (this *environment) applyConfig() error {
if err != nil { return err }
err = parseDuration("reset-timeout", &this.timing.resetTimeout)
if err != nil { return err }
err = parseDuration("cleanup-timeout", &this.timing.cleanupTimeout)
if err != nil { return err }
err = parseDuration("trim-timeout", &this.timing.trimTimeout)
if err != nil { return err }
err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout)
if err != nil { return err }
err = parseDuration("sigint-timeout", &this.timing.sigintTimeout)
if err != nil { return err }
if this.flags.fastTiming {
this.timing.shutdownTimeout.Store(time.Second * 10)
this.timing.sigintTimeout.Store(time.Second * 12)
}
return nil
}
// handleSigint is called when the program receives a SIGINT signal.
func (this *environment) handleSigint() {
this.done(ErrProcessKilled)
go func() {
time.Sleep(defaul(this.timing.sigintTimeout.Load(), defaultSigintTimeout))
this.emergencyHalt("final SIGNINT deadline expired")
}()
}
func (this *environment) emergencyHalt(reason string) {
log.Printf("XXX (80) %s, performing emergency halt", reason)
if Verb() || this.flags.crashOnError {
dumpBuffer := make([]byte, 8192)
runtime.Stack(dumpBuffer, true)
log.Printf("XXX (80) stack trace of all goroutines:\n%s", dumpBuffer)
}
log.Printf("====== [%s] END =======", this.name)
os.Exit(1)
}
type runShutdownableShim struct {
underlying RunShutdownable
shutdownTimeout time.Duration

View File

@@ -18,7 +18,8 @@ func main() {
// actor is an incorrectly implemented actor that panics and errs randomly.
type actor struct { }
var _ camfish.Runnable = new(actor)
var _ camfish.Runnable = new(actor)
var _ camfish.Resettable = new(actor)
func (this *actor) Type() string { return "panic" }
func (this *actor) Run(ctx context.Context) error {
@@ -33,3 +34,8 @@ func (this *actor) Run(ctx context.Context) error {
}
}
}
func (this *actor) Reset(ctx context.Context) error {
log.Println("(i) [panic] here is where we would reset the actor")
return ctx.Err()
}

107
phases.go
View File

@@ -6,6 +6,7 @@ import "log"
import "io/fs"
import "errors"
import "context"
import "runtime"
import "strings"
import "path/filepath"
import "git.tebibyte.media/sashakoshka/go-cli"
@@ -18,13 +19,15 @@ func (this *environment) phase10FlagParsing() bool {
name: this.name,
description: this.description,
}
flagHelp := set.Flag('h', "help", "Display usage information and exit", nil)
flagPidFile := set.Flag('p', "pid-file", "Write the PID to the specified file", cli.ValString)
flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString)
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil)
flagCrash := set.Flag(0, "crash", "Crash when an actor panics", nil)
flagHelp := set.Flag('h', "help", "Display usage information and exit", nil)
flagPidFile := set.Flag('p', "pid-file", "Write the PID to the specified file", cli.ValString)
flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString)
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
flagVerbose := set.Flag('v', "verbose", "(debug) Enable verbose output/logging", nil)
flagCrash := set.Flag(0, "crash", "(debug) Crash when an actor panics", nil)
flagCrashOnError := set.Flag(0, "crash-on-error", "(debug) Crash when an actor experiences any error", nil)
flagFastTiming := set.Flag(0, "fast-timing", "(debug) Make timed things happen faster/more often", nil)
// ask actors to add flags
actors, done := this.actors.RBorrow()
@@ -64,6 +67,14 @@ func (this *environment) phase10FlagParsing() bool {
if _, ok := flagCrash.First(); ok {
this.flags.crash = true
}
if _, ok := flagCrashOnError.First(); ok {
this.flags.crash = true
this.flags.crashOnError = true
}
if _, ok := flagFastTiming.First(); ok {
this.flags.fastTiming = true
this.cron.fastTiming = true
}
return true
}
@@ -129,15 +140,15 @@ func (this *environment) phase30ConfigurationParsing() bool {
log.Println("!!! (30) could not determine location of file(s):", err)
return true
}
if this.flags.configFile != "" {
paths = append(paths, this.flags.configFile)
}
if this.Verb() {
log.Println("(i) (30) have configuration files:")
for _, paths := range paths {
log.Println("(i) (30) -", paths)
}
}
if this.flags.configFile != "" {
paths = append(paths, this.flags.configFile)
}
// parse every config and merge them all
configs := make([]iniConfig, 0, len(paths))
for _, path := range paths {
@@ -188,6 +199,8 @@ func (this *environment) phase50ConfigurationApplication() bool {
actors, done := this.actors.RBorrow()
defer done()
for _, actor := range sortActors(actors, actors.configurable.all()) {
if !actors.info(actor.(Actor)).initial { continue }
if this.Verb() { log.Printf ("... (50) applying configuration to %s", actor.(Actor).Type())}
err := actor.Configure(this.conf)
if err != nil {
log.Printf (
@@ -202,13 +215,24 @@ func (this *environment) phase50ConfigurationApplication() bool {
func (this *environment) phase60Initialization() bool {
if this.Verb() { log.Println("... (60) initializing") }
// this fucking sucks in sorry
var initializable []Initializable
func() {
actors, done := this.actors.RBorrow()
defer done()
initializable = actors.initializable.all()
}()
if err := this.initializeActors(this.ctx, initializable...); err != nil {
// filter out non-initial actors
initializableInitial := initializable
index := 0
for _, actor := range initializable {
if !this.info(actor.(Actor)).initial { continue }
initializableInitial = initializable[:index + 1]
initializableInitial[index] = actor
index ++
}
if err := this.initializeActors(this.ctx, initializableInitial...); err != nil {
log.Println("XXX (60) failed to initialize:", err)
return false
}
@@ -217,6 +241,34 @@ func (this *environment) phase60Initialization() bool {
}
func (this *environment) phase70Running() bool {
for actor := range this.All() {
if actor, ok := actor.(MainRunnable); ok {
this.main = actor
}
}
bodyReturn := make(chan bool)
go func() {
bodyReturn <- this.phase70RunningBody()
if this.main != nil {
shutdownCtx, done := context.WithTimeout(
context.Background(),
defaul(this.timing.shutdownTimeout.Load(),
defaultShutdownTimeout))
defer done()
this.main.ShutdownMain(shutdownCtx)
}
}()
mainReturn := false
if this.main != nil {
mainReturn = this.phase70_2MainBind()
}
return <- bodyReturn && mainReturn
}
func (this *environment) phase70RunningBody() bool {
defer this.Done(nil)
if this.Verb() { log.Println("... (70) starting up") }
this.running.Store(true)
@@ -225,29 +277,45 @@ func (this *environment) phase70Running() bool {
actors, done := this.actors.RBorrow()
defer done()
for _, actor := range actors.runnable.all() {
if !actors.info(actor.(Actor)).initial { continue }
this.start(actor.(Actor))
}
for _, actor := range actors.runShutdownable.all() {
if !actors.info(actor.(Actor)).initial { continue }
this.start(actor.(Actor))
}
}()
log.Println(".// (70) startup sequence complete")
// await context cancellation or waitgroup completion
wgChannel := make(chan struct { }, 1)
go func() {
this.group.Wait()
wgChannel <- struct { } { }
close(this.noneLeft)
}()
select {
case <- this.ctx.Done():
if this.Verb() { log.Println("(i) (70) canceled") }
case <- wgChannel:
case <- this.noneLeft:
if this.Verb() { log.Println("(i) (70) all actors have finished") }
}
return true
}
func (this *environment) phase70_2MainBind() bool{
mainActor := this.main.(Actor)
if this.Verb() { log.Printf("... (70.2) binding %s to main thread", mainActor.Type()) }
runtime.LockOSThread()
if this.Verb() { log.Printf(".// (70.2) main thread bind complete") }
defer runtime.UnlockOSThread()
err := panicWrap(this.main.RunMain)
if err != nil {
log.Printf("XXX [%s] main thread failed: %v", mainActor.Type(), err)
return false
}
if this.Verb() { log.Printf("(i) (70.2) main thread exited") }
return true
}
func (this *environment) phase70_5Trimming() bool {
if this.Verb() { log.Println("... (70.5) trimming") }
var trimmable []Trimmable
@@ -258,6 +326,9 @@ func (this *environment) phase70_5Trimming() bool {
}()
if err := this.trimActors(this.ctx, trimmable...); err != nil {
log.Println(".// (70.5) failed to trim:", err)
if this.flags.crashOnError {
panic(err)
}
return false
}
if this.Verb() { log.Println(".// (70.5) trimmed") }
@@ -265,6 +336,7 @@ func (this *environment) phase70_5Trimming() bool {
}
func (this *environment) phase80Shutdown() bool {
logActors(All())
ctx, done := context.WithTimeout(
context.Background(),
defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout))
@@ -272,9 +344,7 @@ func (this *environment) phase80Shutdown() bool {
go func() {
<- ctx.Done()
if errors.Is(context.Cause(ctx), context.DeadlineExceeded) {
log.Println("XXX (80) shutdown timeout expired, performing emergency halt")
log.Printf("====== [%s] END =======", this.name)
os.Exit(1)
this.emergencyHalt("shutdown timeout expired")
}
}()
@@ -287,6 +357,7 @@ func (this *environment) phase80Shutdown() bool {
log.Println(".// (80) shutdown succeeded, goodbye")
log.Printf("====== [%s] END =======", this.name)
}()
this.group.Wait()
// wait for all actors to shut down
<- this.noneLeft
return cause == nil
}

14
run.go
View File

@@ -52,12 +52,14 @@ var env environment
//
// 70. Running: Actors which implement [Runnable] or [RunShutdownable] are
// run, each in their own goroutine. The environment is able to restart
// actors which have failed, which entails resetting the actor if it
// implements [Resettable], and running the actor again within the same
// actors which have failed if they implement [Resettable], which entails
// resetting the actor and running it again within the same
// goroutine. If an actor does not run for a meaningful amount of time
// after resetting/initialization before failing, it is considered erratic
// and further attempts to restart it will be spaced by a limited,
// constantly increasing time interval. The timing is configurable, but by
// constantly increasing time interval.
//
// The timing is configurable, but by
// default the threshold for a meaningful amount of runtime is 16 seconds,
// the initial delay interval is 8 seconds, the interval increase per
// attempt is 8 seconds, and the maximum interval is one hour.
@@ -66,6 +68,12 @@ var env environment
// configurable, but by default it is once every minute. When an actor
// which implements [Resettable] is reset, it is given a configurable
// timeout, which is 8 minutes by default.
//
// If one of the actors directly passed to Run implements MainRunnable,
// it will be bound to the main thread and the CAMFISH environment will
// be run in a different thread. If more than one actor implementing
// MainRunnable is passed to the Run function, only the first one is
// considered.
//
// 80. Shutdown: This can be triggered by all actors being removed from the
// environment, a catastrophic error, [Done] being called, or the program

32
util.go
View File

@@ -14,16 +14,34 @@ import "sync/atomic"
import "unicode/utf8"
import "runtime/debug"
func panicErr(message any, stack []byte) (err error) {
if panErr, ok := message.(error); ok {
err = panErr
type panicError struct {
wrapped error
stack []byte
}
func (this panicError) Error() string {
if this.stack == nil {
return this.wrapped.Error()
} else {
err = errors.New(fmt.Sprint(message))
return fmt.Sprintf("%v: %s", this.wrapped, this.stack)
}
if stack != nil {
err = fmt.Errorf("%w: %s", err, stack)
}
func (this panicError) Unwrap() error {
return this.wrapped
}
func panicErr(message any, stack []byte) (err error) {
var wrapped error
if panErr, ok := message.(error); ok {
wrapped = panErr
} else {
wrapped = errors.New(fmt.Sprint(message))
}
return panicError {
wrapped: wrapped,
stack: stack,
}
return err
}
func defaul[T comparable](value, def T) T {