18 Commits

Author SHA1 Message Date
126374fbac Print stack trace of all goroutines when emergency halting if verbose 2025-09-15 20:40:08 -04:00
02f06d857e Fix deadlock when deleting actors 2025-09-14 19:22:42 -04:00
3d25441e7a Log config file paths after the --config one is added 2025-05-21 15:24:23 -04:00
735d314a19 examples/panic: Make an example to show off aforementioned feature 2025-05-21 13:50:34 -04:00
4f4c7a0627 Print stack trace when actors panic 2025-05-21 13:50:16 -04:00
461d0b77e9 Print out all config file paths on startup 2025-05-21 09:49:44 -04:00
861d6af1d7 Add --crash flag to crash and print stack trace on panic 2025-04-08 10:29:27 -04:00
ac2db05d06 Fix local config path 2025-03-10 01:38:17 -04:00
f4904884ad Improve named actor interface 2025-03-09 01:56:09 -05:00
7d0620fe3e Fix system config file path 2025-03-09 01:55:11 -05:00
3115c5feef Actors are formatted better when logged 2025-03-09 01:54:53 -05:00
10ca4f4671 Add "named" actor interface 2025-02-05 16:49:27 -05:00
e0c8825949 Clarify documentation for Actor 2025-01-31 17:26:45 -05:00
b12ffdb0a0 Say *why* an actor failed 2025-01-30 21:12:55 -05:00
bfca8c6f4a Fix util_test.go 2025-01-30 19:59:25 -05:00
641624ef3b examples/broken: Demonstrate new fix working 2025-01-30 19:58:11 -05:00
bae737e6b8 Environment exits if shutdown takes too long 2025-01-30 19:57:40 -05:00
0077a5d115 examples/http: Fix logging 2025-01-30 19:39:37 -05:00
9 changed files with 163 additions and 31 deletions

View File

@@ -3,20 +3,40 @@ package camfish
import "context" import "context"
// Actor is a participant in the environment. All public methods on an actor // Actor is a participant in the environment. All public methods on an actor
// must be safe for concurrent use by multiple goroutines. Additionally, any // should be safe for concurrent use by multiple goroutines except for AddFlags,
// type which explicitly implements Actor should: // Init, Configure, and ProcessConfig. Additionally, any type which explicitly
// implements Actor should:
//
// - Treat all public fields, values, indices, etc. as immutable // - Treat all public fields, values, indices, etc. as immutable
// - Satisfy Actor as a pointer, not a value // - Satisfy Actor as a pointer, not a value
// - Not have a constructor // - Not have a constructor
//
// The CAMFISH environment will use interfaces in this package to probe actors
// for methods. If an actor is supposed to fulfill one of these interfaces, this
// should be enforced at compile-time by assigning the actor to an anonymous
// global variable of that interface type. For instance, this line will ensure
// that SomeActor fulfills [Resettable]:
//
// var _ camfish.Resettable = new(SomeActor)
type Actor interface { type Actor interface {
// Type returns the type name of the actor. The value returned from this // Type returns the "type name" of the actor. The value returned from
// is used to locate actors capable of performing a specific task, so it // this is used to locate actors capable of performing a specific task,
// absolutely must return the same string every time. Actors implemented // so it absolutely must return the same string every time. It is
// in packages besides this one (i.e. not camfish) must not return the // usually best to have this be unique to each actor. Actors implemented
// string "cron". // in packages other than this one
// (git.tebibyte.media/sashakoshka/camfish) must not return the string
// "cron".
Type() string Type() string
} }
// Named is any object with a name.
type Named interface {
// Name returns the name. This doesn't need to be the same as Type. It
// must return the same string every time. It is used to differentiate
// actors of the same type in logs.
Name() string
}
// FlagAdder is any object that can add [Flag]s to a [FlagSet]. Actors which // FlagAdder is any object that can add [Flag]s to a [FlagSet]. Actors which
// implement this interface will be called upon to add flags during and only // implement this interface will be called upon to add flags during and only
// during the flag parsing phase. // during the flag parsing phase.

View File

@@ -41,6 +41,7 @@ type environment struct {
logDirectory string logDirectory string
configFile string configFile string
verbose bool verbose bool
crash bool
} }
// running stores whether the environment is currently running. // running stores whether the environment is currently running.
@@ -136,11 +137,13 @@ func (this *environment) Del(ctx context.Context, actors ...Actor) error {
channels := []<- chan struct { } { } channels := []<- chan struct { } { }
for _, actor := range actors { for _, actor := range actors {
info := this.info(actor) info := this.info(actor)
if info.done != nil { if info.stopped != nil {
channels = append(channels, info.stopped) channels = append(channels, info.stopped)
} }
info.done()
} }
for _, channel := range channels { for _, channel := range channels {
if channel == nil { continue }
select { select {
case <- channel: case <- channel:
case <- ctx.Done(): case <- ctx.Done():
@@ -249,7 +252,11 @@ func (this *environment) run(actor Actor) {
// contains context information // contains context information
info := this.info(actor) info := this.info(actor)
ctx := info.ctx ctx := info.ctx
defer close(info.stopped) defer func() {
if info.stopped != nil {
close(info.stopped)
}
}()
switch actor := actor.(type) { switch actor := actor.(type) {
case Runnable: case Runnable:
@@ -282,7 +289,12 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
for { for {
// run actor // run actor
lastStart := time.Now() lastStart := time.Now()
err := panicWrapCtx(ctx, actor.Run) var err error
if this.flags.crash {
err = actor.Run(ctx)
} else {
err = panicWrapCtx(ctx, actor.Run)
}
// detect context cancellation // detect context cancellation
if ctxErr := ctx.Err(); ctxErr != nil { if ctxErr := ctx.Err(); ctxErr != nil {
@@ -298,7 +310,7 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
return return
} else { } else {
// failure // failure
log.Printf("XXX [%s] failed", typ) log.Printf("XXX [%s] failed: %v", typ, err)
} }
// restart logic // restart logic

27
examples/broken/main.go Normal file
View File

@@ -0,0 +1,27 @@
// Example broken demonstrates how the environment will forcibly kill the
// program if an actor cannot shut down.
package main
import "os"
import "log"
import "context"
import "git.tebibyte.media/sashakoshka/camfish"
func main() {
camfish.Run("broken",
"Example broken demonstrates how the environment will " +
"forcibly kill the program if an actor cannot shut down",
new(broken))
}
// broken is an incorrectly implemented actor that cannot shut down.
type broken struct { }
var _ camfish.Runnable = new(broken)
func (this *broken) Type() string { return "broken" }
func (this *broken) Run(ctx context.Context) error {
log.Println("(i) [broken] wait for approximately 8 minutes")
log.Printf("(i) [broken] if impatient, run: kill -9 %d", os.Getpid())
<- (chan struct { })(nil)
return ctx.Err() // unreachable, of course
}

View File

@@ -42,7 +42,7 @@ func (this *httpServer) Init(ctx context.Context) error {
} }
func (this *httpServer) Run() error { func (this *httpServer) Run() error {
log.Printf("[http] listening on %s", this.server.Addr) log.Printf("(i) [http-server] listening on %s", this.server.Addr)
err := this.server.ListenAndServe() err := this.server.ListenAndServe()
if errors.Is(err, http.ErrServerClosed) { return nil } if errors.Is(err, http.ErrServerClosed) { return nil }
return err return err

35
examples/panic/main.go Normal file
View File

@@ -0,0 +1,35 @@
// Example panic demonstrates how the environment can restart actors if they
// fail.
package main
import "log"
import "time"
import "errors"
import "context"
import "math/rand"
import "git.tebibyte.media/sashakoshka/camfish"
func main() {
camfish.Run("panic",
"Example panic demonstrates how the environment can restart " +
"actors if they fail",
new(actor))
}
// actor is an incorrectly implemented actor that panics and errs randomly.
type actor struct { }
var _ camfish.Runnable = new(actor)
func (this *actor) Type() string { return "panic" }
func (this *actor) Run(ctx context.Context) error {
log.Println("(i) [panic] panicking in 10 seconds")
select {
case <- ctx.Done(): return ctx.Err()
case <- time.After(time.Second * 10):
if rand.Int() % 2 == 0 {
panic("this is a panic")
} else {
return errors.New("this is an error")
}
}
}

4
ini.go
View File

@@ -162,7 +162,7 @@ func configFiles(program string) ([]string, error) {
userConfig, err := os.UserConfigDir() userConfig, err := os.UserConfigDir()
if err != nil { return nil, err } if err != nil { return nil, err }
return []string { return []string {
filepath.Join("/etc", program), filepath.Join("/etc", program, program + ".conf"),
filepath.Join(userConfig, program), filepath.Join(userConfig, program, program + ".conf"),
}, nil }, nil
} }

View File

@@ -6,6 +6,7 @@ import "log"
import "io/fs" import "io/fs"
import "errors" import "errors"
import "context" import "context"
import "runtime"
import "strings" import "strings"
import "path/filepath" import "path/filepath"
import "git.tebibyte.media/sashakoshka/go-cli" import "git.tebibyte.media/sashakoshka/go-cli"
@@ -24,6 +25,7 @@ func (this *environment) phase10FlagParsing() bool {
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString) flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString) flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil) flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil)
flagCrash := set.Flag(0, "crash", "Crash when an actor panics", nil)
// ask actors to add flags // ask actors to add flags
actors, done := this.actors.RBorrow() actors, done := this.actors.RBorrow()
@@ -60,6 +62,9 @@ func (this *environment) phase10FlagParsing() bool {
if _, ok := flagVerbose.First(); ok { if _, ok := flagVerbose.First(); ok {
this.flags.verbose = true this.flags.verbose = true
} }
if _, ok := flagCrash.First(); ok {
this.flags.crash = true
}
return true return true
} }
@@ -128,6 +133,12 @@ func (this *environment) phase30ConfigurationParsing() bool {
if this.flags.configFile != "" { if this.flags.configFile != "" {
paths = append(paths, this.flags.configFile) paths = append(paths, this.flags.configFile)
} }
if this.Verb() {
log.Println("(i) (30) have configuration files:")
for _, paths := range paths {
log.Println("(i) (30) -", paths)
}
}
// parse every config and merge them all // parse every config and merge them all
configs := make([]iniConfig, 0, len(paths)) configs := make([]iniConfig, 0, len(paths))
for _, path := range paths { for _, path := range paths {
@@ -255,6 +266,24 @@ func (this *environment) phase70_5Trimming() bool {
} }
func (this *environment) phase80Shutdown() bool { func (this *environment) phase80Shutdown() bool {
ctx, done := context.WithTimeout(
context.Background(),
defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout))
defer done()
go func() {
<- ctx.Done()
if errors.Is(context.Cause(ctx), context.DeadlineExceeded) {
log.Println("XXX (80) shutdown timeout expired, performing emergency halt")
if Verb() {
dumpBuffer := make([]byte, 8192)
runtime.Stack(dumpBuffer, true)
log.Printf("XXX (80) stack trace of all goroutines:\n%s", dumpBuffer)
}
log.Printf("====== [%s] END =======", this.name)
os.Exit(1)
}
}()
cause := context.Cause(this.ctx) cause := context.Cause(this.ctx)
if cause != nil { if cause != nil {
log.Println("XXX (80) shutting down because:", cause) log.Println("XXX (80) shutting down because:", cause)

31
util.go
View File

@@ -12,6 +12,19 @@ import "strings"
import "context" import "context"
import "sync/atomic" import "sync/atomic"
import "unicode/utf8" import "unicode/utf8"
import "runtime/debug"
func panicErr(message any, stack []byte) (err error) {
if panErr, ok := message.(error); ok {
err = panErr
} else {
err = errors.New(fmt.Sprint(message))
}
if stack != nil {
err = fmt.Errorf("%w: %s", err, stack)
}
return err
}
func defaul[T comparable](value, def T) T { func defaul[T comparable](value, def T) T {
var zero T var zero T
@@ -22,11 +35,7 @@ func defaul[T comparable](value, def T) T {
func panicWrap(f func() error) (err error) { func panicWrap(f func() error) (err error) {
defer func () { defer func () {
if pan := recover(); pan != nil { if pan := recover(); pan != nil {
if panErr, ok := pan.(error); ok { err = panicErr(pan, debug.Stack())
err = panErr
} else {
err = errors.New(fmt.Sprint(pan))
}
} }
} () } ()
@@ -37,11 +46,7 @@ func panicWrap(f func() error) (err error) {
func panicWrapCtx(ctx context.Context, f func(context.Context) error) (err error) { func panicWrapCtx(ctx context.Context, f func(context.Context) error) (err error) {
defer func () { defer func () {
if pan := recover(); pan != nil { if pan := recover(); pan != nil {
if panErr, ok := pan.(error); ok { err = panicErr(pan, debug.Stack())
err = panErr
} else {
err = errors.New(fmt.Sprint(pan))
}
} }
} () } ()
@@ -77,7 +82,11 @@ func logActors (actors iter.Seq[Actor]) {
} }
types := make(map[string] int) types := make(map[string] int)
for actor := range actors { for actor := range actors {
types[actor.Type()] += 1 typ := actor.Type()
if named, ok := actor.(Named); ok {
typ = fmt.Sprintf("%s/%s", typ, named.Name())
}
types[typ] += 1
} }
for typ, count := range types { for typ, count := range types {
if count > 1 { if count > 1 {

View File

@@ -17,17 +17,17 @@ func TestDefaul(test *testing.T) {
} }
func TestPanicWrap(test *testing.T) { func TestPanicWrap(test *testing.T) {
err := panicWrap(func (ctx context.Context) error { err := panicWrap(func () error {
return errors.New("test case 0") return errors.New("test case 0")
}) })
test.Log(err) test.Log(err)
if err.Error() != "test case 0" { test.Fatal("not equal") } if err.Error() != "test case 0" { test.Fatal("not equal") }
err = panicWrap(func (ctx context.Context) error { err = panicWrap(func () error {
panic(errors.New("test case 1")) panic(errors.New("test case 1"))
}) })
test.Log(err) test.Log(err)
if err.Error() != "test case 1" { test.Fatal("not equal") } if err.Error() != "test case 1" { test.Fatal("not equal") }
err = panicWrap( func (ctx context.Context) error { err = panicWrap( func () error {
return nil return nil
}) })
test.Log(err) test.Log(err)
@@ -35,17 +35,17 @@ func TestPanicWrap(test *testing.T) {
} }
func TestPanicWrapCtx(test *testing.T) { func TestPanicWrapCtx(test *testing.T) {
err := panicWrap(context.Background(), func (ctx context.Context) error { err := panicWrapCtx(context.Background(), func (ctx context.Context) error {
return errors.New("test case 0") return errors.New("test case 0")
}) })
test.Log(err) test.Log(err)
if err.Error() != "test case 0" { test.Fatal("not equal") } if err.Error() != "test case 0" { test.Fatal("not equal") }
err = panicWrap(context.Background(), func (ctx context.Context) error { err = panicWrapCtx(context.Background(), func (ctx context.Context) error {
panic(errors.New("test case 1")) panic(errors.New("test case 1"))
}) })
test.Log(err) test.Log(err)
if err.Error() != "test case 1" { test.Fatal("not equal") } if err.Error() != "test case 1" { test.Fatal("not equal") }
err = panicWrap(context.Background(), func (ctx context.Context) error { err = panicWrapCtx(context.Background(), func (ctx context.Context) error {
return nil return nil
}) })
test.Log(err) test.Log(err)