Compare commits

...

22 Commits
v0.2.0 ... main

Author SHA1 Message Date
861d6af1d7 Add --crash flag to crash and print stack trace on panic 2025-04-08 10:29:27 -04:00
ac2db05d06 Fix local config path 2025-03-10 01:38:17 -04:00
f4904884ad Improve named actor interface 2025-03-09 01:56:09 -05:00
7d0620fe3e Fix system config file path 2025-03-09 01:55:11 -05:00
3115c5feef Actors are formatted better when logged 2025-03-09 01:54:53 -05:00
10ca4f4671 Add "named" actor interface 2025-02-05 16:49:27 -05:00
e0c8825949 Clarify documentation for Actor 2025-01-31 17:26:45 -05:00
b12ffdb0a0 Say *why* an actor failed 2025-01-30 21:12:55 -05:00
bfca8c6f4a Fix util_test.go 2025-01-30 19:59:25 -05:00
641624ef3b examples/broken: Demonstrate new fix working 2025-01-30 19:58:11 -05:00
bae737e6b8 Environment exits if shutdown takes too long 2025-01-30 19:57:40 -05:00
0077a5d115 examples/http: Fix logging 2025-01-30 19:39:37 -05:00
772e9ca290 examples/http: Add HTTP example to demonstrate RunShutdownable 2025-01-30 19:30:07 -05:00
5e38cec135 Environment now actually runs RunShutdownable actors 2025-01-30 19:29:39 -05:00
326db33ecc Environment can now run RunShutdownable actors 2025-01-30 19:29:23 -05:00
dc7c7b5c73 Add variation of panicWrap that does not take in a context 2025-01-30 19:29:02 -05:00
1133e261bf Add support for RunShutdownable to actorSet 2025-01-30 19:28:33 -05:00
b4c55decc6 Document RunShutdownable in Run 2025-01-30 18:28:16 -05:00
2cb1e5cb3a Clarify temporal semantics for RunShutdownable.Shutdown 2025-01-30 18:27:37 -05:00
bbe833bb53 Add the RunShutdowner actor interface 2025-01-30 15:54:13 -05:00
e1ccb4d6e8 Fix phase logging 2025-01-30 15:27:34 -05:00
b026250cab mock: Fix tests 2025-01-30 15:27:23 -05:00
11 changed files with 311 additions and 47 deletions

View File

@ -3,20 +3,40 @@ package camfish
import "context"
// Actor is a participant in the environment. All public methods on an actor
// must be safe for concurrent use by multiple goroutines. Additionally, any
// type which explicitly implements Actor should:
// should be safe for concurrent use by multiple goroutines except for AddFlags,
// Init, Configure, and ProcessConfig. Additionally, any type which explicitly
// implements Actor should:
//
// - Treat all public fields, values, indices, etc. as immutable
// - Satisfy Actor as a pointer, not a value
// - Not have a constructor
//
// The CAMFISH environment will use interfaces in this package to probe actors
// for methods. If an actor is supposed to fulfill one of these interfaces, this
// should be enforced at compile-time by assigning the actor to an anonymous
// global variable of that interface type. For instance, this line will ensure
// that SomeActor fulfills [Resettable]:
//
// var _ camfish.Resettable = new(SomeActor)
type Actor interface {
// Type returns the type name of the actor. The value returned from this
// is used to locate actors capable of performing a specific task, so it
// absolutely must return the same string every time. Actors implemented
// in packages besides this one (i.e. not camfish) must not return the
// string "cron".
// Type returns the "type name" of the actor. The value returned from
// this is used to locate actors capable of performing a specific task,
// so it absolutely must return the same string every time. It is
// usually best to have this be unique to each actor. Actors implemented
// in packages other than this one
// (git.tebibyte.media/sashakoshka/camfish) must not return the string
// "cron".
Type() string
}
// Named is implemented by any object that can report a name for itself.
type Named interface {
	// Name returns the object's name. The name is free to differ from the
	// value returned by Type, but it must be the same string on every
	// call. It is used in logs to tell actors of the same type apart.
	Name() string
}
// FlagAdder is any object that can add [Flag]s to a [FlagSet]. Actors which
// implement this interface will be called upon to add flags during and only
// during the flag parsing phase.
@ -88,3 +108,22 @@ type Resettable interface {
// invalid and any process which depends on it should be shut down.
Reset(ctx context.Context) error
}
// RunShutdownable is any object whose shutdown, rather than its run loop, is
// what requires a context. An actor implementing this interface cannot also
// implement the Runnable interface. One use for it is running an http.Server
// as an actor.
type RunShutdownable interface {
	// Run behaves like [Runnable.Run], except that it receives no context
	// and instead blocks until Shutdown has been run and has exited. Run
	// may also return because something went wrong and it is unable to
	// continue; it must then return a non-nil error which explains why.
	// In that second case, Shutdown does not need to be called.
	Run() error

	// Shutdown shuts the actor down. Run must be unblocked by it in every
	// case, including failure, context expiration, etc. Shutdown must
	// return when the context expires or earlier, and if it has no other
	// error to return, it must return ctx.Err. Should Shutdown return any
	// error at all, the object must be treated as invalid, and any other
	// process depending on it should be shut down.
	Shutdown(ctx context.Context) error
}

View File

@ -16,6 +16,7 @@ type actorSets struct {
configurable actorSet[Configurable]
initializable actorSet[Initializable]
runnable actorSet[Runnable]
runShutdownable actorSet[RunShutdownable]
trimmable actorSet[Trimmable]
}
@ -52,6 +53,7 @@ func (sets *actorSets) All() iter.Seq[actorSetIface] {
yield(&sets.configurable)
yield(&sets.initializable)
yield(&sets.runnable)
yield(&sets.runShutdownable)
yield(&sets.trimmable)
}
}
@ -66,7 +68,9 @@ func (this *actorSets) add(ctx context.Context, actor Actor) {
done: done,
order: this.nextOrder,
}
if _, ok := actor.(Runnable); ok {
_, isRunnable := actor.(Runnable)
_, isRunShutdownable := actor.(RunShutdownable)
if isRunnable || isRunShutdownable {
info.stopped = make(chan struct { })
}
this.inf[actor] = info

View File

@ -41,6 +41,7 @@ type environment struct {
logDirectory string
configFile string
verbose bool
crash bool
}
// running stores whether the environment is currently running.
@ -122,7 +123,9 @@ func (this *environment) Add(ctx context.Context, actors ...Actor) error {
}
}
for _, actor := range actors {
if actor, ok := actor.(Runnable); ok {
_, isRunnable := actor.(Runnable)
_, isRunShutdownable := actor.(RunShutdownable)
if isRunnable || isRunShutdownable {
this.start(actor)
}
}
@ -213,7 +216,7 @@ func (this *environment) info(actor Actor) actorInfo {
// start increments the wait group by one and starts the given actor in the
// background, restarting it if it fails. this function will exit immediately.
// see the documentation for run for details.
func (this *environment) start(actor Runnable) {
func (this *environment) start(actor Actor) {
this.group.Add(1)
go this.run(actor)
}
@ -223,14 +226,12 @@ func (this *environment) start(actor Runnable) {
// environment once this function exits, and the environment's wait group
// counter will be decremented. note that this function will never increment the
// wait group counter, so start should usually be used instead.
func (this *environment) run(actor Runnable) {
func (this *environment) run(actor Actor) {
// clean up when done
defer this.group.Done()
// logging
acto, ok := actor.(Actor)
if !ok { return }
typ := acto.Type()
typ := actor.Type()
if this.Verb() { log.Printf("(i) [%s] running", typ) }
var stopErr error
var exited bool
@ -247,10 +248,26 @@ func (this *environment) run(actor Runnable) {
}()
// contains context information
info := this.info(acto)
info := this.info(actor)
ctx := info.ctx
defer close(info.stopped)
switch actor := actor.(type) {
case Runnable:
stopErr, exited = this.runRunnable(ctx, actor)
case RunShutdownable:
stopErr, exited = this.runRunnable(ctx, &runShutdownableShim {
shutdownTimeout: defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout),
underlying: actor,
})
default:
panic("actor was neither Runnable or RunShutdownable")
}
}
// runRunnable runs an actor implementing [Runnable]. this should only be called
// from within [environment.run].
func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopErr error, exited bool) {
// timing
restartThreshold := defaul(this.timing.restartThreshold.Load(), defaultRestartThreshold)
restartInitialInterval := defaul(this.timing.restartInitialInterval.Load(), defaultRestartInitialInterval)
@ -259,11 +276,19 @@ func (this *environment) run(actor Runnable) {
resetTimeout := defaul(this.timing.resetTimeout.Load(), defaultResetTimeout)
restartInterval := restartInitialInterval
// main loop
acto, ok := actor.(Actor)
if !ok { return }
typ := acto.Type()
for {
// run actor
lastStart := time.Now()
err := panicWrap(ctx, actor.Run)
var err error
if this.flags.crash {
err = actor.Run(ctx)
} else {
err = panicWrapCtx(ctx, actor.Run)
}
// detect context cancellation
if ctxErr := ctx.Err(); ctxErr != nil {
@ -279,7 +304,7 @@ func (this *environment) run(actor Runnable) {
return
} else {
// failure
log.Printf("XXX [%s] failed", typ)
log.Printf("XXX [%s] failed: %v", typ, err)
}
// restart logic
@ -387,3 +412,24 @@ func (this *environment) applyConfig() error {
if err != nil { return err }
return nil
}
// runShutdownableShim wraps a [RunShutdownable] so that it can be run through
// a Run method which takes a context.
type runShutdownableShim struct {
	// underlying is the wrapped actor.
	underlying RunShutdownable
	// shutdownTimeout limits the lifetime of the context that is passed
	// to the wrapped actor's Shutdown method.
	shutdownTimeout time.Duration
}
// Type returns the type name reported by the wrapped actor. The wrapped value
// is asserted to be an [Actor]; if it is not, the assertion panics.
func (this *runShutdownableShim) Type() string {
	actor := this.underlying.(Actor)
	return actor.Type()
}
// Run runs the wrapped actor and blocks until its Run method returns,
// returning whatever error it returned.
//
// When ctx is canceled, the wrapped actor's Shutdown method is called from a
// background goroutine. Because ctx has already expired by then, Shutdown is
// given a fresh context that is limited by shutdownTimeout instead. An error
// returned by Shutdown is logged.
//
// If the wrapped actor's Run returns by itself before ctx is canceled,
// Shutdown is not called at all. This keeps a late Shutdown call from landing
// on an actor that has already exited, or one that has since been started
// again.
func (this *runShutdownableShim) Run(ctx context.Context) error {
	// closed once the wrapped Run has returned, releasing the watcher
	// goroutine below without triggering a shutdown
	exited := make(chan struct{})
	defer close(exited)

	go func() {
		select {
		case <-exited:
			return
		case <-ctx.Done():
		}
		shutdownCtx, done := context.WithTimeout(context.Background(), this.shutdownTimeout)
		defer done()
		if err := this.underlying.Shutdown(shutdownCtx); err != nil {
			log.Printf("XXX [%s] failed to shut down: %v", this.Type(), err)
		}
	}()

	return this.underlying.Run()
}

27
examples/broken/main.go Normal file
View File

@ -0,0 +1,27 @@
// Example broken demonstrates how the environment will forcibly kill the
// program if an actor cannot shut down.
package main
import "os"
import "log"
import "context"
import "git.tebibyte.media/sashakoshka/camfish"
// main passes a single broken actor to camfish.Run.
func main() {
	const description = "Example broken demonstrates how the environment will " +
		"forcibly kill the program if an actor cannot shut down"
	camfish.Run("broken", description, new(broken))
}
// broken is an actor which is implemented incorrectly on purpose: it is not
// able to shut down.
type broken struct{}

// Compile-time check that broken fulfills camfish.Runnable.
var _ camfish.Runnable = new(broken)

// Type returns the type name of the actor, which is always "broken".
func (this *broken) Type() string {
	return "broken"
}
// Run logs a notice about how long to wait (and how to kill the process by
// its PID), and then blocks forever without ever looking at ctx.
func (this *broken) Run(ctx context.Context) error {
	pid := os.Getpid()
	log.Println("(i) [broken] wait for approximately 8 minutes")
	log.Printf("(i) [broken] if impatient, run: kill -9 %d", pid)

	// a receive from a nil channel never completes
	var never chan struct{}
	<-never

	return ctx.Err() // unreachable, of course
}

90
examples/http/main.go Normal file
View File

@ -0,0 +1,90 @@
// Example http demonstrates the usage of [camfish.RunShutdownable] to run an
// http server.
package main
import "fmt"
import "log"
import "iter"
import "errors"
import "context"
import "net/http"
import "git.tebibyte.media/sashakoshka/camfish"
import "git.tebibyte.media/sashakoshka/go-util/sync"
// main passes an HTTP server actor and a database actor to camfish.Run. The
// description names the camfish.RunShutdownable interface, which is the
// interface that httpServer implements.
func main() {
	camfish.Run("http",
		"Example http demonstrates the usage of "+
			"camfish.RunShutdownable to run an http server",
		new(httpServer),
		new(database))
}
// httpServer is an actor that serves data over http.
type httpServer struct {
	// server is the HTTP server being run. It is created by Init.
	server *http.Server
	// database is the actor that inventory data is read from. It is
	// located by Init.
	database *database
}

// Compile-time checks for the interfaces that httpServer must fulfill.
var (
	_ camfish.RunShutdownable = new(httpServer)
	_ camfish.Initializable   = new(httpServer)
)

// Type returns the type name of the actor, which is always "http-server".
func (this *httpServer) Type() string {
	return "http-server"
}
// Init creates the HTTP server (bound to localhost:8080, with this actor as
// its handler) and then looks up the "database" actor. It returns an error if
// that actor cannot be found as a *database, and otherwise returns ctx.Err().
func (this *httpServer) Init(ctx context.Context) error {
	this.server = &http.Server{
		Handler: this,
		Addr:    "localhost:8080",
	}

	db, found := camfish.Find("database").(*database)
	if !found {
		return errors.New("could not locate database")
	}
	this.database = db

	return ctx.Err()
}
// Run logs the listening address and serves HTTP until the server stops. A
// stop reported as http.ErrServerClosed is treated as a normal exit and
// results in a nil error; any other error is returned as-is.
func (this *httpServer) Run() error {
	log.Printf("(i) [http-server] listening on %s", this.server.Addr)
	if err := this.server.ListenAndServe(); !errors.Is(err, http.ErrServerClosed) {
		return err
	}
	return nil
}
// Shutdown shuts down the HTTP server using ctx, and returns the error
// reported by the server's own Shutdown method.
func (this *httpServer) Shutdown(ctx context.Context) error {
	err := this.server.Shutdown(ctx)
	return err
}
// ServeHTTP responds to every request with an HTML page containing a table
// of the database's inventory, one row per item. Item names are written into
// the page as-is, without HTML escaping.
func (this *httpServer) ServeHTTP(res http.ResponseWriter, req *http.Request) {
	const row = "<tr><td>%s</td><td>%d</td></tr>"

	fmt.Fprint(res, "<!DOCTYPE html><html><head><title>inventory</title></head><body>")
	fmt.Fprint(res, "<table><tr><th>Item</th><th>Count</th></tr>")
	for name, quantity := range this.database.Inventory() {
		fmt.Fprintf(res, row, name, quantity)
	}
	fmt.Fprint(res, "</table>")
	fmt.Fprint(res, "</body></html>")
}
// database is an actor that provides data which can be served.
type database struct {
	// inventory maps item names to item counts.
	inventory usync.RWMonitor[map[string]int]
}

// Type returns the type name of the actor, which is always "database".
func (this *database) Type() string {
	return "database"
}
// Init fills the inventory with a fixed set of example items, and returns
// ctx.Err().
func (this *database) Init(ctx context.Context) error {
	stock := map[string]int{
		"screws":         34,
		"blood":          90,
		"paperclips":     5230,
		"wood":           3,
		"grains of rice": 238409,
	}
	this.inventory.Set(stock)
	return ctx.Err()
}
// Inventory returns an iterator over every item in the inventory, yielding
// the item name and its count. The inventory is borrowed (via RBorrow) for as
// long as the iteration runs, and is released when the iteration ends.
//
// Iteration stops as soon as yield returns false, which is what happens when
// the consumer leaves its range loop early. Calling yield again after that
// point would cause a runtime panic.
func (this *database) Inventory() iter.Seq2[string, int] {
	return func(yield func(string, int) bool) {
		inventory, done := this.inventory.RBorrow()
		defer done()
		for item, amount := range inventory {
			if !yield(item, amount) {
				return
			}
		}
	}
}

4
ini.go
View File

@ -162,7 +162,7 @@ func configFiles(program string) ([]string, error) {
userConfig, err := os.UserConfigDir()
if err != nil { return nil, err }
return []string {
filepath.Join("/etc", program),
filepath.Join(userConfig, program),
filepath.Join("/etc", program, program + ".conf"),
filepath.Join(userConfig, program, program + ".conf"),
}, nil
}

View File

@ -8,16 +8,16 @@ func TestConfig(test *testing.T) {
"multiple": []string { "item0", "item1" },
"empty": []string { },
}
if correct, got := config.Get("single"), "aslkdjasd"; correct != got {
if correct, got := "aslkdjasd", config.Get("single"); correct != got {
test.Fatal("not equal:", got)
}
if correct, got := config.Get("multiple"), "item0"; correct != got {
if correct, got := "item0", config.Get("multiple"); correct != got {
test.Fatal("not equal:", got)
}
if correct, got := config.Get("empty"), ""; correct != got {
if correct, got := "", config.Get("empty"); correct != got {
test.Fatal("not equal:", got)
}
if correct, got := config.Get("non-existent"), ""; correct != got {
if correct, got := "", config.Get("non-existent"); correct != got {
test.Fatal("not equal:", got)
}
for index, value := range config.GetAll("single") {

View File

@ -24,6 +24,7 @@ func (this *environment) phase10FlagParsing() bool {
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil)
flagCrash := set.Flag(0, "crash", "Crash when an actor panics", nil)
// ask actors to add flags
actors, done := this.actors.RBorrow()
@ -60,6 +61,9 @@ func (this *environment) phase10FlagParsing() bool {
if _, ok := flagVerbose.First(); ok {
this.flags.verbose = true
}
if _, ok := flagCrash.First(); ok {
this.flags.crash = true
}
return true
}
@ -199,7 +203,7 @@ func (this *environment) phase60Initialization() bool {
initializable = actors.initializable.all()
}()
if err := this.initializeActors(this.ctx, initializable...); err != nil {
log.Println(".// (60) failed to initialize:", err)
log.Println("XXX (60) failed to initialize:", err)
return false
}
if this.Verb() { log.Println(".// (60) initialized") }
@ -215,7 +219,10 @@ func (this *environment) phase70Running() bool {
actors, done := this.actors.RBorrow()
defer done()
for _, actor := range actors.runnable.all() {
this.start(actor)
this.start(actor.(Actor))
}
for _, actor := range actors.runShutdownable.all() {
this.start(actor.(Actor))
}
}()
@ -252,6 +259,19 @@ func (this *environment) phase70_5Trimming() bool {
}
func (this *environment) phase80Shutdown() bool {
ctx, done := context.WithTimeout(
context.Background(),
defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout))
defer done()
go func() {
<- ctx.Done()
if errors.Is(context.Cause(ctx), context.DeadlineExceeded) {
log.Println("XXX (80) shutdown timeout expired, performing emergency halt")
log.Printf("====== [%s] END =======", this.name)
os.Exit(1)
}
}()
cause := context.Cause(this.ctx)
if cause != nil {
log.Println("XXX (80) shutting down because:", cause)

31
run.go
View File

@ -50,21 +50,22 @@ var env environment
// is configurable, but by default it is 8 minutes. The vast majority of
// actors should initialize in under 100 milliseconds.
//
// 70. Running: Actors which implement [Runnable] are run, each in their own
// goroutine. The environment is able to restart actors which have failed,
// which entails resetting the actor if it implements [Resettable], and
// running the actor again within the same goroutine. If an actor does not
// run for a meaningful amount of time after resetting/initialization
// before failing, it is considered erratic and further attempts to restart
// it will be spaced by a limited, constantly increasing time interval. The
// timing is configurable, but by default the threshold for a meaningful
// amount of runtime is 16 seconds, the initial delay interval is 8
// seconds, the interval increase per attempt is 8 seconds, and the maximum
// interval is one hour. Additionally, programs which implement [Trimmable]
// will be trimmed regularly whenever they are running. The trimming
// interval is also configurable, but by default it is once every minute.
// When an actor which implements [Resettable] is reset, it is given a
// configurable timeout, which is 8 minutes by default.
// 70. Running: Actors which implement [Runnable] or [RunShutdownable] are
// run, each in their own goroutine. The environment is able to restart
// actors which have failed, which entails resetting the actor if it
// implements [Resettable], and running the actor again within the same
// goroutine. If an actor does not run for a meaningful amount of time
// after resetting/initialization before failing, it is considered erratic
// and further attempts to restart it will be spaced by a limited,
// constantly increasing time interval. The timing is configurable, but by
// default the threshold for a meaningful amount of runtime is 16 seconds,
// the initial delay interval is 8 seconds, the interval increase per
// attempt is 8 seconds, and the maximum interval is one hour.
// Additionally, programs which implement [Trimmable] will be trimmed
// regularly whenever they are running. The trimming interval is also
// configurable, but by default it is once every minute. When an actor
// which implements [Resettable] is reset, it is given a configurable
// timeout, which is 8 minutes by default.
//
// 80. Shutdown: This can be triggered by all actors being removed from the
// environment, a catastrophic error, [Done] being called, or the program

23
util.go
View File

@ -19,7 +19,22 @@ func defaul[T comparable](value, def T) T {
return value
}
func panicWrap(ctx context.Context, f func (context.Context) error) (err error) {
// panicWrap calls f and returns the error it returned. If f panics instead,
// the panic is recovered and turned into the returned error: a panic value
// that is itself an error is returned directly, and any other panic value is
// formatted with fmt.Sprint and wrapped in a new error.
func panicWrap(f func() error) (err error) {
	defer func() {
		switch pan := recover().(type) {
		case nil:
			// f did not panic, leave err as it is
		case error:
			err = pan
		default:
			err = errors.New(fmt.Sprint(pan))
		}
	}()
	return f()
}
func panicWrapCtx(ctx context.Context, f func(context.Context) error) (err error) {
defer func () {
if pan := recover(); pan != nil {
if panErr, ok := pan.(error); ok {
@ -62,7 +77,11 @@ func logActors (actors iter.Seq[Actor]) {
}
types := make(map[string] int)
for actor := range actors {
types[actor.Type()] += 1
typ := actor.Type()
if named, ok := actor.(Named); ok {
typ = fmt.Sprintf("%s/%s", typ, named.Name())
}
types[typ] += 1
}
for typ, count := range types {
if count > 1 {

View File

@ -17,17 +17,35 @@ func TestDefaul(test *testing.T) {
}
func TestPanicWrap(test *testing.T) {
err := panicWrap(context.Background(), func (ctx context.Context) error {
err := panicWrap(func () error {
return errors.New("test case 0")
})
test.Log(err)
if err.Error() != "test case 0" { test.Fatal("not equal") }
err = panicWrap(context.Background(), func (ctx context.Context) error {
err = panicWrap(func () error {
panic(errors.New("test case 1"))
})
test.Log(err)
if err.Error() != "test case 1" { test.Fatal("not equal") }
err = panicWrap(context.Background(), func (ctx context.Context) error {
err = panicWrap( func () error {
return nil
})
test.Log(err)
if err != nil { test.Fatal("not equal") }
}
func TestPanicWrapCtx(test *testing.T) {
err := panicWrapCtx(context.Background(), func (ctx context.Context) error {
return errors.New("test case 0")
})
test.Log(err)
if err.Error() != "test case 0" { test.Fatal("not equal") }
err = panicWrapCtx(context.Background(), func (ctx context.Context) error {
panic(errors.New("test case 1"))
})
test.Log(err)
if err.Error() != "test case 1" { test.Fatal("not equal") }
err = panicWrapCtx(context.Background(), func (ctx context.Context) error {
return nil
})
test.Log(err)