Compare commits
29 Commits
| Author | SHA1 | Date | |
|---|---|---|---|
| a03f6a70a4 | |||
| 0a4cc1143e | |||
| a58b5932e7 | |||
| 5d25b3fb9a | |||
| f97f5010e2 | |||
| 8f8d2e13b3 | |||
| 4cef5df83e | |||
| 32c8e7f7c3 | |||
| f0611e53ce | |||
| c0064323d8 | |||
| eca2b35057 | |||
| d34af2c4ee | |||
| e21cd9ed11 | |||
| 70dc9702bd | |||
| 613e21597b | |||
| 126374fbac | |||
| 02f06d857e | |||
| 3d25441e7a | |||
| 735d314a19 | |||
| 4f4c7a0627 | |||
| 461d0b77e9 | |||
| 861d6af1d7 | |||
| ac2db05d06 | |||
| f4904884ad | |||
| 7d0620fe3e | |||
| 3115c5feef | |||
| 10ca4f4671 | |||
| e0c8825949 | |||
| b12ffdb0a0 |
53
actor.go
53
actor.go
@@ -3,20 +3,40 @@ package camfish
|
||||
import "context"
|
||||
|
||||
// Actor is a participant in the environment. All public methods on an actor
|
||||
// must be safe for concurrent use by multiple goroutines. Additionally, any
|
||||
// type which explicitly implements Actor should:
|
||||
// should be safe for concurrent use by multiple goroutines except for AddFlags,
|
||||
// Init, Configure, and ProcessConfig. Additionally, any type which explicitly
|
||||
// implements Actor should:
|
||||
//
|
||||
// - Treat all public fields, values, indices, etc. as immutable
|
||||
// - Satisfy Actor as a pointer, not a value
|
||||
// - Not have a constructor
|
||||
//
|
||||
// The CAMFISH environment will use interfaces in this package to probe actors
|
||||
// for methods. If an actor is supposed to fulfill one of these interfaces, this
|
||||
// should be enforced at compile-time by assigning the actor to an anonymous
|
||||
// global variable of that interface type. For instance, this line will ensure
|
||||
// that SomeActor fulfills [Resettable]:
|
||||
//
|
||||
// var _ camfish.Resettable = new(SomeActor)
|
||||
type Actor interface {
|
||||
// Type returns the type name of the actor. The value returned from this
|
||||
// is used to locate actors capable of performing a specific task, so it
|
||||
// absolutely must return the same string every time. Actors implemented
|
||||
// in packages besides this one (i.e. not camfish) must not return the
|
||||
// string "cron".
|
||||
// Type returns the "type name" of the actor. The value returned from
|
||||
// this is used to locate actors capable of performing a specific task,
|
||||
// so it absolutely must return the same string every time. It is
|
||||
// usually best to have this be unique to each actor. Actors implemented
|
||||
// in packages other than this one
|
||||
// (git.tebibyte.media/sashakoshka/camfish) must not return the string
|
||||
// "cron".
|
||||
Type() string
|
||||
}
|
||||
|
||||
// Named is any object with a name.
|
||||
type Named interface {
|
||||
// Name returns the name. This doesn't need to be the same as Type. It
|
||||
// must return the same string every time. It is used to differentiate
|
||||
// actors of the same type in logs.
|
||||
Name() string
|
||||
}
|
||||
|
||||
// FlagAdder is any object that can add [Flag]s to a [FlagSet]. Actors which
|
||||
// implement this interface will be called upon to add flags during and only
|
||||
// during the flag parsing phase.
|
||||
@@ -107,3 +127,22 @@ type RunShutdownable interface {
|
||||
// should be shut down.
|
||||
Shutdown(ctx context.Context) error
|
||||
}
|
||||
|
||||
// Cleanupable is any object that must be cleaned up after it has stopped for
|
||||
// good. Actors which implement this interface will be cleaned up after they
|
||||
// are deleted from the environment.
|
||||
type Cleanupable interface {
|
||||
Cleanup(ctx context.Context) error
|
||||
}
|
||||
|
||||
// MainRunnable is any object with a function that must be bound to the main
|
||||
// thread. Only one actor may implement this at a time, and it must have been
|
||||
// added at the start of the environment as an argument to the run function.
|
||||
type MainRunnable interface {
|
||||
// RunMain is run in the main thread and must stop once ShutdownMain
|
||||
// is called.
|
||||
RunMain() error
|
||||
// ShutdownMain is like [RunShutdownable.Shutdown], but unblocks RunMain
|
||||
// instead of Run.
|
||||
ShutdownMain(ctx context.Context) error
|
||||
}
|
||||
|
||||
133
actors/http.go
Normal file
133
actors/http.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package http
|
||||
|
||||
import "fmt"
|
||||
import "log"
|
||||
import "errors"
|
||||
import "context"
|
||||
import "net/http"
|
||||
import cf "git.tebibyte.media/sashakoshka/camfish"
|
||||
|
||||
var _ cf.Actor = new(HTTP)
|
||||
var _ cf.RunShutdownable = new(HTTP)
|
||||
var _ cf.Configurable = new(HTTP)
|
||||
var _ cf.Initializable = new(HTTP)
|
||||
|
||||
// HTTP is an actor providing an HTTP server. Its configuration options are
|
||||
// as follows:
|
||||
//
|
||||
// - http.address: The host:port to serve on
|
||||
// - http.cert-file: The location of the public TLS/SSL certificate
|
||||
// - http.key-file: The location of the private TLS/SSL key
|
||||
//
|
||||
// If neither cert-file nor key-file are specified, the actor will serve
|
||||
// over HTTPS instead of HTTP.
|
||||
type HTTP struct {
|
||||
// Typ determines the actor's type. If empty, it defaults to "http".
|
||||
// Once the actor has been used in any way at all, this field must
|
||||
// never be modified.
|
||||
Typ string
|
||||
// Handler is used to handle HTTP requests. If nil, the server will
|
||||
// respond with a "404 Not found" error.
|
||||
// Once the actor has been used in any way at all, this field must
|
||||
// never be modified.
|
||||
Handler http.Handler
|
||||
// DefaultAddr specifies the default address to serve on if none is
|
||||
// specified. This itself defaults to localhost:8080.
|
||||
DefaultAddr
|
||||
|
||||
server *http.Server
|
||||
address string
|
||||
certFile string
|
||||
keyFile string
|
||||
}
|
||||
|
||||
// Type returns "http", or this.Typ if specified.
|
||||
func (this *HTTP) Type() string {
|
||||
if this.Typ == "" {
|
||||
return "http"
|
||||
}
|
||||
return this.Typ
|
||||
}
|
||||
|
||||
// Configure configures the actor.
|
||||
func (this *HTTP) Configure(conf cf.Config) error {
|
||||
{
|
||||
value := conf.Get(this.Type() + ".address")
|
||||
if value == "" {
|
||||
if this.DefaultAddr == "" {
|
||||
value = "localhost:8080"
|
||||
} else {
|
||||
value = this.DefaultAddr
|
||||
}
|
||||
this.address = value
|
||||
}
|
||||
{
|
||||
certFile := this.Type() + "cert-file"
|
||||
keyFile := this.Type() + "key-file"
|
||||
this.certFile = conf.Get(certFile)
|
||||
this.keyFile = conf.Get(keyFile)
|
||||
if this.certFile == "" && this.keyFile != "" ||
|
||||
this.certFile != "" && this.keyFile == "" {
|
||||
return fmt.Errorf(
|
||||
"both %s and %s http.key-file must be specified, or neither",
|
||||
this.certFile,
|
||||
this.keyFile)
|
||||
}
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// Init initializes the actor.
|
||||
func (this *HTTP) Init(ctx context.Context) error {
|
||||
this.server = &http.Server {
|
||||
Addr: this.address,
|
||||
Handler: this.Handler,
|
||||
}
|
||||
return ctx.Err()
|
||||
}
|
||||
|
||||
// Run runs the actor.
|
||||
func (this *HTTP) Run() error {
|
||||
log.Printf("(i) [%s] listening on %s", this.Type(), this.address)
|
||||
err := this.server.ListenAndServe()
|
||||
if errors.Is(err, http.ErrServerClosed) { return nil }
|
||||
return err
|
||||
}
|
||||
|
||||
// Shutdown shuts down the actor.
|
||||
func (this *HTTP) Shutdown(ctx context.Context) error {
|
||||
return this.server.Shutdown(ctx)
|
||||
}
|
||||
|
||||
var _ cf.Actor = new(HTTPServer)
|
||||
var _ cf.RunShutdownable = new(HTTPServer)
|
||||
var _ cf.Initializable = new(HTTPServer)
|
||||
|
||||
// HTTPServer lets you use a pre-existing [http.Server] as an actor. It has
|
||||
// no configuration or anything.
|
||||
type HTTPServer struct {
|
||||
// Server must be non-nil.
|
||||
*HTTP.Server
|
||||
|
||||
// Typ determines the actor's type. If empty, it defaults to "http".
|
||||
// Once the actor has been used in any way at all, this field must
|
||||
// never be modified.
|
||||
Typ string
|
||||
}
|
||||
|
||||
|
||||
// Type returns "http", or this.Typ if specified.
|
||||
func (this *HTTPServer) Type() string {
|
||||
if this.Typ == "" {
|
||||
return "http"
|
||||
}
|
||||
return this.Typ
|
||||
}
|
||||
|
||||
// Run runs the actor.
|
||||
func (this *HTTPServer) Run() error {
|
||||
log.Printf("(i) [%s] listening on %s", this.Type(), this.Addr)
|
||||
err := this.server.ListenAndServe()
|
||||
if errors.Is(err, http.ErrServerClosed) { return nil }
|
||||
return err
|
||||
}
|
||||
43
actorset.go
43
actorset.go
@@ -35,6 +35,8 @@ type actorInfo struct {
|
||||
// incremented automatically for each actor that is added. It is never
|
||||
// less than one.
|
||||
order int
|
||||
// initial is true if this actor was added as an argument to [Run].
|
||||
initial bool
|
||||
}
|
||||
|
||||
// actorSetIface holds only the add/del/clear methods of actorSet.
|
||||
@@ -60,23 +62,12 @@ func (sets *actorSets) All() iter.Seq[actorSetIface] {
|
||||
|
||||
// add adds an actor under the given parent context. This is a write operation.
|
||||
func (this *actorSets) add(ctx context.Context, actor Actor) {
|
||||
if this.inf == nil { this.inf = make(map[Actor] actorInfo)}
|
||||
actorCtx, done := context.WithCancel(ctx)
|
||||
this.nextOrder ++
|
||||
info := actorInfo {
|
||||
ctx: actorCtx,
|
||||
done: done,
|
||||
order: this.nextOrder,
|
||||
}
|
||||
_, isRunnable := actor.(Runnable)
|
||||
_, isRunShutdownable := actor.(RunShutdownable)
|
||||
if isRunnable || isRunShutdownable {
|
||||
info.stopped = make(chan struct { })
|
||||
}
|
||||
this.inf[actor] = info
|
||||
for set := range this.All() {
|
||||
set.add(actor)
|
||||
}
|
||||
this.addInternal(actorInfo { }, ctx, actor)
|
||||
}
|
||||
|
||||
// addInitial is like add, but marks the actor as initial. This is a write operation.
|
||||
func (this *actorSets) addInitial(ctx context.Context, actor Actor) {
|
||||
this.addInternal(actorInfo { initial: true }, ctx, actor)
|
||||
}
|
||||
|
||||
// del removes an actor. This is a write operation.
|
||||
@@ -101,6 +92,24 @@ func (this *actorSets) info(actor Actor) actorInfo {
|
||||
return this.inf[actor]
|
||||
}
|
||||
|
||||
func (this *actorSets) addInternal(inf actorInfo, ctx context.Context, actor Actor) {
|
||||
if this.inf == nil { this.inf = make(map[Actor] actorInfo)}
|
||||
actorCtx, done := context.WithCancel(ctx)
|
||||
this.nextOrder ++
|
||||
inf.ctx = actorCtx
|
||||
inf.done = done
|
||||
inf.order = this.nextOrder
|
||||
_, isRunnable := actor.(Runnable)
|
||||
_, isRunShutdownable := actor.(RunShutdownable)
|
||||
if isRunnable || isRunShutdownable {
|
||||
inf.stopped = make(chan struct { })
|
||||
}
|
||||
this.inf[actor] = inf
|
||||
for set := range this.All() {
|
||||
set.add(actor)
|
||||
}
|
||||
}
|
||||
|
||||
// sortActors sorts actors according to the order in which they were added.
|
||||
func sortActors[T comparable] (sets *actorSets, actors []T) []T {
|
||||
slices.SortFunc(actors, func (left, right T) int {
|
||||
|
||||
4
cron.go
4
cron.go
@@ -7,6 +7,7 @@ import "context"
|
||||
// tasks on time intervals.
|
||||
type cron struct {
|
||||
trimFunc func() bool
|
||||
fastTiming bool
|
||||
timing struct {
|
||||
trimInterval time.Duration
|
||||
}
|
||||
@@ -28,6 +29,9 @@ func (this *cron) Configure (config Config) error {
|
||||
}
|
||||
this.timing.trimInterval = value
|
||||
}
|
||||
if this.fastTiming {
|
||||
this.timing.trimInterval = time.Second * 10
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
|
||||
167
environment.go
167
environment.go
@@ -8,6 +8,7 @@ import "sync"
|
||||
import "time"
|
||||
import "errors"
|
||||
import "context"
|
||||
import "runtime"
|
||||
import "sync/atomic"
|
||||
import "golang.org/x/sync/errgroup"
|
||||
import "git.tebibyte.media/sashakoshka/go-util/sync"
|
||||
@@ -19,9 +20,11 @@ const defaultRestartInitialInterval = 8 * time.Second
|
||||
const defaultRestartInitialIncrement = 8 * time.Second
|
||||
const defaultRestartInitialMaximum = 1 * time.Hour
|
||||
const defaultResetTimeout = 8 * time.Minute
|
||||
const defaultCleanupTimeout = 1 * time.Minute
|
||||
const defaultTrimInterval = 1 * time.Minute
|
||||
const defaultTrimTimeout = 1 * time.Minute
|
||||
const defaultShutdownTimeout = 8 * time.Minute
|
||||
const defaultSigintTimeout = 16 * time.Minute
|
||||
|
||||
// environment is an object which handles requests by package-level functions.
|
||||
// It is only a separate object for testing purposes.
|
||||
@@ -29,10 +32,13 @@ type environment struct {
|
||||
name string
|
||||
description string
|
||||
actors usync.RWMonitor[*actorSets]
|
||||
main MainRunnable
|
||||
ctx context.Context
|
||||
done context.CancelCauseFunc
|
||||
group sync.WaitGroup
|
||||
noneLeft chan struct { }
|
||||
conf MutableConfig
|
||||
cron *cron
|
||||
|
||||
// flags stores information from built-in flags.
|
||||
flags struct {
|
||||
@@ -41,6 +47,9 @@ type environment struct {
|
||||
logDirectory string
|
||||
configFile string
|
||||
verbose bool
|
||||
crash bool
|
||||
crashOnError bool
|
||||
fastTiming bool
|
||||
}
|
||||
|
||||
// running stores whether the environment is currently running.
|
||||
@@ -58,8 +67,10 @@ type environment struct {
|
||||
restartIntervalIncrement atomicDuration
|
||||
restartIntervalMaximum atomicDuration
|
||||
resetTimeout atomicDuration
|
||||
cleanupTimeout atomicDuration
|
||||
trimTimeout atomicDuration
|
||||
shutdownTimeout atomicDuration
|
||||
sigintTimeout atomicDuration
|
||||
}
|
||||
}
|
||||
|
||||
@@ -72,16 +83,19 @@ func (this *environment) Run(name, description string, actors ...Actor) {
|
||||
|
||||
this.ctx, this.done = context.WithCancelCause(context.Background())
|
||||
defer this.done(nil)
|
||||
daemon.OnSigint(func() { this.done(ErrProcessKilled) })
|
||||
daemon.OnSigint(this.handleSigint)
|
||||
defer log.SetOutput(os.Stderr)
|
||||
|
||||
this.name = name
|
||||
this.description = description
|
||||
this.actors = usync.NewRWMonitor(&actorSets { })
|
||||
this.addToSets(actors...)
|
||||
this.addToSets(&cron {
|
||||
this.cron = &cron {
|
||||
trimFunc: this.phase70_5Trimming,
|
||||
})
|
||||
}
|
||||
this.addToSetsInitial(actors...)
|
||||
this.addToSetsInitial(this.cron)
|
||||
|
||||
this.noneLeft = make(chan struct { })
|
||||
|
||||
if !this.phase10FlagParsing() { os.Exit(2) }
|
||||
if !this.phase13PidFileCreation() { os.Exit(1) }
|
||||
@@ -103,6 +117,21 @@ func (this *environment) Done(cause error) {
|
||||
// Add implements the package-level function [Add].
|
||||
func (this *environment) Add(ctx context.Context, actors ...Actor) error {
|
||||
this.addToSets(actors...)
|
||||
cleanUp := func() {
|
||||
this.delFromSets(actors...)
|
||||
}
|
||||
|
||||
for _, actor := range actors {
|
||||
if actor, ok := actor.(Configurable); ok {
|
||||
err := actor.Configure(this.conf)
|
||||
if err != nil {
|
||||
cleanUp()
|
||||
return fmt.Errorf (
|
||||
"could not apply configuration to %s: %w",
|
||||
actor.(Actor).Type(), err)
|
||||
}
|
||||
}
|
||||
}
|
||||
initializable := make([]Initializable, 0, len(actors))
|
||||
for _, actor := range actors {
|
||||
if actor, ok := actor.(Initializable); ok {
|
||||
@@ -110,16 +139,12 @@ func (this *environment) Add(ctx context.Context, actors ...Actor) error {
|
||||
}
|
||||
}
|
||||
err := this.initializeActors(ctx, initializable...)
|
||||
if err != nil { return err }
|
||||
for _, actor := range actors {
|
||||
if actor, ok := actor.(Configurable); ok {
|
||||
err := actor.Configure(this.conf)
|
||||
if err != nil {
|
||||
return fmt.Errorf (
|
||||
"could not apply configuration to %s: %w",
|
||||
actor.(Actor).Type(), err)
|
||||
}
|
||||
if err != nil {
|
||||
if this.flags.crashOnError {
|
||||
panic(fmt.Sprint(err))
|
||||
}
|
||||
cleanUp()
|
||||
return err
|
||||
}
|
||||
for _, actor := range actors {
|
||||
_, isRunnable := actor.(Runnable)
|
||||
@@ -136,11 +161,15 @@ func (this *environment) Del(ctx context.Context, actors ...Actor) error {
|
||||
channels := []<- chan struct { } { }
|
||||
for _, actor := range actors {
|
||||
info := this.info(actor)
|
||||
if info.done != nil {
|
||||
if info.stopped != nil {
|
||||
channels = append(channels, info.stopped)
|
||||
}
|
||||
if info.done != nil {
|
||||
info.done()
|
||||
}
|
||||
}
|
||||
for _, channel := range channels {
|
||||
if channel == nil { continue }
|
||||
select {
|
||||
case <- channel:
|
||||
case <- ctx.Done():
|
||||
@@ -195,6 +224,15 @@ func (this *environment) addToSets(actors ...Actor) {
|
||||
}
|
||||
}
|
||||
|
||||
// addToSetsInitial adds the actors to the actorSets in a thread-safe manner.
|
||||
func (this *environment) addToSetsInitial(actors ...Actor) {
|
||||
thisActors, done := this.actors.Borrow()
|
||||
defer done()
|
||||
for _, actor := range actors {
|
||||
thisActors.addInitial(this.ctx, actor)
|
||||
}
|
||||
}
|
||||
|
||||
// delFromSets deletes the actors from the actorSets in a thread-safe manner.
|
||||
func (this *environment) delFromSets(actors ...Actor) {
|
||||
thisActors, done := this.actors.Borrow()
|
||||
@@ -220,17 +258,37 @@ func (this *environment) start(actor Actor) {
|
||||
go this.run(actor)
|
||||
}
|
||||
|
||||
// run runs the given actor, restarting it if it fails. This function will exit
|
||||
// when the actor's context is canceled. The actor will be removed from the
|
||||
// run runs the given actor. This function will exit
|
||||
// when the actor's context is canceled, or the actor has stopped or exited.
|
||||
// The actor will be removed from the
|
||||
// environment once this function exits, and the environment's wait group
|
||||
// counter will be decremented. note that this function will never increment the
|
||||
// wait group counter, so start should usually be used instead.
|
||||
func (this *environment) run(actor Actor) {
|
||||
typ := actor.Type()
|
||||
|
||||
// clean up when done
|
||||
defer this.group.Done()
|
||||
defer func() {
|
||||
this.group.Done()
|
||||
this.delFromSets(actor)
|
||||
if actor, ok := actor.(Cleanupable); ok {
|
||||
ctx, done := context.WithTimeout(
|
||||
context.Background(),
|
||||
defaul(
|
||||
this.timing.cleanupTimeout.Load(),
|
||||
defaultCleanupTimeout))
|
||||
defer done()
|
||||
err := actor.Cleanup(ctx)
|
||||
if err != nil {
|
||||
log.Printf("XXX [%s] failed to cleanup: %v", typ, err)
|
||||
if this.flags.crashOnError {
|
||||
panic(fmt.Sprint(err))
|
||||
}
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// logging
|
||||
typ := actor.Type()
|
||||
if this.Verb() { log.Printf("(i) [%s] running", typ) }
|
||||
var stopErr error
|
||||
var exited bool
|
||||
@@ -242,14 +300,21 @@ func (this *environment) run(actor Actor) {
|
||||
if this.Verb() { log.Printf("(i) [%s] stopped", typ) }
|
||||
}
|
||||
} else {
|
||||
log.Printf("!!! [%s] stopped with error: %v", typ, stopErr)
|
||||
log.Printf("XXX [%s] stopped with error: %v", typ, stopErr)
|
||||
if this.flags.crashOnError {
|
||||
panic(fmt.Sprint(stopErr))
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
// contains context information
|
||||
info := this.info(actor)
|
||||
ctx := info.ctx
|
||||
defer close(info.stopped)
|
||||
defer func() {
|
||||
if info.stopped != nil {
|
||||
close(info.stopped)
|
||||
}
|
||||
}()
|
||||
|
||||
switch actor := actor.(type) {
|
||||
case Runnable:
|
||||
@@ -282,7 +347,12 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
|
||||
for {
|
||||
// run actor
|
||||
lastStart := time.Now()
|
||||
err := panicWrapCtx(ctx, actor.Run)
|
||||
var err error
|
||||
if this.flags.crash {
|
||||
err = actor.Run(ctx)
|
||||
} else {
|
||||
err = panicWrapCtx(ctx, actor.Run)
|
||||
}
|
||||
|
||||
// detect context cancellation
|
||||
if ctxErr := ctx.Err(); ctxErr != nil {
|
||||
@@ -298,12 +368,28 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
|
||||
return
|
||||
} else {
|
||||
// failure
|
||||
log.Printf("XXX [%s] failed", typ)
|
||||
if this.flags.crashOnError {
|
||||
panic(fmt.Sprint(err))
|
||||
}
|
||||
|
||||
// if an actor isn't resettable, don't reset
|
||||
// or restart it
|
||||
if _, ok := actor.(Resettable); !ok {
|
||||
stopErr = err
|
||||
return
|
||||
}
|
||||
|
||||
// print the failure message here because we won't be
|
||||
// returning to run().
|
||||
log.Printf("XXX [%s] failed: %v", typ, err)
|
||||
}
|
||||
|
||||
// restart logic
|
||||
if time.Since(lastStart) < restartThreshold {
|
||||
log.Printf("!!! [%s] failed too soon, restarting in %v", typ, restartInterval)
|
||||
if this.flags.crashOnError {
|
||||
panic("failed too soon")
|
||||
}
|
||||
timer := time.NewTimer(restartInterval)
|
||||
select {
|
||||
case <- timer.C:
|
||||
@@ -320,7 +406,8 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
|
||||
restartInterval = restartInitialInterval
|
||||
}
|
||||
|
||||
// reset if needed
|
||||
// reset if needed. this condition will always be true because
|
||||
// this code path is restricted to Resettable anyway
|
||||
if actor, ok := actor.(Resettable); ok {
|
||||
if this.Verb() { log.Printf("... [%s] resetting", typ) }
|
||||
func() {
|
||||
@@ -329,6 +416,9 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
|
||||
err := actor.Reset(ctx)
|
||||
if err != nil {
|
||||
log.Printf("XXX [%s] failed to reset", typ)
|
||||
if this.flags.crashOnError {
|
||||
panic("failed to reset")
|
||||
}
|
||||
}
|
||||
}()
|
||||
if this.Verb() { log.Printf(".// [%s] reset", typ) }
|
||||
@@ -388,6 +478,7 @@ func (this *environment) applyConfig() error {
|
||||
}
|
||||
return nil
|
||||
}
|
||||
// TODO: trim interval
|
||||
err := parseDuration("init-timeout", &this.timing.initTimeout)
|
||||
if err != nil { return err }
|
||||
err = parseDuration("restart-threshold", &this.timing.restartThreshold)
|
||||
@@ -400,13 +491,43 @@ func (this *environment) applyConfig() error {
|
||||
if err != nil { return err }
|
||||
err = parseDuration("reset-timeout", &this.timing.resetTimeout)
|
||||
if err != nil { return err }
|
||||
err = parseDuration("cleanup-timeout", &this.timing.cleanupTimeout)
|
||||
if err != nil { return err }
|
||||
err = parseDuration("trim-timeout", &this.timing.trimTimeout)
|
||||
if err != nil { return err }
|
||||
err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout)
|
||||
if err != nil { return err }
|
||||
err = parseDuration("sigint-timeout", &this.timing.sigintTimeout)
|
||||
if err != nil { return err }
|
||||
|
||||
if this.flags.fastTiming {
|
||||
this.timing.shutdownTimeout.Store(time.Second * 10)
|
||||
this.timing.sigintTimeout.Store(time.Second * 12)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// handleSigint is called when the program receives a SIGINT signal.
|
||||
func (this *environment) handleSigint() {
|
||||
this.done(ErrProcessKilled)
|
||||
go func() {
|
||||
time.Sleep(defaul(this.timing.sigintTimeout.Load(), defaultSigintTimeout))
|
||||
this.emergencyHalt("final SIGNINT deadline expired")
|
||||
}()
|
||||
}
|
||||
|
||||
func (this *environment) emergencyHalt(reason string) {
|
||||
log.Printf("XXX (80) %s, performing emergency halt", reason)
|
||||
if Verb() || this.flags.crashOnError {
|
||||
dumpBuffer := make([]byte, 8192)
|
||||
runtime.Stack(dumpBuffer, true)
|
||||
log.Printf("XXX (80) stack trace of all goroutines:\n%s", dumpBuffer)
|
||||
}
|
||||
log.Printf("====== [%s] END =======", this.name)
|
||||
os.Exit(1)
|
||||
}
|
||||
|
||||
type runShutdownableShim struct {
|
||||
underlying RunShutdownable
|
||||
shutdownTimeout time.Duration
|
||||
|
||||
41
examples/panic/main.go
Normal file
41
examples/panic/main.go
Normal file
@@ -0,0 +1,41 @@
|
||||
// Example panic demonstrates how the environment can restart actors if they
|
||||
// fail.
|
||||
package main
|
||||
|
||||
import "log"
|
||||
import "time"
|
||||
import "errors"
|
||||
import "context"
|
||||
import "math/rand"
|
||||
import "git.tebibyte.media/sashakoshka/camfish"
|
||||
|
||||
func main() {
|
||||
camfish.Run("panic",
|
||||
"Example panic demonstrates how the environment can restart " +
|
||||
"actors if they fail",
|
||||
new(actor))
|
||||
}
|
||||
|
||||
// actor is an incorrectly implemented actor that panics and errs randomly.
|
||||
type actor struct { }
|
||||
var _ camfish.Runnable = new(actor)
|
||||
var _ camfish.Resettable = new(actor)
|
||||
func (this *actor) Type() string { return "panic" }
|
||||
|
||||
func (this *actor) Run(ctx context.Context) error {
|
||||
log.Println("(i) [panic] panicking in 10 seconds")
|
||||
select {
|
||||
case <- ctx.Done(): return ctx.Err()
|
||||
case <- time.After(time.Second * 10):
|
||||
if rand.Int() % 2 == 0 {
|
||||
panic("this is a panic")
|
||||
} else {
|
||||
return errors.New("this is an error")
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (this *actor) Reset(ctx context.Context) error {
|
||||
log.Println("(i) [panic] here is where we would reset the actor")
|
||||
return ctx.Err()
|
||||
}
|
||||
4
ini.go
4
ini.go
@@ -162,7 +162,7 @@ func configFiles(program string) ([]string, error) {
|
||||
userConfig, err := os.UserConfigDir()
|
||||
if err != nil { return nil, err }
|
||||
return []string {
|
||||
filepath.Join("/etc", program),
|
||||
filepath.Join(userConfig, program),
|
||||
filepath.Join("/etc", program, program + ".conf"),
|
||||
filepath.Join(userConfig, program, program + ".conf"),
|
||||
}, nil
|
||||
}
|
||||
|
||||
109
phases.go
109
phases.go
@@ -6,6 +6,7 @@ import "log"
|
||||
import "io/fs"
|
||||
import "errors"
|
||||
import "context"
|
||||
import "runtime"
|
||||
import "strings"
|
||||
import "path/filepath"
|
||||
import "git.tebibyte.media/sashakoshka/go-cli"
|
||||
@@ -18,12 +19,15 @@ func (this *environment) phase10FlagParsing() bool {
|
||||
name: this.name,
|
||||
description: this.description,
|
||||
}
|
||||
flagHelp := set.Flag('h', "help", "Display usage information and exit", nil)
|
||||
flagPidFile := set.Flag('p', "pid-file", "Write the PID to the specified file", cli.ValString)
|
||||
flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString)
|
||||
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
|
||||
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
|
||||
flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil)
|
||||
flagHelp := set.Flag('h', "help", "Display usage information and exit", nil)
|
||||
flagPidFile := set.Flag('p', "pid-file", "Write the PID to the specified file", cli.ValString)
|
||||
flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString)
|
||||
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
|
||||
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
|
||||
flagVerbose := set.Flag('v', "verbose", "(debug) Enable verbose output/logging", nil)
|
||||
flagCrash := set.Flag(0, "crash", "(debug) Crash when an actor panics", nil)
|
||||
flagCrashOnError := set.Flag(0, "crash-on-error", "(debug) Crash when an actor experiences any error", nil)
|
||||
flagFastTiming := set.Flag(0, "fast-timing", "(debug) Make timed things happen faster/more often", nil)
|
||||
|
||||
// ask actors to add flags
|
||||
actors, done := this.actors.RBorrow()
|
||||
@@ -60,6 +64,17 @@ func (this *environment) phase10FlagParsing() bool {
|
||||
if _, ok := flagVerbose.First(); ok {
|
||||
this.flags.verbose = true
|
||||
}
|
||||
if _, ok := flagCrash.First(); ok {
|
||||
this.flags.crash = true
|
||||
}
|
||||
if _, ok := flagCrashOnError.First(); ok {
|
||||
this.flags.crash = true
|
||||
this.flags.crashOnError = true
|
||||
}
|
||||
if _, ok := flagFastTiming.First(); ok {
|
||||
this.flags.fastTiming = true
|
||||
this.cron.fastTiming = true
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
@@ -128,6 +143,12 @@ func (this *environment) phase30ConfigurationParsing() bool {
|
||||
if this.flags.configFile != "" {
|
||||
paths = append(paths, this.flags.configFile)
|
||||
}
|
||||
if this.Verb() {
|
||||
log.Println("(i) (30) have configuration files:")
|
||||
for _, paths := range paths {
|
||||
log.Println("(i) (30) -", paths)
|
||||
}
|
||||
}
|
||||
// parse every config and merge them all
|
||||
configs := make([]iniConfig, 0, len(paths))
|
||||
for _, path := range paths {
|
||||
@@ -178,6 +199,8 @@ func (this *environment) phase50ConfigurationApplication() bool {
|
||||
actors, done := this.actors.RBorrow()
|
||||
defer done()
|
||||
for _, actor := range sortActors(actors, actors.configurable.all()) {
|
||||
if !actors.info(actor.(Actor)).initial { continue }
|
||||
if this.Verb() { log.Printf ("... (50) applying configuration to %s", actor.(Actor).Type())}
|
||||
err := actor.Configure(this.conf)
|
||||
if err != nil {
|
||||
log.Printf (
|
||||
@@ -192,13 +215,24 @@ func (this *environment) phase50ConfigurationApplication() bool {
|
||||
|
||||
func (this *environment) phase60Initialization() bool {
|
||||
if this.Verb() { log.Println("... (60) initializing") }
|
||||
// this fucking sucks in sorry
|
||||
var initializable []Initializable
|
||||
func() {
|
||||
actors, done := this.actors.RBorrow()
|
||||
defer done()
|
||||
initializable = actors.initializable.all()
|
||||
}()
|
||||
if err := this.initializeActors(this.ctx, initializable...); err != nil {
|
||||
// filter out non-initial actors
|
||||
initializableInitial := initializable
|
||||
index := 0
|
||||
for _, actor := range initializable {
|
||||
if !this.info(actor.(Actor)).initial { continue }
|
||||
initializableInitial = initializable[:index + 1]
|
||||
initializableInitial[index] = actor
|
||||
index ++
|
||||
}
|
||||
|
||||
if err := this.initializeActors(this.ctx, initializableInitial...); err != nil {
|
||||
log.Println("XXX (60) failed to initialize:", err)
|
||||
return false
|
||||
}
|
||||
@@ -207,6 +241,34 @@ func (this *environment) phase60Initialization() bool {
|
||||
}
|
||||
|
||||
func (this *environment) phase70Running() bool {
|
||||
for actor := range this.All() {
|
||||
if actor, ok := actor.(MainRunnable); ok {
|
||||
this.main = actor
|
||||
}
|
||||
}
|
||||
|
||||
bodyReturn := make(chan bool)
|
||||
go func() {
|
||||
bodyReturn <- this.phase70RunningBody()
|
||||
if this.main != nil {
|
||||
shutdownCtx, done := context.WithTimeout(
|
||||
context.Background(),
|
||||
defaul(this.timing.shutdownTimeout.Load(),
|
||||
defaultShutdownTimeout))
|
||||
defer done()
|
||||
this.main.ShutdownMain(shutdownCtx)
|
||||
}
|
||||
}()
|
||||
|
||||
mainReturn := false
|
||||
if this.main != nil {
|
||||
mainReturn = this.phase70_2MainBind()
|
||||
}
|
||||
|
||||
return <- bodyReturn && mainReturn
|
||||
}
|
||||
|
||||
func (this *environment) phase70RunningBody() bool {
|
||||
defer this.Done(nil)
|
||||
if this.Verb() { log.Println("... (70) starting up") }
|
||||
this.running.Store(true)
|
||||
@@ -215,29 +277,45 @@ func (this *environment) phase70Running() bool {
|
||||
actors, done := this.actors.RBorrow()
|
||||
defer done()
|
||||
for _, actor := range actors.runnable.all() {
|
||||
if !actors.info(actor.(Actor)).initial { continue }
|
||||
this.start(actor.(Actor))
|
||||
}
|
||||
for _, actor := range actors.runShutdownable.all() {
|
||||
if !actors.info(actor.(Actor)).initial { continue }
|
||||
this.start(actor.(Actor))
|
||||
}
|
||||
|
||||
}()
|
||||
log.Println(".// (70) startup sequence complete")
|
||||
// await context cancellation or waitgroup completion
|
||||
wgChannel := make(chan struct { }, 1)
|
||||
go func() {
|
||||
this.group.Wait()
|
||||
wgChannel <- struct { } { }
|
||||
close(this.noneLeft)
|
||||
}()
|
||||
select {
|
||||
case <- this.ctx.Done():
|
||||
if this.Verb() { log.Println("(i) (70) canceled") }
|
||||
case <- wgChannel:
|
||||
case <- this.noneLeft:
|
||||
if this.Verb() { log.Println("(i) (70) all actors have finished") }
|
||||
}
|
||||
return true
|
||||
}
|
||||
|
||||
func (this *environment) phase70_2MainBind() bool{
|
||||
mainActor := this.main.(Actor)
|
||||
if this.Verb() { log.Printf("... (70.2) binding %s to main thread", mainActor.Type()) }
|
||||
runtime.LockOSThread()
|
||||
if this.Verb() { log.Printf(".// (70.2) main thread bind complete") }
|
||||
defer runtime.UnlockOSThread()
|
||||
err := panicWrap(this.main.RunMain)
|
||||
if err != nil {
|
||||
log.Printf("XXX [%s] main thread failed: %v", mainActor.Type(), err)
|
||||
return false
|
||||
}
|
||||
if this.Verb() { log.Printf("(i) (70.2) main thread exited") }
|
||||
return true
|
||||
}
|
||||
|
||||
func (this *environment) phase70_5Trimming() bool {
|
||||
if this.Verb() { log.Println("... (70.5) trimming") }
|
||||
var trimmable []Trimmable
|
||||
@@ -248,6 +326,9 @@ func (this *environment) phase70_5Trimming() bool {
|
||||
}()
|
||||
if err := this.trimActors(this.ctx, trimmable...); err != nil {
|
||||
log.Println(".// (70.5) failed to trim:", err)
|
||||
if this.flags.crashOnError {
|
||||
panic(err)
|
||||
}
|
||||
return false
|
||||
}
|
||||
if this.Verb() { log.Println(".// (70.5) trimmed") }
|
||||
@@ -255,6 +336,7 @@ func (this *environment) phase70_5Trimming() bool {
|
||||
}
|
||||
|
||||
func (this *environment) phase80Shutdown() bool {
|
||||
logActors(All())
|
||||
ctx, done := context.WithTimeout(
|
||||
context.Background(),
|
||||
defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout))
|
||||
@@ -262,9 +344,7 @@ func (this *environment) phase80Shutdown() bool {
|
||||
go func() {
|
||||
<- ctx.Done()
|
||||
if errors.Is(context.Cause(ctx), context.DeadlineExceeded) {
|
||||
log.Println("XXX (80) shutdown timeout expired, performing emergency halt")
|
||||
log.Printf("====== [%s] END =======", this.name)
|
||||
os.Exit(1)
|
||||
this.emergencyHalt("shutdown timeout expired")
|
||||
}
|
||||
}()
|
||||
|
||||
@@ -277,6 +357,7 @@ func (this *environment) phase80Shutdown() bool {
|
||||
log.Println(".// (80) shutdown succeeded, goodbye")
|
||||
log.Printf("====== [%s] END =======", this.name)
|
||||
}()
|
||||
this.group.Wait()
|
||||
// wait for all actors to shut down
|
||||
<- this.noneLeft
|
||||
return cause == nil
|
||||
}
|
||||
|
||||
14
run.go
14
run.go
@@ -52,12 +52,14 @@ var env environment
|
||||
//
|
||||
// 70. Running: Actors which implement [Runnable] or [RunShutdownable] are
|
||||
// run, each in their own goroutine. The environment is able to restart
|
||||
// actors which have failed, which entails resetting the actor if it
|
||||
// implements [Resettable], and running the actor again within the same
|
||||
// actors which have failed if they implement [Resettable], which entails
|
||||
// resetting the actor and running it again within the same
|
||||
// goroutine. If an actor does not run for a meaningful amount of time
|
||||
// after resetting/initialization before failing, it is considered erratic
|
||||
// and further attempts to restart it will be spaced by a limited,
|
||||
// constantly increasing time interval. The timing is configurable, but by
|
||||
// constantly increasing time interval.
|
||||
//
|
||||
// The timing is configurable, but by
|
||||
// default the threshold for a meaningful amount of runtime is 16 seconds,
|
||||
// the initial delay interval is 8 seconds, the interval increase per
|
||||
// attempt is 8 seconds, and the maximum interval is one hour.
|
||||
@@ -66,6 +68,12 @@ var env environment
|
||||
// configurable, but by default it is once every minute. When an actor
|
||||
// which implements [Resettable] is reset, it is given a configurable
|
||||
// timeout, which is 8 minutes by default.
|
||||
//
|
||||
// If one of the actors directly passed to Run implements MainRunnable,
|
||||
// it will be bound to the main thread and the CAMFISH environment will
|
||||
// be run in a different thread. If more than one actor implementing
|
||||
// MainRunnable is passed to the Run function, only the first one is
|
||||
// considered.
|
||||
//
|
||||
// 80. Shutdown: This can be triggered by all actors being removed from the
|
||||
// environment, a catastrophic error, [Done] being called, or the program
|
||||
|
||||
49
util.go
49
util.go
@@ -12,6 +12,37 @@ import "strings"
|
||||
import "context"
|
||||
import "sync/atomic"
|
||||
import "unicode/utf8"
|
||||
import "runtime/debug"
|
||||
|
||||
type panicError struct {
|
||||
wrapped error
|
||||
stack []byte
|
||||
}
|
||||
|
||||
func (this panicError) Error() string {
|
||||
if this.stack == nil {
|
||||
return this.wrapped.Error()
|
||||
} else {
|
||||
return fmt.Sprintf("%v: %s", this.wrapped, this.stack)
|
||||
}
|
||||
}
|
||||
|
||||
func (this panicError) Unwrap() error {
|
||||
return this.wrapped
|
||||
}
|
||||
|
||||
func panicErr(message any, stack []byte) (err error) {
|
||||
var wrapped error
|
||||
if panErr, ok := message.(error); ok {
|
||||
wrapped = panErr
|
||||
} else {
|
||||
wrapped = errors.New(fmt.Sprint(message))
|
||||
}
|
||||
return panicError {
|
||||
wrapped: wrapped,
|
||||
stack: stack,
|
||||
}
|
||||
}
|
||||
|
||||
func defaul[T comparable](value, def T) T {
|
||||
var zero T
|
||||
@@ -22,11 +53,7 @@ func defaul[T comparable](value, def T) T {
|
||||
func panicWrap(f func() error) (err error) {
|
||||
defer func () {
|
||||
if pan := recover(); pan != nil {
|
||||
if panErr, ok := pan.(error); ok {
|
||||
err = panErr
|
||||
} else {
|
||||
err = errors.New(fmt.Sprint(pan))
|
||||
}
|
||||
err = panicErr(pan, debug.Stack())
|
||||
}
|
||||
} ()
|
||||
|
||||
@@ -37,11 +64,7 @@ func panicWrap(f func() error) (err error) {
|
||||
func panicWrapCtx(ctx context.Context, f func(context.Context) error) (err error) {
|
||||
defer func () {
|
||||
if pan := recover(); pan != nil {
|
||||
if panErr, ok := pan.(error); ok {
|
||||
err = panErr
|
||||
} else {
|
||||
err = errors.New(fmt.Sprint(pan))
|
||||
}
|
||||
err = panicErr(pan, debug.Stack())
|
||||
}
|
||||
} ()
|
||||
|
||||
@@ -77,7 +100,11 @@ func logActors (actors iter.Seq[Actor]) {
|
||||
}
|
||||
types := make(map[string] int)
|
||||
for actor := range actors {
|
||||
types[actor.Type()] += 1
|
||||
typ := actor.Type()
|
||||
if named, ok := actor.(Named); ok {
|
||||
typ = fmt.Sprintf("%s/%s", typ, named.Name())
|
||||
}
|
||||
types[typ] += 1
|
||||
}
|
||||
for typ, count := range types {
|
||||
if count > 1 {
|
||||
|
||||
Reference in New Issue
Block a user