7 Commits

4 changed files with 81 additions and 13 deletions

View File

@@ -41,6 +41,7 @@ type environment struct {
logDirectory string
configFile string
verbose bool
crash bool
}
// running stores whether the environment is currently running.
@@ -136,11 +137,13 @@ func (this *environment) Del(ctx context.Context, actors ...Actor) error {
channels := []<- chan struct { } { }
for _, actor := range actors {
info := this.info(actor)
if info.done != nil {
if info.stopped != nil {
channels = append(channels, info.stopped)
}
info.done()
}
for _, channel := range channels {
if channel == nil { continue }
select {
case <- channel:
case <- ctx.Done():
@@ -249,7 +252,11 @@ func (this *environment) run(actor Actor) {
// contains context information
info := this.info(actor)
ctx := info.ctx
defer close(info.stopped)
defer func() {
if info.stopped != nil {
close(info.stopped)
}
}()
switch actor := actor.(type) {
case Runnable:
@@ -282,7 +289,12 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
for {
// run actor
lastStart := time.Now()
err := panicWrapCtx(ctx, actor.Run)
var err error
if this.flags.crash {
err = actor.Run(ctx)
} else {
err = panicWrapCtx(ctx, actor.Run)
}
// detect context cancellation
if ctxErr := ctx.Err(); ctxErr != nil {

35
examples/panic/main.go Normal file
View File

@@ -0,0 +1,35 @@
// Example panic demonstrates how the environment can restart actors if they
// fail.
package main
import "log"
import "time"
import "errors"
import "context"
import "math/rand"
import "git.tebibyte.media/sashakoshka/camfish"
func main() {
camfish.Run("panic",
"Example panic demonstrates how the environment can restart " +
"actors if they fail",
new(actor))
}
// actor is an incorrectly implemented actor that panics and errs randomly.
type actor struct { }
var _ camfish.Runnable = new(actor)
func (this *actor) Type() string { return "panic" }
func (this *actor) Run(ctx context.Context) error {
log.Println("(i) [panic] panicking in 10 seconds")
select {
case <- ctx.Done(): return ctx.Err()
case <- time.After(time.Second * 10):
if rand.Int() % 2 == 0 {
panic("this is a panic")
} else {
return errors.New("this is an error")
}
}
}

View File

@@ -6,6 +6,7 @@ import "log"
import "io/fs"
import "errors"
import "context"
import "runtime"
import "strings"
import "path/filepath"
import "git.tebibyte.media/sashakoshka/go-cli"
@@ -24,6 +25,7 @@ func (this *environment) phase10FlagParsing() bool {
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil)
flagCrash := set.Flag(0, "crash", "Crash when an actor panics", nil)
// ask actors to add flags
actors, done := this.actors.RBorrow()
@@ -60,6 +62,9 @@ func (this *environment) phase10FlagParsing() bool {
if _, ok := flagVerbose.First(); ok {
this.flags.verbose = true
}
if _, ok := flagCrash.First(); ok {
this.flags.crash = true
}
return true
}
@@ -128,6 +133,12 @@ func (this *environment) phase30ConfigurationParsing() bool {
if this.flags.configFile != "" {
paths = append(paths, this.flags.configFile)
}
if this.Verb() {
log.Println("(i) (30) have configuration files:")
for _, paths := range paths {
log.Println("(i) (30) -", paths)
}
}
// parse every config and merge them all
configs := make([]iniConfig, 0, len(paths))
for _, path := range paths {
@@ -263,6 +274,11 @@ func (this *environment) phase80Shutdown() bool {
<- ctx.Done()
if errors.Is(context.Cause(ctx), context.DeadlineExceeded) {
log.Println("XXX (80) shutdown timeout expired, performing emergency halt")
if Verb() {
dumpBuffer := make([]byte, 8192)
runtime.Stack(dumpBuffer, true)
log.Printf("XXX (80) stack trace of all goroutines:\n%s", dumpBuffer)
}
log.Printf("====== [%s] END =======", this.name)
os.Exit(1)
}

25
util.go
View File

@@ -12,6 +12,19 @@ import "strings"
import "context"
import "sync/atomic"
import "unicode/utf8"
import "runtime/debug"
func panicErr(message any, stack []byte) (err error) {
if panErr, ok := message.(error); ok {
err = panErr
} else {
err = errors.New(fmt.Sprint(message))
}
if stack != nil {
err = fmt.Errorf("%w: %s", err, stack)
}
return err
}
func defaul[T comparable](value, def T) T {
var zero T
@@ -22,11 +35,7 @@ func defaul[T comparable](value, def T) T {
func panicWrap(f func() error) (err error) {
defer func () {
if pan := recover(); pan != nil {
if panErr, ok := pan.(error); ok {
err = panErr
} else {
err = errors.New(fmt.Sprint(pan))
}
err = panicErr(pan, debug.Stack())
}
} ()
@@ -37,11 +46,7 @@ func panicWrap(f func() error) (err error) {
func panicWrapCtx(ctx context.Context, f func(context.Context) error) (err error) {
defer func () {
if pan := recover(); pan != nil {
if panErr, ok := pan.(error); ok {
err = panErr
} else {
err = errors.New(fmt.Sprint(pan))
}
err = panicErr(pan, debug.Stack())
}
} ()