Compare commits

...

17 Commits
v0.3.1 ... main

Author SHA1 Message Date
eca2b35057 Fix crash when deleting an actor 2025-09-17 20:55:33 -04:00
d34af2c4ee Add Cleanupable interface
The Cleanup method is called on actors when they exit
2025-09-17 20:45:01 -04:00
e21cd9ed11 The cron actor now respects --fast-timing 2025-09-17 17:03:27 -04:00
70dc9702bd Add a --fast-timing option to make things happen sooner
The smaller time scale is better for debugging
2025-09-17 14:54:12 -04:00
613e21597b Add a --crash-on-error flag to crash whenever an actor returns err 2025-09-17 11:36:12 -04:00
126374fbac Print stack trace of all goroutines when emergency halting if verbose 2025-09-15 20:40:08 -04:00
02f06d857e Fix deadlock when deleting actors 2025-09-14 19:22:42 -04:00
3d25441e7a Log config file paths after the --config one is added 2025-05-21 15:24:23 -04:00
735d314a19 examples/panic: Make an example to show off aforementioned feature 2025-05-21 13:50:34 -04:00
4f4c7a0627 Print stack trace when actors panic 2025-05-21 13:50:16 -04:00
461d0b77e9 Print out all config file paths on startup 2025-05-21 09:49:44 -04:00
861d6af1d7 Add --crash flag to crash and print stack trace on panic 2025-04-08 10:29:27 -04:00
ac2db05d06 Fix local config path 2025-03-10 01:38:17 -04:00
f4904884ad Improve named actor interface 2025-03-09 01:56:09 -05:00
7d0620fe3e Fix system config file path 2025-03-09 01:55:11 -05:00
3115c5feef Actors are formatted better when logged 2025-03-09 01:54:53 -05:00
10ca4f4671 Add "named" actor interface 2025-02-05 16:49:27 -05:00
7 changed files with 185 additions and 28 deletions

View File

@ -29,6 +29,14 @@ type Actor interface {
Type() string
}
// Named is any object with a name.
type Named interface {
// Name returns the name. This doesn't need to be the same as Type. It
// must return the same string every time. It is used to differentiate
// actors of the same type in logs.
Name() string
}
// FlagAdder is any object that can add [Flag]s to a [FlagSet]. Actors which
// implement this interface will be called upon to add flags during and only
// during the flag parsing phase.
@ -119,3 +127,10 @@ type RunShutdownable interface {
// should be shut down.
Shutdown(ctx context.Context) error
}
// Cleanupable is any object that must be cleaned up after it has stopped for
// good. Actors which implement this interface will be cleaned up after they
// are deleted from the environment.
type Cleanupable interface {
Cleanup(ctx context.Context) error
}

View File

@ -7,6 +7,7 @@ import "context"
// tasks on time intervals.
type cron struct {
trimFunc func() bool
fastTiming bool
timing struct {
trimInterval time.Duration
}
@ -28,6 +29,9 @@ func (this *cron) Configure (config Config) error {
}
this.timing.trimInterval = value
}
if this.fastTiming {
this.timing.trimInterval = time.Second * 10
}
return nil
}

View File

@ -19,6 +19,7 @@ const defaultRestartInitialInterval = 8 * time.Second
const defaultRestartInitialIncrement = 8 * time.Second
const defaultRestartInitialMaximum = 1 * time.Hour
const defaultResetTimeout = 8 * time.Minute
const defaultCleanupTimeout = 1 * time.Minute
const defaultTrimInterval = 1 * time.Minute
const defaultTrimTimeout = 1 * time.Minute
const defaultShutdownTimeout = 8 * time.Minute
@ -33,6 +34,7 @@ type environment struct {
done context.CancelCauseFunc
group sync.WaitGroup
conf MutableConfig
cron *cron
// flags stores information from built-in flags.
flags struct {
@ -41,6 +43,9 @@ type environment struct {
logDirectory string
configFile string
verbose bool
crash bool
crashOnError bool
fastTiming bool
}
// running stores whether the environment is currently running.
@ -58,6 +63,7 @@ type environment struct {
restartIntervalIncrement atomicDuration
restartIntervalMaximum atomicDuration
resetTimeout atomicDuration
cleanupTimeout atomicDuration
trimTimeout atomicDuration
shutdownTimeout atomicDuration
}
@ -78,10 +84,11 @@ func (this *environment) Run(name, description string, actors ...Actor) {
this.name = name
this.description = description
this.actors = usync.NewRWMonitor(&actorSets { })
this.addToSets(actors...)
this.addToSets(&cron {
this.cron = &cron {
trimFunc: this.phase70_5Trimming,
})
}
this.addToSets(actors...)
this.addToSets(this.cron)
if !this.phase10FlagParsing() { os.Exit(2) }
if !this.phase13PidFileCreation() { os.Exit(1) }
@ -110,7 +117,12 @@ func (this *environment) Add(ctx context.Context, actors ...Actor) error {
}
}
err := this.initializeActors(ctx, initializable...)
if err != nil { return err }
if err != nil {
if this.flags.crashOnError {
panic(fmt.Sprint(err))
}
return err
}
for _, actor := range actors {
if actor, ok := actor.(Configurable); ok {
err := actor.Configure(this.conf)
@ -136,11 +148,15 @@ func (this *environment) Del(ctx context.Context, actors ...Actor) error {
channels := []<- chan struct { } { }
for _, actor := range actors {
info := this.info(actor)
if info.done != nil {
if info.stopped != nil {
channels = append(channels, info.stopped)
}
if info.done != nil {
info.done()
}
}
for _, channel := range channels {
if channel == nil { continue }
select {
case <- channel:
case <- ctx.Done():
@ -226,11 +242,30 @@ func (this *environment) start(actor Actor) {
// counter will be decremented. note that this function will never increment the
// wait group counter, so start should usually be used instead.
func (this *environment) run(actor Actor) {
typ := actor.Type()
// clean up when done
defer this.group.Done()
defer func() {
this.group.Done()
this.delFromSets(actor)
if actor, ok := actor.(Cleanupable); ok {
ctx, done := context.WithTimeout(
context.Background(),
defaul(
this.timing.cleanupTimeout.Load(),
defaultCleanupTimeout))
defer done()
err := actor.Cleanup(ctx)
if err != nil {
log.Printf("XXX [%s] failed to cleanup: %v", typ, err)
if this.flags.crashOnError {
panic(fmt.Sprint(err))
}
}
}
}()
// logging
typ := actor.Type()
if this.Verb() { log.Printf("(i) [%s] running", typ) }
var stopErr error
var exited bool
@ -243,13 +278,20 @@ func (this *environment) run(actor Actor) {
}
} else {
log.Printf("!!! [%s] stopped with error: %v", typ, stopErr)
if this.flags.crashOnError {
panic(fmt.Sprint(stopErr))
}
}
}()
// contains context information
info := this.info(actor)
ctx := info.ctx
defer close(info.stopped)
defer func() {
if info.stopped != nil {
close(info.stopped)
}
}()
switch actor := actor.(type) {
case Runnable:
@ -282,7 +324,12 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
for {
// run actor
lastStart := time.Now()
err := panicWrapCtx(ctx, actor.Run)
var err error
if this.flags.crash {
err = actor.Run(ctx)
} else {
err = panicWrapCtx(ctx, actor.Run)
}
// detect context cancellation
if ctxErr := ctx.Err(); ctxErr != nil {
@ -299,11 +346,17 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
} else {
// failure
log.Printf("XXX [%s] failed: %v", typ, err)
if this.flags.crashOnError {
panic(fmt.Sprint(err))
}
}
// restart logic
if time.Since(lastStart) < restartThreshold {
log.Printf("!!! [%s] failed too soon, restarting in %v", typ, restartInterval)
if this.flags.crashOnError {
panic("failed too soon")
}
timer := time.NewTimer(restartInterval)
select {
case <- timer.C:
@ -329,6 +382,9 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
err := actor.Reset(ctx)
if err != nil {
log.Printf("XXX [%s] failed to reset", typ)
if this.flags.crashOnError {
panic("failed to reset")
}
}
}()
if this.Verb() { log.Printf(".// [%s] reset", typ) }
@ -388,6 +444,7 @@ func (this *environment) applyConfig() error {
}
return nil
}
// TODO: trim interval
err := parseDuration("init-timeout", &this.timing.initTimeout)
if err != nil { return err }
err = parseDuration("restart-threshold", &this.timing.restartThreshold)
@ -400,10 +457,17 @@ func (this *environment) applyConfig() error {
if err != nil { return err }
err = parseDuration("reset-timeout", &this.timing.resetTimeout)
if err != nil { return err }
err = parseDuration("cleanup-timeout", &this.timing.cleanupTimeout)
if err != nil { return err }
err = parseDuration("trim-timeout", &this.timing.trimTimeout)
if err != nil { return err }
err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout)
if err != nil { return err }
if this.flags.fastTiming {
this.timing.shutdownTimeout.Store(time.Second * 10)
}
return nil
}

35
examples/panic/main.go Normal file
View File

@ -0,0 +1,35 @@
// Example panic demonstrates how the environment can restart actors if they
// fail.
package main
import "log"
import "time"
import "errors"
import "context"
import "math/rand"
import "git.tebibyte.media/sashakoshka/camfish"
func main() {
camfish.Run("panic",
"Example panic demonstrates how the environment can restart " +
"actors if they fail",
new(actor))
}
// actor is an incorrectly implemented actor that panics and errs randomly.
type actor struct { }
var _ camfish.Runnable = new(actor)
func (this *actor) Type() string { return "panic" }
func (this *actor) Run(ctx context.Context) error {
log.Println("(i) [panic] panicking in 10 seconds")
select {
case <- ctx.Done(): return ctx.Err()
case <- time.After(time.Second * 10):
if rand.Int() % 2 == 0 {
panic("this is a panic")
} else {
return errors.New("this is an error")
}
}
}

4
ini.go
View File

@ -162,7 +162,7 @@ func configFiles(program string) ([]string, error) {
userConfig, err := os.UserConfigDir()
if err != nil { return nil, err }
return []string {
filepath.Join("/etc", program),
filepath.Join(userConfig, program),
filepath.Join("/etc", program, program + ".conf"),
filepath.Join(userConfig, program, program + ".conf"),
}, nil
}

View File

@ -6,6 +6,7 @@ import "log"
import "io/fs"
import "errors"
import "context"
import "runtime"
import "strings"
import "path/filepath"
import "git.tebibyte.media/sashakoshka/go-cli"
@ -18,12 +19,15 @@ func (this *environment) phase10FlagParsing() bool {
name: this.name,
description: this.description,
}
flagHelp := set.Flag('h', "help", "Display usage information and exit", nil)
flagPidFile := set.Flag('p', "pid-file", "Write the PID to the specified file", cli.ValString)
flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString)
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil)
flagHelp := set.Flag('h', "help", "Display usage information and exit", nil)
flagPidFile := set.Flag('p', "pid-file", "Write the PID to the specified file", cli.ValString)
flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString)
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
flagVerbose := set.Flag('v', "verbose", "(debug) Enable verbose output/logging", nil)
flagCrash := set.Flag(0, "crash", "(debug) Crash when an actor panics", nil)
flagCrashOnError := set.Flag(0, "crash-on-error", "(debug) Crash when an actor experiences any error", nil)
flagFastTiming := set.Flag(0, "fast-timing", "(debug) Make timed things happen faster/more often", nil)
// ask actors to add flags
actors, done := this.actors.RBorrow()
@ -60,6 +64,17 @@ func (this *environment) phase10FlagParsing() bool {
if _, ok := flagVerbose.First(); ok {
this.flags.verbose = true
}
if _, ok := flagCrash.First(); ok {
this.flags.crash = true
}
if _, ok := flagCrashOnError.First(); ok {
this.flags.crash = true
this.flags.crashOnError = true
}
if _, ok := flagFastTiming.First(); ok {
this.flags.fastTiming = true
this.cron.fastTiming = true
}
return true
}
@ -128,6 +143,12 @@ func (this *environment) phase30ConfigurationParsing() bool {
if this.flags.configFile != "" {
paths = append(paths, this.flags.configFile)
}
if this.Verb() {
log.Println("(i) (30) have configuration files:")
for _, paths := range paths {
log.Println("(i) (30) -", paths)
}
}
// parse every config and merge them all
configs := make([]iniConfig, 0, len(paths))
for _, path := range paths {
@ -248,6 +269,9 @@ func (this *environment) phase70_5Trimming() bool {
}()
if err := this.trimActors(this.ctx, trimmable...); err != nil {
log.Println(".// (70.5) failed to trim:", err)
if this.flags.crashOnError {
panic(err)
}
return false
}
if this.Verb() { log.Println(".// (70.5) trimmed") }
@ -255,6 +279,7 @@ func (this *environment) phase70_5Trimming() bool {
}
func (this *environment) phase80Shutdown() bool {
logActors(All())
ctx, done := context.WithTimeout(
context.Background(),
defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout))
@ -263,6 +288,11 @@ func (this *environment) phase80Shutdown() bool {
<- ctx.Done()
if errors.Is(context.Cause(ctx), context.DeadlineExceeded) {
log.Println("XXX (80) shutdown timeout expired, performing emergency halt")
if Verb() || this.flags.crashOnError {
dumpBuffer := make([]byte, 8192)
runtime.Stack(dumpBuffer, true)
log.Printf("XXX (80) stack trace of all goroutines:\n%s", dumpBuffer)
}
log.Printf("====== [%s] END =======", this.name)
os.Exit(1)
}

31
util.go
View File

@ -12,6 +12,19 @@ import "strings"
import "context"
import "sync/atomic"
import "unicode/utf8"
import "runtime/debug"
func panicErr(message any, stack []byte) (err error) {
if panErr, ok := message.(error); ok {
err = panErr
} else {
err = errors.New(fmt.Sprint(message))
}
if stack != nil {
err = fmt.Errorf("%w: %s", err, stack)
}
return err
}
func defaul[T comparable](value, def T) T {
var zero T
@ -22,11 +35,7 @@ func defaul[T comparable](value, def T) T {
func panicWrap(f func() error) (err error) {
defer func () {
if pan := recover(); pan != nil {
if panErr, ok := pan.(error); ok {
err = panErr
} else {
err = errors.New(fmt.Sprint(pan))
}
err = panicErr(pan, debug.Stack())
}
} ()
@ -37,11 +46,7 @@ func panicWrap(f func() error) (err error) {
func panicWrapCtx(ctx context.Context, f func(context.Context) error) (err error) {
defer func () {
if pan := recover(); pan != nil {
if panErr, ok := pan.(error); ok {
err = panErr
} else {
err = errors.New(fmt.Sprint(pan))
}
err = panicErr(pan, debug.Stack())
}
} ()
@ -77,7 +82,11 @@ func logActors (actors iter.Seq[Actor]) {
}
types := make(map[string] int)
for actor := range actors {
types[actor.Type()] += 1
typ := actor.Type()
if named, ok := actor.(Named); ok {
typ = fmt.Sprintf("%s/%s", typ, named.Name())
}
types[typ] += 1
}
for typ, count := range types {
if count > 1 {