Compare commits
No commits in common. "772e9ca29079a996617b2a121d4a2f944567fb46" and "7e5a78fef7cf0b84c194de55bd35d7e1776ff053" have entirely different histories.
772e9ca290
...
7e5a78fef7
19
actor.go
19
actor.go
@ -88,22 +88,3 @@ type Resettable interface {
|
|||||||
// invalid and any process which depends on it should be shut down.
|
// invalid and any process which depends on it should be shut down.
|
||||||
Reset(ctx context.Context) error
|
Reset(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
// RunShutdownable is any object that needs a context in order to shut down.
|
|
||||||
// Actors which implement this interface cannot implement the Runnable
|
|
||||||
// interface. This can be used to run an http.Server as an actor.
|
|
||||||
type RunShutdownable interface {
|
|
||||||
// Run is similar to [Runnable.Run], but takes no context and blocks
|
|
||||||
// until Shutdown has run and exited. It may also return when something
|
|
||||||
// goes wrong and it cannot continue, in which case it must return a
|
|
||||||
// non-nil error explaining why. Shutdown does not need to be called in
|
|
||||||
// the latter case.
|
|
||||||
Run() error
|
|
||||||
// Shutdown shuts down the actor. It must unblock Run in all cases even
|
|
||||||
// on failure, context expiration, etc. Shutdown must return when or
|
|
||||||
// before the context expires, and must return ctx.Err if there is no
|
|
||||||
// other error to be returned. If Shutdown returns any error, the object
|
|
||||||
// must be treated as invalid and any other process which depends on it
|
|
||||||
// should be shut down.
|
|
||||||
Shutdown(ctx context.Context) error
|
|
||||||
}
|
|
||||||
|
@ -16,7 +16,6 @@ type actorSets struct {
|
|||||||
configurable actorSet[Configurable]
|
configurable actorSet[Configurable]
|
||||||
initializable actorSet[Initializable]
|
initializable actorSet[Initializable]
|
||||||
runnable actorSet[Runnable]
|
runnable actorSet[Runnable]
|
||||||
runShutdownable actorSet[RunShutdownable]
|
|
||||||
trimmable actorSet[Trimmable]
|
trimmable actorSet[Trimmable]
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -53,7 +52,6 @@ func (sets *actorSets) All() iter.Seq[actorSetIface] {
|
|||||||
yield(&sets.configurable)
|
yield(&sets.configurable)
|
||||||
yield(&sets.initializable)
|
yield(&sets.initializable)
|
||||||
yield(&sets.runnable)
|
yield(&sets.runnable)
|
||||||
yield(&sets.runShutdownable)
|
|
||||||
yield(&sets.trimmable)
|
yield(&sets.trimmable)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -68,9 +66,7 @@ func (this *actorSets) add(ctx context.Context, actor Actor) {
|
|||||||
done: done,
|
done: done,
|
||||||
order: this.nextOrder,
|
order: this.nextOrder,
|
||||||
}
|
}
|
||||||
_, isRunnable := actor.(Runnable)
|
if _, ok := actor.(Runnable); ok {
|
||||||
_, isRunShutdownable := actor.(RunShutdownable)
|
|
||||||
if isRunnable || isRunShutdownable {
|
|
||||||
info.stopped = make(chan struct { })
|
info.stopped = make(chan struct { })
|
||||||
}
|
}
|
||||||
this.inf[actor] = info
|
this.inf[actor] = info
|
||||||
|
@ -122,9 +122,7 @@ func (this *environment) Add(ctx context.Context, actors ...Actor) error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
for _, actor := range actors {
|
for _, actor := range actors {
|
||||||
_, isRunnable := actor.(Runnable)
|
if actor, ok := actor.(Runnable); ok {
|
||||||
_, isRunShutdownable := actor.(RunShutdownable)
|
|
||||||
if isRunnable || isRunShutdownable {
|
|
||||||
this.start(actor)
|
this.start(actor)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -215,7 +213,7 @@ func (this *environment) info(actor Actor) actorInfo {
|
|||||||
// start increments the wait group by one and starts the given actor in the
|
// start increments the wait group by one and starts the given actor in the
|
||||||
// background, restarting it if it fails. this function will exit immediately.
|
// background, restarting it if it fails. this function will exit immediately.
|
||||||
// see the documentation for run for details.
|
// see the documentation for run for details.
|
||||||
func (this *environment) start(actor Actor) {
|
func (this *environment) start(actor Runnable) {
|
||||||
this.group.Add(1)
|
this.group.Add(1)
|
||||||
go this.run(actor)
|
go this.run(actor)
|
||||||
}
|
}
|
||||||
@ -225,12 +223,14 @@ func (this *environment) start(actor Actor) {
|
|||||||
// environment once this function exits, and the environment's wait group
|
// environment once this function exits, and the environment's wait group
|
||||||
// counter will be decremented. note that this function will never increment the
|
// counter will be decremented. note that this function will never increment the
|
||||||
// wait group counter, so start should usually be used instead.
|
// wait group counter, so start should usually be used instead.
|
||||||
func (this *environment) run(actor Actor) {
|
func (this *environment) run(actor Runnable) {
|
||||||
// clean up when done
|
// clean up when done
|
||||||
defer this.group.Done()
|
defer this.group.Done()
|
||||||
|
|
||||||
// logging
|
// logging
|
||||||
typ := actor.Type()
|
acto, ok := actor.(Actor)
|
||||||
|
if !ok { return }
|
||||||
|
typ := acto.Type()
|
||||||
if this.Verb() { log.Printf("(i) [%s] running", typ) }
|
if this.Verb() { log.Printf("(i) [%s] running", typ) }
|
||||||
var stopErr error
|
var stopErr error
|
||||||
var exited bool
|
var exited bool
|
||||||
@ -247,26 +247,10 @@ func (this *environment) run(actor Actor) {
|
|||||||
}()
|
}()
|
||||||
|
|
||||||
// contains context information
|
// contains context information
|
||||||
info := this.info(actor)
|
info := this.info(acto)
|
||||||
ctx := info.ctx
|
ctx := info.ctx
|
||||||
defer close(info.stopped)
|
defer close(info.stopped)
|
||||||
|
|
||||||
switch actor := actor.(type) {
|
|
||||||
case Runnable:
|
|
||||||
stopErr, exited = this.runRunnable(ctx, actor)
|
|
||||||
case RunShutdownable:
|
|
||||||
stopErr, exited = this.runRunnable(ctx, &runShutdownableShim {
|
|
||||||
shutdownTimeout: defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout),
|
|
||||||
underlying: actor,
|
|
||||||
})
|
|
||||||
default:
|
|
||||||
panic("actor was neither Runnable or RunShutdownable")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// runRunnable runs an actor implementing [Runnable]. this should only be called
|
|
||||||
// from within [environment.run].
|
|
||||||
func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopErr error, exited bool) {
|
|
||||||
// timing
|
// timing
|
||||||
restartThreshold := defaul(this.timing.restartThreshold.Load(), defaultRestartThreshold)
|
restartThreshold := defaul(this.timing.restartThreshold.Load(), defaultRestartThreshold)
|
||||||
restartInitialInterval := defaul(this.timing.restartInitialInterval.Load(), defaultRestartInitialInterval)
|
restartInitialInterval := defaul(this.timing.restartInitialInterval.Load(), defaultRestartInitialInterval)
|
||||||
@ -275,14 +259,11 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
|
|||||||
resetTimeout := defaul(this.timing.resetTimeout.Load(), defaultResetTimeout)
|
resetTimeout := defaul(this.timing.resetTimeout.Load(), defaultResetTimeout)
|
||||||
restartInterval := restartInitialInterval
|
restartInterval := restartInitialInterval
|
||||||
|
|
||||||
acto, ok := actor.(Actor)
|
// main loop
|
||||||
if !ok { return }
|
|
||||||
typ := acto.Type()
|
|
||||||
|
|
||||||
for {
|
for {
|
||||||
// run actor
|
// run actor
|
||||||
lastStart := time.Now()
|
lastStart := time.Now()
|
||||||
err := panicWrapCtx(ctx, actor.Run)
|
err := panicWrap(ctx, actor.Run)
|
||||||
|
|
||||||
// detect context cancellation
|
// detect context cancellation
|
||||||
if ctxErr := ctx.Err(); ctxErr != nil {
|
if ctxErr := ctx.Err(); ctxErr != nil {
|
||||||
@ -406,24 +387,3 @@ func (this *environment) applyConfig() error {
|
|||||||
if err != nil { return err }
|
if err != nil { return err }
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type runShutdownableShim struct {
|
|
||||||
underlying RunShutdownable
|
|
||||||
shutdownTimeout time.Duration
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *runShutdownableShim) Type() string {
|
|
||||||
return this.underlying.(Actor).Type()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *runShutdownableShim) Run(ctx context.Context) error {
|
|
||||||
ctx, done := context.WithCancel(ctx)
|
|
||||||
defer done()
|
|
||||||
go func() {
|
|
||||||
<- ctx.Done()
|
|
||||||
shutdownCtx, done := context.WithTimeout(context.Background(), this.shutdownTimeout)
|
|
||||||
defer done()
|
|
||||||
this.underlying.Shutdown(shutdownCtx)
|
|
||||||
}()
|
|
||||||
return this.underlying.Run()
|
|
||||||
}
|
|
||||||
|
@ -1,90 +0,0 @@
|
|||||||
// Example http demonstrates the usage of [camfish.RunShutdowner] to run an http
|
|
||||||
// server.
|
|
||||||
package main
|
|
||||||
|
|
||||||
import "fmt"
|
|
||||||
import "log"
|
|
||||||
import "iter"
|
|
||||||
import "errors"
|
|
||||||
import "context"
|
|
||||||
import "net/http"
|
|
||||||
import "git.tebibyte.media/sashakoshka/camfish"
|
|
||||||
import "git.tebibyte.media/sashakoshka/go-util/sync"
|
|
||||||
|
|
||||||
func main() {
|
|
||||||
camfish.Run("http",
|
|
||||||
"Example http demonstrates the usage of " +
|
|
||||||
"camfish.RunShutdowner to run an http server",
|
|
||||||
new(httpServer),
|
|
||||||
new(database))
|
|
||||||
}
|
|
||||||
|
|
||||||
// httpServer serves data over http.
|
|
||||||
type httpServer struct {
|
|
||||||
server *http.Server
|
|
||||||
database *database
|
|
||||||
}
|
|
||||||
var _ camfish.RunShutdownable = new(httpServer)
|
|
||||||
var _ camfish.Initializable = new(httpServer)
|
|
||||||
func (this *httpServer) Type() string { return "http-server" }
|
|
||||||
|
|
||||||
func (this *httpServer) Init(ctx context.Context) error {
|
|
||||||
this.server = &http.Server {
|
|
||||||
Addr: "localhost:8080",
|
|
||||||
Handler: this,
|
|
||||||
}
|
|
||||||
if actor, ok := camfish.Find("database").(*database); ok {
|
|
||||||
this.database = actor
|
|
||||||
} else {
|
|
||||||
return errors.New("could not locate database")
|
|
||||||
}
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *httpServer) Run() error {
|
|
||||||
log.Printf("[http] listening on %s", this.server.Addr)
|
|
||||||
err := this.server.ListenAndServe()
|
|
||||||
if errors.Is(err, http.ErrServerClosed) { return nil }
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *httpServer) Shutdown(ctx context.Context) error {
|
|
||||||
return this.server.Shutdown(ctx)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *httpServer) ServeHTTP(res http.ResponseWriter, req *http.Request) {
|
|
||||||
fmt.Fprintf(res, "<!DOCTYPE html><html><head><title>inventory</title></head><body>")
|
|
||||||
fmt.Fprintf(res, "<table><tr><th>Item</th><th>Count</th></tr>")
|
|
||||||
for item, count := range this.database.Inventory() {
|
|
||||||
fmt.Fprintf(res, "<tr><td>%s</td><td>%d</td></tr>", item, count)
|
|
||||||
}
|
|
||||||
fmt.Fprintf(res, "</table>")
|
|
||||||
fmt.Fprintf(res, "</body></html>")
|
|
||||||
}
|
|
||||||
|
|
||||||
// database provides data that can be served.
|
|
||||||
type database struct {
|
|
||||||
inventory usync.RWMonitor[map[string] int]
|
|
||||||
}
|
|
||||||
func (this *database) Type() string { return "database" }
|
|
||||||
|
|
||||||
func (this *database) Init(ctx context.Context) error {
|
|
||||||
this.inventory.Set(map[string] int {
|
|
||||||
"screws": 34,
|
|
||||||
"blood": 90,
|
|
||||||
"paperclips": 5230,
|
|
||||||
"wood": 3,
|
|
||||||
"grains of rice": 238409,
|
|
||||||
})
|
|
||||||
return ctx.Err()
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *database) Inventory() iter.Seq2[string, int] {
|
|
||||||
return func(yield func(string, int) bool) {
|
|
||||||
inventory, done := this.inventory.RBorrow()
|
|
||||||
defer done()
|
|
||||||
for item, amount := range inventory {
|
|
||||||
yield(item, amount)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
@ -8,16 +8,16 @@ func TestConfig(test *testing.T) {
|
|||||||
"multiple": []string { "item0", "item1" },
|
"multiple": []string { "item0", "item1" },
|
||||||
"empty": []string { },
|
"empty": []string { },
|
||||||
}
|
}
|
||||||
if correct, got := "aslkdjasd", config.Get("single"); correct != got {
|
if correct, got := config.Get("single"), "aslkdjasd"; correct != got {
|
||||||
test.Fatal("not equal:", got)
|
test.Fatal("not equal:", got)
|
||||||
}
|
}
|
||||||
if correct, got := "item0", config.Get("multiple"); correct != got {
|
if correct, got := config.Get("multiple"), "item0"; correct != got {
|
||||||
test.Fatal("not equal:", got)
|
test.Fatal("not equal:", got)
|
||||||
}
|
}
|
||||||
if correct, got := "", config.Get("empty"); correct != got {
|
if correct, got := config.Get("empty"), ""; correct != got {
|
||||||
test.Fatal("not equal:", got)
|
test.Fatal("not equal:", got)
|
||||||
}
|
}
|
||||||
if correct, got := "", config.Get("non-existent"); correct != got {
|
if correct, got := config.Get("non-existent"), ""; correct != got {
|
||||||
test.Fatal("not equal:", got)
|
test.Fatal("not equal:", got)
|
||||||
}
|
}
|
||||||
for index, value := range config.GetAll("single") {
|
for index, value := range config.GetAll("single") {
|
||||||
|
@ -199,7 +199,7 @@ func (this *environment) phase60Initialization() bool {
|
|||||||
initializable = actors.initializable.all()
|
initializable = actors.initializable.all()
|
||||||
}()
|
}()
|
||||||
if err := this.initializeActors(this.ctx, initializable...); err != nil {
|
if err := this.initializeActors(this.ctx, initializable...); err != nil {
|
||||||
log.Println("XXX (60) failed to initialize:", err)
|
log.Println(".// (60) failed to initialize:", err)
|
||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
if this.Verb() { log.Println(".// (60) initialized") }
|
if this.Verb() { log.Println(".// (60) initialized") }
|
||||||
@ -215,10 +215,7 @@ func (this *environment) phase70Running() bool {
|
|||||||
actors, done := this.actors.RBorrow()
|
actors, done := this.actors.RBorrow()
|
||||||
defer done()
|
defer done()
|
||||||
for _, actor := range actors.runnable.all() {
|
for _, actor := range actors.runnable.all() {
|
||||||
this.start(actor.(Actor))
|
this.start(actor)
|
||||||
}
|
|
||||||
for _, actor := range actors.runShutdownable.all() {
|
|
||||||
this.start(actor.(Actor))
|
|
||||||
}
|
}
|
||||||
|
|
||||||
}()
|
}()
|
||||||
|
33
run.go
33
run.go
@ -9,7 +9,7 @@ var env environment
|
|||||||
// when all running actors have stopped. Error and log messages will be printed.
|
// when all running actors have stopped. Error and log messages will be printed.
|
||||||
// The correct way to use this function is to have it be the only thing in main:
|
// The correct way to use this function is to have it be the only thing in main:
|
||||||
//
|
//
|
||||||
// func main() {
|
// func main () {
|
||||||
// camfish.Run("name", "what it does", new(SomeActor), new(AnotherActor))
|
// camfish.Run("name", "what it does", new(SomeActor), new(AnotherActor))
|
||||||
// }
|
// }
|
||||||
//
|
//
|
||||||
@ -50,22 +50,21 @@ var env environment
|
|||||||
// is configurable, but by default it is 8 minutes. The vast majority of
|
// is configurable, but by default it is 8 minutes. The vast majority of
|
||||||
// actors should initialize in under 100 milliseconds.
|
// actors should initialize in under 100 milliseconds.
|
||||||
//
|
//
|
||||||
// 70. Running: Actors which implement [Runnable] or [RunShutdownable] are
|
// 70. Running: Actors which implement [Runnable] are run, each in their own
|
||||||
// run, each in their own goroutine. The environment is able to restart
|
// goroutine. The environment is able to restart actors which have failed,
|
||||||
// actors which have failed, which entails resetting the actor if it
|
// which entails resetting the actor if it implements [Resettable], and
|
||||||
// implements [Resettable], and running the actor again within the same
|
// running the actor again within the same goroutine. If an actor does not
|
||||||
// goroutine. If an actor does not run for a meaningful amount of time
|
// run for a meaningful amount of time after resetting/initialization
|
||||||
// after resetting/initialization before failing, it is considered erratic
|
// before failing, it is considered erratic and further attempts to restart
|
||||||
// and further attempts to restart it will be spaced by a limited,
|
// it will be spaced by a limited, constantly increasing time interval. The
|
||||||
// constantly increasing time interval. The timing is configurable, but by
|
// timing is configurable, but by default the threshold for a meaningful
|
||||||
// default the threshold for a meaningful amount of runtime is 16 seconds,
|
// amount of runtime is 16 seconds, the initial delay interval is 8
|
||||||
// the initial delay interval is 8 seconds, the interval increase per
|
// seconds, the interval increase per attempt is 8 seconds, and the maximum
|
||||||
// attempt is 8 seconds, and the maximum interval is one hour.
|
// interval is one hour. Additionally, programs which implement [Trimmable]
|
||||||
// Additionally, programs which implement [Trimmable] will be trimmed
|
// will be trimmed regularly whenever they are running. The trimming
|
||||||
// regularly whenever they are running. The trimming interval is also
|
// interval is also configurable, but by default it is once every minute.
|
||||||
// configurable, but by default it is once every minute. When an actor
|
// When an actor which implements [Resettable] is reset, it is given a
|
||||||
// which implements [Resettable] is reset, it is given a configurable
|
// configurable timeout, which is 8 minutes by default.
|
||||||
// timeout, which is 8 minutes by default.
|
|
||||||
//
|
//
|
||||||
// 80. Shutdown: This can be triggered by all actors being removed from the
|
// 80. Shutdown: This can be triggered by all actors being removed from the
|
||||||
// environment, a catastrophic error, [Done] being called, or the program
|
// environment, a catastrophic error, [Done] being called, or the program
|
||||||
|
17
util.go
17
util.go
@ -19,22 +19,7 @@ func defaul[T comparable](value, def T) T {
|
|||||||
return value
|
return value
|
||||||
}
|
}
|
||||||
|
|
||||||
func panicWrap(f func() error) (err error) {
|
func panicWrap(ctx context.Context, f func (context.Context) error) (err error) {
|
||||||
defer func () {
|
|
||||||
if pan := recover(); pan != nil {
|
|
||||||
if panErr, ok := pan.(error); ok {
|
|
||||||
err = panErr
|
|
||||||
} else {
|
|
||||||
err = errors.New(fmt.Sprint(pan))
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} ()
|
|
||||||
|
|
||||||
err = f()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
func panicWrapCtx(ctx context.Context, f func(context.Context) error) (err error) {
|
|
||||||
defer func () {
|
defer func () {
|
||||||
if pan := recover(); pan != nil {
|
if pan := recover(); pan != nil {
|
||||||
if panErr, ok := pan.(error); ok {
|
if panErr, ok := pan.(error); ok {
|
||||||
|
18
util_test.go
18
util_test.go
@ -17,24 +17,6 @@ func TestDefaul(test *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestPanicWrap(test *testing.T) {
|
func TestPanicWrap(test *testing.T) {
|
||||||
err := panicWrap(func (ctx context.Context) error {
|
|
||||||
return errors.New("test case 0")
|
|
||||||
})
|
|
||||||
test.Log(err)
|
|
||||||
if err.Error() != "test case 0" { test.Fatal("not equal") }
|
|
||||||
err = panicWrap(func (ctx context.Context) error {
|
|
||||||
panic(errors.New("test case 1"))
|
|
||||||
})
|
|
||||||
test.Log(err)
|
|
||||||
if err.Error() != "test case 1" { test.Fatal("not equal") }
|
|
||||||
err = panicWrap( func (ctx context.Context) error {
|
|
||||||
return nil
|
|
||||||
})
|
|
||||||
test.Log(err)
|
|
||||||
if err != nil { test.Fatal("not equal") }
|
|
||||||
}
|
|
||||||
|
|
||||||
func TestPanicWrapCtx(test *testing.T) {
|
|
||||||
err := panicWrap(context.Background(), func (ctx context.Context) error {
|
err := panicWrap(context.Background(), func (ctx context.Context) error {
|
||||||
return errors.New("test case 0")
|
return errors.New("test case 0")
|
||||||
})
|
})
|
||||||
|
Loading…
Reference in New Issue
Block a user