Compare commits
18 Commits
772e9ca290
...
v0.3.5
| Author | SHA1 | Date | |
|---|---|---|---|
| 126374fbac | |||
| 02f06d857e | |||
| 3d25441e7a | |||
| 735d314a19 | |||
| 4f4c7a0627 | |||
| 461d0b77e9 | |||
| 861d6af1d7 | |||
| ac2db05d06 | |||
| f4904884ad | |||
| 7d0620fe3e | |||
| 3115c5feef | |||
| 10ca4f4671 | |||
| e0c8825949 | |||
| b12ffdb0a0 | |||
| bfca8c6f4a | |||
| 641624ef3b | |||
| bae737e6b8 | |||
| 0077a5d115 |
34
actor.go
34
actor.go
@@ -3,20 +3,40 @@ package camfish
|
||||
import "context"
|
||||
|
||||
// Actor is a participant in the environment. All public methods on an actor
|
||||
// must be safe for concurrent use by multiple goroutines. Additionally, any
|
||||
// type which explicitly implements Actor should:
|
||||
// should be safe for concurrent use by multiple goroutines except for AddFlags,
|
||||
// Init, Configure, and ProcessConfig. Additionally, any type which explicitly
|
||||
// implements Actor should:
|
||||
//
|
||||
// - Treat all public fields, values, indices, etc. as immutable
|
||||
// - Satisfy Actor as a pointer, not a value
|
||||
// - Not have a constructor
|
||||
//
|
||||
// The CAMFISH environment will use interfaces in this package to probe actors
|
||||
// for methods. If an actor is supposed to fulfill one of these interfaces, this
|
||||
// should be enforced at compile-time by assigning the actor to an anonymous
|
||||
// global variable of that interface type. For instance, this line will ensure
|
||||
// that SomeActor fulfills [Resettable]:
|
||||
//
|
||||
// var _ camfish.Resettable = new(SomeActor)
|
||||
type Actor interface {
|
||||
// Type returns the type name of the actor. The value returned from this
|
||||
// is used to locate actors capable of performing a specific task, so it
|
||||
// absolutely must return the same string every time. Actors implemented
|
||||
// in packages besides this one (i.e. not camfish) must not return the
|
||||
// string "cron".
|
||||
// Type returns the "type name" of the actor. The value returned from
|
||||
// this is used to locate actors capable of performing a specific task,
|
||||
// so it absolutely must return the same string every time. It is
|
||||
// usually best to have this be unique to each actor. Actors implemented
|
||||
// in packages other than this one
|
||||
// (git.tebibyte.media/sashakoshka/camfish) must not return the string
|
||||
// "cron".
|
||||
Type() string
|
||||
}
|
||||
|
||||
// Named is any object with a name.
|
||||
type Named interface {
|
||||
// Name returns the name. This doesn't need to be the same as Type. It
|
||||
// must return the same string every time. It is used to differentiate
|
||||
// actors of the same type in logs.
|
||||
Name() string
|
||||
}
|
||||
|
||||
// FlagAdder is any object that can add [Flag]s to a [FlagSet]. Actors which
|
||||
// implement this interface will be called upon to add flags during and only
|
||||
// during the flag parsing phase.
|
||||
|
||||
@@ -41,6 +41,7 @@ type environment struct {
|
||||
logDirectory string
|
||||
configFile string
|
||||
verbose bool
|
||||
crash bool
|
||||
}
|
||||
|
||||
// running stores whether the environment is currently running.
|
||||
@@ -136,11 +137,13 @@ func (this *environment) Del(ctx context.Context, actors ...Actor) error {
|
||||
channels := []<- chan struct { } { }
|
||||
for _, actor := range actors {
|
||||
info := this.info(actor)
|
||||
if info.done != nil {
|
||||
if info.stopped != nil {
|
||||
channels = append(channels, info.stopped)
|
||||
}
|
||||
info.done()
|
||||
}
|
||||
for _, channel := range channels {
|
||||
if channel == nil { continue }
|
||||
select {
|
||||
case <- channel:
|
||||
case <- ctx.Done():
|
||||
@@ -249,7 +252,11 @@ func (this *environment) run(actor Actor) {
|
||||
// contains context information
|
||||
info := this.info(actor)
|
||||
ctx := info.ctx
|
||||
defer close(info.stopped)
|
||||
defer func() {
|
||||
if info.stopped != nil {
|
||||
close(info.stopped)
|
||||
}
|
||||
}()
|
||||
|
||||
switch actor := actor.(type) {
|
||||
case Runnable:
|
||||
@@ -282,7 +289,12 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
|
||||
for {
|
||||
// run actor
|
||||
lastStart := time.Now()
|
||||
err := panicWrapCtx(ctx, actor.Run)
|
||||
var err error
|
||||
if this.flags.crash {
|
||||
err = actor.Run(ctx)
|
||||
} else {
|
||||
err = panicWrapCtx(ctx, actor.Run)
|
||||
}
|
||||
|
||||
// detect context cancellation
|
||||
if ctxErr := ctx.Err(); ctxErr != nil {
|
||||
@@ -298,7 +310,7 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
|
||||
return
|
||||
} else {
|
||||
// failure
|
||||
log.Printf("XXX [%s] failed", typ)
|
||||
log.Printf("XXX [%s] failed: %v", typ, err)
|
||||
}
|
||||
|
||||
// restart logic
|
||||
|
||||
27
examples/broken/main.go
Normal file
27
examples/broken/main.go
Normal file
@@ -0,0 +1,27 @@
|
||||
// Example broken demonstrates how the environment will forcibly kill the
|
||||
// program if an actor cannot shut down.
|
||||
package main
|
||||
|
||||
import "os"
|
||||
import "log"
|
||||
import "context"
|
||||
import "git.tebibyte.media/sashakoshka/camfish"
|
||||
|
||||
func main() {
|
||||
camfish.Run("broken",
|
||||
"Example broken demonstrates how the environment will " +
|
||||
"forcibly kill the program if an actor cannot shut down",
|
||||
new(broken))
|
||||
}
|
||||
|
||||
// broken is an incorrectly implemented actor that cannot shut down.
|
||||
type broken struct { }
|
||||
var _ camfish.Runnable = new(broken)
|
||||
func (this *broken) Type() string { return "broken" }
|
||||
|
||||
func (this *broken) Run(ctx context.Context) error {
|
||||
log.Println("(i) [broken] wait for approximately 8 minutes")
|
||||
log.Printf("(i) [broken] if impatient, run: kill -9 %d", os.Getpid())
|
||||
<- (chan struct { })(nil)
|
||||
return ctx.Err() // unreachable, of course
|
||||
}
|
||||
@@ -42,7 +42,7 @@ func (this *httpServer) Init(ctx context.Context) error {
|
||||
}
|
||||
|
||||
func (this *httpServer) Run() error {
|
||||
log.Printf("[http] listening on %s", this.server.Addr)
|
||||
log.Printf("(i) [http-server] listening on %s", this.server.Addr)
|
||||
err := this.server.ListenAndServe()
|
||||
if errors.Is(err, http.ErrServerClosed) { return nil }
|
||||
return err
|
||||
|
||||
35
examples/panic/main.go
Normal file
35
examples/panic/main.go
Normal file
@@ -0,0 +1,35 @@
|
||||
// Example panic demonstrates how the environment can restart actors if they
|
||||
// fail.
|
||||
package main
|
||||
|
||||
import "log"
|
||||
import "time"
|
||||
import "errors"
|
||||
import "context"
|
||||
import "math/rand"
|
||||
import "git.tebibyte.media/sashakoshka/camfish"
|
||||
|
||||
func main() {
|
||||
camfish.Run("panic",
|
||||
"Example panic demonstrates how the environment can restart " +
|
||||
"actors if they fail",
|
||||
new(actor))
|
||||
}
|
||||
|
||||
// actor is an incorrectly implemented actor that panics and errs randomly.
|
||||
type actor struct { }
|
||||
var _ camfish.Runnable = new(actor)
|
||||
func (this *actor) Type() string { return "panic" }
|
||||
|
||||
func (this *actor) Run(ctx context.Context) error {
|
||||
log.Println("(i) [panic] panicking in 10 seconds")
|
||||
select {
|
||||
case <- ctx.Done(): return ctx.Err()
|
||||
case <- time.After(time.Second * 10):
|
||||
if rand.Int() % 2 == 0 {
|
||||
panic("this is a panic")
|
||||
} else {
|
||||
return errors.New("this is an error")
|
||||
}
|
||||
}
|
||||
}
|
||||
4
ini.go
4
ini.go
@@ -162,7 +162,7 @@ func configFiles(program string) ([]string, error) {
|
||||
userConfig, err := os.UserConfigDir()
|
||||
if err != nil { return nil, err }
|
||||
return []string {
|
||||
filepath.Join("/etc", program),
|
||||
filepath.Join(userConfig, program),
|
||||
filepath.Join("/etc", program, program + ".conf"),
|
||||
filepath.Join(userConfig, program, program + ".conf"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
29
phases.go
29
phases.go
@@ -6,6 +6,7 @@ import "log"
|
||||
import "io/fs"
|
||||
import "errors"
|
||||
import "context"
|
||||
import "runtime"
|
||||
import "strings"
|
||||
import "path/filepath"
|
||||
import "git.tebibyte.media/sashakoshka/go-cli"
|
||||
@@ -24,6 +25,7 @@ func (this *environment) phase10FlagParsing() bool {
|
||||
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
|
||||
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
|
||||
flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil)
|
||||
flagCrash := set.Flag(0, "crash", "Crash when an actor panics", nil)
|
||||
|
||||
// ask actors to add flags
|
||||
actors, done := this.actors.RBorrow()
|
||||
@@ -60,6 +62,9 @@ func (this *environment) phase10FlagParsing() bool {
|
||||
if _, ok := flagVerbose.First(); ok {
|
||||
this.flags.verbose = true
|
||||
}
|
||||
if _, ok := flagCrash.First(); ok {
|
||||
this.flags.crash = true
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -128,6 +133,12 @@ func (this *environment) phase30ConfigurationParsing() bool {
|
||||
if this.flags.configFile != "" {
|
||||
paths = append(paths, this.flags.configFile)
|
||||
}
|
||||
if this.Verb() {
|
||||
log.Println("(i) (30) have configuration files:")
|
||||
for _, paths := range paths {
|
||||
log.Println("(i) (30) -", paths)
|
||||
}
|
||||
}
|
||||
// parse every config and merge them all
|
||||
configs := make([]iniConfig, 0, len(paths))
|
||||
for _, path := range paths {
|
||||
@@ -255,6 +266,24 @@ func (this *environment) phase70_5Trimming() bool {
|
||||
}
|
||||
|
||||
func (this *environment) phase80Shutdown() bool {
|
||||
ctx, done := context.WithTimeout(
|
||||
context.Background(),
|
||||
defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout))
|
||||
defer done()
|
||||
go func() {
|
||||
<- ctx.Done()
|
||||
if errors.Is(context.Cause(ctx), context.DeadlineExceeded) {
|
||||
log.Println("XXX (80) shutdown timeout expired, performing emergency halt")
|
||||
if Verb() {
|
||||
dumpBuffer := make([]byte, 8192)
|
||||
runtime.Stack(dumpBuffer, true)
|
||||
log.Printf("XXX (80) stack trace of all goroutines:\n%s", dumpBuffer)
|
||||
}
|
||||
log.Printf("====== [%s] END =======", this.name)
|
||||
os.Exit(1)
|
||||
}
|
||||
}()
|
||||
|
||||
cause := context.Cause(this.ctx)
|
||||
if cause != nil {
|
||||
log.Println("XXX (80) shutting down because:", cause)
|
||||
|
||||
31
util.go
31
util.go
@@ -12,6 +12,19 @@ import "strings"
|
||||
import "context"
|
||||
import "sync/atomic"
|
||||
import "unicode/utf8"
|
||||
import "runtime/debug"
|
||||
|
||||
func panicErr(message any, stack []byte) (err error) {
|
||||
if panErr, ok := message.(error); ok {
|
||||
err = panErr
|
||||
} else {
|
||||
err = errors.New(fmt.Sprint(message))
|
||||
}
|
||||
if stack != nil {
|
||||
err = fmt.Errorf("%w: %s", err, stack)
|
||||
}
|
||||
return err
|
||||
}
|
||||
|
||||
func defaul[T comparable](value, def T) T {
|
||||
var zero T
|
||||
@@ -22,11 +35,7 @@ func defaul[T comparable](value, def T) T {
|
||||
func panicWrap(f func() error) (err error) {
|
||||
defer func () {
|
||||
if pan := recover(); pan != nil {
|
||||
if panErr, ok := pan.(error); ok {
|
||||
err = panErr
|
||||
} else {
|
||||
err = errors.New(fmt.Sprint(pan))
|
||||
}
|
||||
err = panicErr(pan, debug.Stack())
|
||||
}
|
||||
} ()
|
||||
|
||||
@@ -37,11 +46,7 @@ func panicWrap(f func() error) (err error) {
|
||||
func panicWrapCtx(ctx context.Context, f func(context.Context) error) (err error) {
|
||||
defer func () {
|
||||
if pan := recover(); pan != nil {
|
||||
if panErr, ok := pan.(error); ok {
|
||||
err = panErr
|
||||
} else {
|
||||
err = errors.New(fmt.Sprint(pan))
|
||||
}
|
||||
err = panicErr(pan, debug.Stack())
|
||||
}
|
||||
} ()
|
||||
|
||||
@@ -77,7 +82,11 @@ func logActors (actors iter.Seq[Actor]) {
|
||||
}
|
||||
types := make(map[string] int)
|
||||
for actor := range actors {
|
||||
types[actor.Type()] += 1
|
||||
typ := actor.Type()
|
||||
if named, ok := actor.(Named); ok {
|
||||
typ = fmt.Sprintf("%s/%s", typ, named.Name())
|
||||
}
|
||||
types[typ] += 1
|
||||
}
|
||||
for typ, count := range types {
|
||||
if count > 1 {
|
||||
|
||||
12
util_test.go
12
util_test.go
@@ -17,17 +17,17 @@ func TestDefaul(test *testing.T) {
|
||||
}
|
||||
|
||||
func TestPanicWrap(test *testing.T) {
|
||||
err := panicWrap(func (ctx context.Context) error {
|
||||
err := panicWrap(func () error {
|
||||
return errors.New("test case 0")
|
||||
})
|
||||
test.Log(err)
|
||||
if err.Error() != "test case 0" { test.Fatal("not equal") }
|
||||
err = panicWrap(func (ctx context.Context) error {
|
||||
err = panicWrap(func () error {
|
||||
panic(errors.New("test case 1"))
|
||||
})
|
||||
test.Log(err)
|
||||
if err.Error() != "test case 1" { test.Fatal("not equal") }
|
||||
err = panicWrap( func (ctx context.Context) error {
|
||||
err = panicWrap( func () error {
|
||||
return nil
|
||||
})
|
||||
test.Log(err)
|
||||
@@ -35,17 +35,17 @@ func TestPanicWrap(test *testing.T) {
|
||||
}
|
||||
|
||||
func TestPanicWrapCtx(test *testing.T) {
|
||||
err := panicWrap(context.Background(), func (ctx context.Context) error {
|
||||
err := panicWrapCtx(context.Background(), func (ctx context.Context) error {
|
||||
return errors.New("test case 0")
|
||||
})
|
||||
test.Log(err)
|
||||
if err.Error() != "test case 0" { test.Fatal("not equal") }
|
||||
err = panicWrap(context.Background(), func (ctx context.Context) error {
|
||||
err = panicWrapCtx(context.Background(), func (ctx context.Context) error {
|
||||
panic(errors.New("test case 1"))
|
||||
})
|
||||
test.Log(err)
|
||||
if err.Error() != "test case 1" { test.Fatal("not equal") }
|
||||
err = panicWrap(context.Background(), func (ctx context.Context) error {
|
||||
err = panicWrapCtx(context.Background(), func (ctx context.Context) error {
|
||||
return nil
|
||||
})
|
||||
test.Log(err)
|
||||
|
||||
Reference in New Issue
Block a user