6 Commits

3 changed files with 72 additions and 16 deletions

View File

@@ -7,6 +7,7 @@ import "context"
// tasks on time intervals. // tasks on time intervals.
type cron struct { type cron struct {
trimFunc func() bool trimFunc func() bool
fastTiming bool
timing struct { timing struct {
trimInterval time.Duration trimInterval time.Duration
} }
@@ -28,6 +29,9 @@ func (this *cron) Configure (config Config) error {
} }
this.timing.trimInterval = value this.timing.trimInterval = value
} }
if this.fastTiming {
this.timing.trimInterval = time.Second * 10
}
return nil return nil
} }

View File

@@ -33,6 +33,7 @@ type environment struct {
done context.CancelCauseFunc done context.CancelCauseFunc
group sync.WaitGroup group sync.WaitGroup
conf MutableConfig conf MutableConfig
cron *cron
// flags stores information from built-in flags. // flags stores information from built-in flags.
flags struct { flags struct {
@@ -42,6 +43,8 @@ type environment struct {
configFile string configFile string
verbose bool verbose bool
crash bool crash bool
crashOnError bool
fastTiming bool
} }
// running stores whether the environment is currently running. // running stores whether the environment is currently running.
@@ -79,10 +82,11 @@ func (this *environment) Run(name, description string, actors ...Actor) {
this.name = name this.name = name
this.description = description this.description = description
this.actors = usync.NewRWMonitor(&actorSets { }) this.actors = usync.NewRWMonitor(&actorSets { })
this.addToSets(actors...) this.cron = &cron {
this.addToSets(&cron {
trimFunc: this.phase70_5Trimming, trimFunc: this.phase70_5Trimming,
}) }
this.addToSets(actors...)
this.addToSets(this.cron)
if !this.phase10FlagParsing() { os.Exit(2) } if !this.phase10FlagParsing() { os.Exit(2) }
if !this.phase13PidFileCreation() { os.Exit(1) } if !this.phase13PidFileCreation() { os.Exit(1) }
@@ -111,7 +115,12 @@ func (this *environment) Add(ctx context.Context, actors ...Actor) error {
} }
} }
err := this.initializeActors(ctx, initializable...) err := this.initializeActors(ctx, initializable...)
if err != nil { return err } if err != nil {
if this.flags.crashOnError {
panic(fmt.Sprint(err))
}
return err
}
for _, actor := range actors { for _, actor := range actors {
if actor, ok := actor.(Configurable); ok { if actor, ok := actor.(Configurable); ok {
err := actor.Configure(this.conf) err := actor.Configure(this.conf)
@@ -137,11 +146,13 @@ func (this *environment) Del(ctx context.Context, actors ...Actor) error {
channels := []<- chan struct { } { } channels := []<- chan struct { } { }
for _, actor := range actors { for _, actor := range actors {
info := this.info(actor) info := this.info(actor)
if info.done != nil { if info.stopped != nil {
channels = append(channels, info.stopped) channels = append(channels, info.stopped)
} }
info.done()
} }
for _, channel := range channels { for _, channel := range channels {
if channel == nil { continue }
select { select {
case <- channel: case <- channel:
case <- ctx.Done(): case <- ctx.Done():
@@ -244,13 +255,20 @@ func (this *environment) run(actor Actor) {
} }
} else { } else {
log.Printf("!!! [%s] stopped with error: %v", typ, stopErr) log.Printf("!!! [%s] stopped with error: %v", typ, stopErr)
if this.flags.crashOnError {
panic(fmt.Sprint(stopErr))
}
} }
}() }()
// contains context information // contains context information
info := this.info(actor) info := this.info(actor)
ctx := info.ctx ctx := info.ctx
defer close(info.stopped) defer func() {
if info.stopped != nil {
close(info.stopped)
}
}()
switch actor := actor.(type) { switch actor := actor.(type) {
case Runnable: case Runnable:
@@ -305,11 +323,17 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
} else { } else {
// failure // failure
log.Printf("XXX [%s] failed: %v", typ, err) log.Printf("XXX [%s] failed: %v", typ, err)
if this.flags.crashOnError {
panic(fmt.Sprint(err))
}
} }
// restart logic // restart logic
if time.Since(lastStart) < restartThreshold { if time.Since(lastStart) < restartThreshold {
log.Printf("!!! [%s] failed too soon, restarting in %v", typ, restartInterval) log.Printf("!!! [%s] failed too soon, restarting in %v", typ, restartInterval)
if this.flags.crashOnError {
panic("failed too soon")
}
timer := time.NewTimer(restartInterval) timer := time.NewTimer(restartInterval)
select { select {
case <- timer.C: case <- timer.C:
@@ -335,6 +359,9 @@ func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopE
err := actor.Reset(ctx) err := actor.Reset(ctx)
if err != nil { if err != nil {
log.Printf("XXX [%s] failed to reset", typ) log.Printf("XXX [%s] failed to reset", typ)
if this.flags.crashOnError {
panic("failed to reset")
}
} }
}() }()
if this.Verb() { log.Printf(".// [%s] reset", typ) } if this.Verb() { log.Printf(".// [%s] reset", typ) }
@@ -394,6 +421,7 @@ func (this *environment) applyConfig() error {
} }
return nil return nil
} }
// TODO: trim interval
err := parseDuration("init-timeout", &this.timing.initTimeout) err := parseDuration("init-timeout", &this.timing.initTimeout)
if err != nil { return err } if err != nil { return err }
err = parseDuration("restart-threshold", &this.timing.restartThreshold) err = parseDuration("restart-threshold", &this.timing.restartThreshold)
@@ -410,6 +438,11 @@ func (this *environment) applyConfig() error {
if err != nil { return err } if err != nil { return err }
err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout) err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout)
if err != nil { return err } if err != nil { return err }
if this.flags.fastTiming {
this.timing.shutdownTimeout.Store(time.Second * 10)
}
return nil return nil
} }

View File

@@ -6,6 +6,7 @@ import "log"
import "io/fs" import "io/fs"
import "errors" import "errors"
import "context" import "context"
import "runtime"
import "strings" import "strings"
import "path/filepath" import "path/filepath"
import "git.tebibyte.media/sashakoshka/go-cli" import "git.tebibyte.media/sashakoshka/go-cli"
@@ -23,8 +24,10 @@ func (this *environment) phase10FlagParsing() bool {
flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString) flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString)
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString) flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString) flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil) flagVerbose := set.Flag('v', "verbose", "(debug) Enable verbose output/logging", nil)
flagCrash := set.Flag(0, "crash", "Crash when an actor panics", nil) flagCrash := set.Flag(0, "crash", "(debug) Crash when an actor panics", nil)
flagCrashOnError := set.Flag(0, "crash-on-error", "(debug) Crash when an actor experiences any error", nil)
flagFastTiming := set.Flag(0, "fast-timing", "(debug) Make timed things happen faster/more often", nil)
// ask actors to add flags // ask actors to add flags
actors, done := this.actors.RBorrow() actors, done := this.actors.RBorrow()
@@ -64,6 +67,14 @@ func (this *environment) phase10FlagParsing() bool {
if _, ok := flagCrash.First(); ok { if _, ok := flagCrash.First(); ok {
this.flags.crash = true this.flags.crash = true
} }
if _, ok := flagCrashOnError.First(); ok {
this.flags.crash = true
this.flags.crashOnError = true
}
if _, ok := flagFastTiming.First(); ok {
this.flags.fastTiming = true
this.cron.fastTiming = true
}
return true return true
} }
@@ -129,15 +140,15 @@ func (this *environment) phase30ConfigurationParsing() bool {
log.Println("!!! (30) could not determine location of file(s):", err) log.Println("!!! (30) could not determine location of file(s):", err)
return true return true
} }
if this.flags.configFile != "" {
paths = append(paths, this.flags.configFile)
}
if this.Verb() { if this.Verb() {
log.Println("(i) (30) have configuration files:") log.Println("(i) (30) have configuration files:")
for _, paths := range paths { for _, paths := range paths {
log.Println("(i) (30) -", paths) log.Println("(i) (30) -", paths)
} }
} }
if this.flags.configFile != "" {
paths = append(paths, this.flags.configFile)
}
// parse every config and merge them all // parse every config and merge them all
configs := make([]iniConfig, 0, len(paths)) configs := make([]iniConfig, 0, len(paths))
for _, path := range paths { for _, path := range paths {
@@ -258,6 +269,9 @@ func (this *environment) phase70_5Trimming() bool {
}() }()
if err := this.trimActors(this.ctx, trimmable...); err != nil { if err := this.trimActors(this.ctx, trimmable...); err != nil {
log.Println(".// (70.5) failed to trim:", err) log.Println(".// (70.5) failed to trim:", err)
if this.flags.crashOnError {
panic(err)
}
return false return false
} }
if this.Verb() { log.Println(".// (70.5) trimmed") } if this.Verb() { log.Println(".// (70.5) trimmed") }
@@ -273,6 +287,11 @@ func (this *environment) phase80Shutdown() bool {
<- ctx.Done() <- ctx.Done()
if errors.Is(context.Cause(ctx), context.DeadlineExceeded) { if errors.Is(context.Cause(ctx), context.DeadlineExceeded) {
log.Println("XXX (80) shutdown timeout expired, performing emergency halt") log.Println("XXX (80) shutdown timeout expired, performing emergency halt")
if Verb() || this.flags.crashOnError {
dumpBuffer := make([]byte, 8192)
runtime.Stack(dumpBuffer, true)
log.Printf("XXX (80) stack trace of all goroutines:\n%s", dumpBuffer)
}
log.Printf("====== [%s] END =======", this.name) log.Printf("====== [%s] END =======", this.name)
os.Exit(1) os.Exit(1)
} }