camfish/environment.go

430 lines
13 KiB
Go

package camfish
import "os"
import "fmt"
import "log"
import "iter"
import "sync"
import "time"
import "errors"
import "context"
import "sync/atomic"
import "golang.org/x/sync/errgroup"
import "git.tebibyte.media/sashakoshka/go-util/sync"
import "git.tebibyte.media/sashakoshka/go-service/daemon"
// Fallback timing values. Each one is paired with the matching field of
// environment.timing wherever that field is loaded.
const (
	// defaultInitTimeout is the fallback time limit for actor initialization.
	defaultInitTimeout = 8 * time.Minute

	// defaultRestartThreshold is the fallback run time below which a failed
	// actor counts as having failed too soon.
	defaultRestartThreshold = 16 * time.Second

	// defaultRestartInitialInterval is the fallback starting delay before a
	// too-soon restart.
	defaultRestartInitialInterval = 8 * time.Second

	// defaultRestartInitialIncrement is the fallback amount added to the
	// restart delay after each too-soon failure.
	defaultRestartInitialIncrement = 8 * time.Second

	// defaultRestartInitialMaximum is the fallback upper bound of the restart
	// delay.
	defaultRestartInitialMaximum = 1 * time.Hour

	// defaultResetTimeout is the fallback time limit for resetting an actor.
	defaultResetTimeout = 8 * time.Minute

	// defaultTrimInterval is presumably the fallback period between trim
	// passes; it is not referenced in this part of the file.
	defaultTrimInterval = 1 * time.Minute

	// defaultTrimTimeout is the fallback time limit for a trim pass.
	defaultTrimTimeout = 1 * time.Minute

	// defaultShutdownTimeout is the fallback time limit for shutting an
	// actor down.
	defaultShutdownTimeout = 8 * time.Minute
)
// environment is an object which handles requests by package-level functions.
// It is only a separate object for testing purposes.
type environment struct {
// name and description are the values passed to Run.
name string
description string
// actors guards the actor sets. Methods borrow it for reading or writing
// before querying or modifying the sets.
actors usync.RWMonitor[*actorSets]
// ctx is the environment's root context and done cancels it with a cause.
// Both are created by Run.
ctx context.Context
done context.CancelCauseFunc
// group is incremented by start and decremented when run exits.
group sync.WaitGroup
// conf is the configuration handed to Configurable actors and read by
// applyConfig.
conf MutableConfig
// flags stores information from built-in flags.
flags struct {
pidFile string
user string
logDirectory string
configFile string
verbose bool
}
// running stores whether the environment is currently running.
// updated by environment.phase70Running.
// ATOMICALLY VOLATILE
running atomic.Bool
// timing stores configurable timing information.
// updated by environment.phase50ConfigurationApplication.
// A field is paired with its default* constant wherever it is loaded.
// ATOMICALLY VOLATILE
timing struct {
initTimeout atomicDuration
restartThreshold atomicDuration
restartInitialInterval atomicDuration
restartIntervalIncrement atomicDuration
restartIntervalMaximum atomicDuration
resetTimeout atomicDuration
trimTimeout atomicDuration
shutdownTimeout atomicDuration
}
}
// Run implements the package-level function [Run]. It records the program
// name and description, registers the given actors together with a built-in
// cron actor, and then executes every phase in order. The process exits with
// status 2 if no actors were given or if flag parsing fails, and with status 1
// if any later phase fails.
func (this *environment) Run(name, description string, actors ...Actor) {
	if len(actors) == 0 {
		log.Println("XXX cannot start with no actors")
		os.Exit(2)
	}

	// root context; canceled with ErrProcessKilled on SIGINT
	rootCtx, cancel := context.WithCancelCause(context.Background())
	this.ctx = rootCtx
	this.done = cancel
	defer this.done(nil)
	daemon.OnSigint(func() { this.done(ErrProcessKilled) })

	// put log output back on stderr once Run returns normally
	defer log.SetOutput(os.Stderr)

	this.name, this.description = name, description
	this.actors = usync.NewRWMonitor(&actorSets{})
	this.addToSets(actors...)
	this.addToSets(&cron{trimFunc: this.phase70_5Trimming})

	// phases run strictly in this order; the first failure exits the
	// process with the listed status
	phases := []struct {
		run      func() bool
		exitCode int
	}{
		{this.phase10FlagParsing, 2},
		{this.phase13PidFileCreation, 1},
		{this.phase17PrivilegeDropping, 1},
		{this.phase20LogSwitching, 1},
		{this.phase30ConfigurationParsing, 1},
		{this.phase40ConfigurationProcessing, 1},
		{this.phase50ConfigurationApplication, 1},
		{this.phase60Initialization, 1},
		{this.phase70Running, 1},
		{this.phase80Shutdown, 1},
	}
	for _, phase := range phases {
		if !phase.run() {
			os.Exit(phase.exitCode)
		}
	}
}
// Done implements the package-level function [Done]. It cancels the
// environment's context, recording cause as the reason for cancellation.
func (this *environment) Done(cause error) {
this.done(cause)
}
// Add implements the package-level function [Add]. The actors are registered
// with the environment, then those implementing Initializable are initialized,
// those implementing Configurable receive the current configuration, and
// finally those implementing Runnable or RunShutdownable are started in the
// background. The first initialization or configuration error is returned;
// otherwise the result is ctx.Err().
func (this *environment) Add(ctx context.Context, actors ...Actor) error {
	this.addToSets(actors...)

	// initialization
	toInit := make([]Initializable, 0, len(actors))
	for _, actor := range actors {
		initializable, ok := actor.(Initializable)
		if !ok {
			continue
		}
		toInit = append(toInit, initializable)
	}
	if err := this.initializeActors(ctx, toInit...); err != nil {
		return err
	}

	// configuration; nothing is started unless every actor accepts it
	for _, actor := range actors {
		configurable, ok := actor.(Configurable)
		if !ok {
			continue
		}
		if err := configurable.Configure(this.conf); err != nil {
			return fmt.Errorf(
				"could not apply configuration to %s: %w",
				actor.Type(), err)
		}
	}

	// startup
	for _, actor := range actors {
		switch actor.(type) {
		case Runnable, RunShutdownable:
			this.start(actor)
		}
	}
	return ctx.Err()
}
// Del implements the package-level function [Del]. It removes the given actors
// from the environment's actor sets and then blocks until every one of them
// that has run state has signalled that it stopped, or until ctx is done. The
// result is ctx.Err(), which is nil if ctx was still live when the wait
// finished.
func (this *environment) Del(ctx context.Context, actors ...Actor) error {
	// Collect the stopped channels before deleting anything, since the
	// information is looked up through the sets.
	var channels []<-chan struct{}
	for _, actor := range actors {
		if info := this.info(actor); info.done != nil {
			channels = append(channels, info.stopped)
		}
	}

	// Previously this function only waited and never removed the actors.
	// NOTE(review): this assumes actorSets.del also cancels the actor's
	// context so that run returns and closes the stopped channel. If it
	// does not, the cancellation has to happen here as well. Confirm
	// against actorSets.
	this.delFromSets(actors...)

	for _, channel := range channels {
		select {
		case <-channel:
		case <-ctx.Done():
			return ctx.Err()
		}
	}
	return ctx.Err()
}
// Find implements the package-level function [Find]. The actor sets are
// borrowed for reading while the lookup by type is performed.
func (this *environment) Find(typ string) Actor {
	sets, release := this.actors.RBorrow()
	defer release()

	found := sets.all.find(typ)
	return found
}
// FindAll implements the package-level function [FindAll]. The matching actors
// are collected while the actor sets are borrowed for reading; the returned
// sequence iterates over that collected result after the borrow has ended.
func (this *environment) FindAll(typ string) iter.Seq[Actor] {
	sets, release := this.actors.RBorrow()
	defer release()

	matches := sets.all.findAll(typ)
	sequence := func(yield func(Actor) bool) {
		for _, match := range matches {
			if keepGoing := yield(match); !keepGoing {
				break
			}
		}
	}
	return sequence
}
// All implements the package-level function [All]. The actors are collected
// while the actor sets are borrowed for reading; the returned sequence
// iterates over that collected result after the borrow has ended.
func (this *environment) All() iter.Seq[Actor] {
	sets, release := this.actors.RBorrow()
	defer release()

	everything := sets.all.all()
	sequence := func(yield func(Actor) bool) {
		for _, actor := range everything {
			if keepGoing := yield(actor); !keepGoing {
				break
			}
		}
	}
	return sequence
}
// Verb implements the package-level function [Verb]. It reports the value of
// the built-in verbose flag.
func (this *environment) Verb() bool {
return this.flags.verbose
}
// addToSets registers each actor in the actorSets, handing it the
// environment's context. The sets are borrowed for writing for the duration of
// the call.
func (this *environment) addToSets(actors ...Actor) {
	sets, release := this.actors.Borrow()
	defer release()

	for index := range actors {
		sets.add(this.ctx, actors[index])
	}
}
// delFromSets removes each actor from the actorSets. The sets are borrowed for
// writing for the duration of the call.
func (this *environment) delFromSets(actors ...Actor) {
	sets, release := this.actors.Borrow()
	defer release()

	for index := range actors {
		sets.del(actors[index])
	}
}
// info looks up the information the actorSets hold about an actor. The sets
// are borrowed for reading for the duration of the lookup.
func (this *environment) info(actor Actor) actorInfo {
	sets, release := this.actors.RBorrow()
	defer release()

	result := sets.info(actor)
	return result
}
// start increments the wait group by one and starts the given actor in the
// background, restarting it if it fails. This function returns immediately
// without waiting for the actor. See the documentation for run for details.
func (this *environment) start(actor Actor) {
// the matching Done call is deferred by run
this.group.Add(1)
go this.run(actor)
}
// run runs the given actor, restarting it if it fails, and returns once
// runRunnable gives up on it. On the way out it closes the actor's stopped
// channel, logs how the actor ended, and decrements the environment's wait
// group counter, in that order. This function never increments the wait group
// counter, so start should usually be used instead. It panics if the actor is
// neither Runnable nor RunShutdownable.
func (this *environment) run(actor Actor) {
	// deferred first so that it runs last
	defer this.group.Done()

	typ := actor.Type()
	if this.Verb() {
		log.Printf("(i) [%s] running", typ)
	}

	// outcome of the run, reported by the deferred function below
	var (
		stopErr error
		exited  bool
	)
	defer func() {
		switch {
		case stopErr != nil:
			log.Printf("!!! [%s] stopped with error: %v", typ, stopErr)
		case !this.Verb():
			// clean stops are only reported in verbose mode
		case exited:
			log.Printf("(i) [%s] exited", typ)
		default:
			log.Printf("(i) [%s] stopped", typ)
		}
	}()

	// per-actor context and stop signal
	info := this.info(actor)
	defer close(info.stopped)

	switch runnable := actor.(type) {
	case Runnable:
		stopErr, exited = this.runRunnable(info.ctx, runnable)
	case RunShutdownable:
		shim := &runShutdownableShim{
			shutdownTimeout: defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout),
			underlying:      runnable,
		}
		stopErr, exited = this.runRunnable(info.ctx, shim)
	default:
		panic("actor was neither Runnable or RunShutdownable")
	}
}
// runRunnable runs an actor implementing [Runnable]. this should only be called
// from within [environment.run].
//
// The actor is run in a loop. If ctx has been canceled by the time the actor
// returns, the loop ends and stopErr carries the actor's error unless it is
// nil or wraps context.Canceled. If the actor returns nil on its own, exited
// is true. Any other return is logged as a failure and the actor is restarted.
// When it failed sooner than the restart threshold, the restart is delayed,
// and the delay grows by a fixed increment up to a maximum on each such
// failure. A run that lasted at least the threshold puts the delay back to its
// initial value. Actors implementing Resettable are reset before each restart.
//
// If the actor does not implement Actor, nothing is run and the zero results
// are returned.
func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopErr error, exited bool) {
	// timing
	restartThreshold := defaul(this.timing.restartThreshold.Load(), defaultRestartThreshold)
	restartInitialInterval := defaul(this.timing.restartInitialInterval.Load(), defaultRestartInitialInterval)
	restartIntervalIncrement := defaul(this.timing.restartIntervalIncrement.Load(), defaultRestartInitialIncrement)
	restartIntervalMaximum := defaul(this.timing.restartIntervalMaximum.Load(), defaultRestartInitialMaximum)
	resetTimeout := defaul(this.timing.resetTimeout.Load(), defaultResetTimeout)
	restartInterval := restartInitialInterval

	acto, ok := actor.(Actor)
	if !ok {
		return nil, false
	}
	typ := acto.Type()

	for {
		// run actor
		lastStart := time.Now()
		err := panicWrapCtx(ctx, actor.Run)

		// detect context cancellation
		if ctx.Err() != nil {
			if err != nil && !errors.Is(err, context.Canceled) {
				return err, false
			}
			return nil, false
		}

		// normal exit
		if err == nil {
			return nil, true
		}

		// failure; include the error so the cause is not lost
		log.Printf("XXX [%s] failed: %v", typ, err)

		// restart logic
		if time.Since(lastStart) < restartThreshold {
			log.Printf("!!! [%s] failed too soon, restarting in %v", typ, restartInterval)
			timer := time.NewTimer(restartInterval)
			select {
			case <-timer.C:
				// ok
			case <-ctx.Done():
				// the timer is no longer needed, release it
				timer.Stop()
				if this.Verb() {
					log.Printf("(i) [%s] canceled while dormant", typ)
				}
				return nil, false
			}
			restartInterval += restartIntervalIncrement
			if restartInterval > restartIntervalMaximum {
				restartInterval = restartIntervalMaximum
			}
		} else {
			restartInterval = restartInitialInterval
		}

		// reset if needed
		if resettable, ok := actor.(Resettable); ok {
			if this.Verb() {
				log.Printf("... [%s] resetting", typ)
			}
			func() {
				// deferred so the context is released even if Reset panics
				resetCtx, done := context.WithTimeout(ctx, resetTimeout)
				defer done()
				if err := resettable.Reset(resetCtx); err != nil {
					log.Printf("XXX [%s] failed to reset: %v", typ, err)
				}
			}()
			if this.Verb() {
				log.Printf(".// [%s] reset", typ)
			}
		}
		log.Printf("(i) [%s] restarting", typ)
	}
}
// initializeActors runs Init on every given actor concurrently and blocks
// until all of them have returned. The whole operation is bounded by the
// configured init timeout, falling back to defaultInitTimeout. The result is
// whatever the errgroup's Wait reports, with each actor's error prefixed by
// its type. Every actor must also implement Actor, otherwise this panics.
func (this *environment) initializeActors(ctx context.Context, actors ...Initializable) error {
	timeout := defaul(this.timing.initTimeout.Load(), defaultInitTimeout)
	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	group, groupCtx := errgroup.WithContext(timeoutCtx)
	// taking the actor as an argument gives each goroutine its own copy
	spawn := func(actor Initializable, typ string) {
		group.Go(func() error {
			if err := actor.Init(groupCtx); err != nil {
				return fmt.Errorf("%s: %w", typ, err)
			}
			return nil
		})
	}
	for _, actor := range actors {
		spawn(actor, actor.(Actor).Type())
	}
	return group.Wait()
}
// trimActors runs Trim on every given actor concurrently and blocks until all
// of them have returned. The whole operation is bounded by the configured trim
// timeout, falling back to defaultTrimTimeout. The result is whatever the
// errgroup's Wait reports, with each actor's error prefixed by its type. Every
// actor must also implement Actor, otherwise this panics.
func (this *environment) trimActors(ctx context.Context, actors ...Trimmable) error {
	timeout := defaul(this.timing.trimTimeout.Load(), defaultTrimTimeout)
	timeoutCtx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()

	group, groupCtx := errgroup.WithContext(timeoutCtx)
	// taking the actor as an argument gives each goroutine its own copy
	spawn := func(actor Trimmable, typ string) {
		group.Go(func() error {
			if err := actor.Trim(groupCtx); err != nil {
				return fmt.Errorf("%s: %w", typ, err)
			}
			return nil
		})
	}
	for _, actor := range actors {
		spawn(actor, actor.(Actor).Type())
	}
	return group.Wait()
}
// applyConfig reads and applies environment-specific values from this.conf.
// Each key listed below is looked up in the configuration. When it is set, its
// value is parsed as a duration and stored in the matching field of
// this.timing. Keys that are unset leave their field untouched. The first
// value that fails to parse ends the process and is reported as a config
// error; values stored before that point stay stored.
func (this *environment) applyConfig() error {
	// Each key is stored in its own destination. Previously every key was
	// written to initTimeout, so the other settings never took effect.
	settings := []struct {
		key         string
		destination interface{ Store(time.Duration) }
	}{
		{"init-timeout", &this.timing.initTimeout},
		{"restart-threshold", &this.timing.restartThreshold},
		{"restart-initial-interval", &this.timing.restartInitialInterval},
		{"restart-interval-increment", &this.timing.restartIntervalIncrement},
		{"restart-interval-maximum", &this.timing.restartIntervalMaximum},
		{"reset-timeout", &this.timing.resetTimeout},
		{"trim-timeout", &this.timing.trimTimeout},
		{"shutdown-timeout", &this.timing.shutdownTimeout},
	}
	for _, setting := range settings {
		str := this.conf.Get(setting.key)
		if str == "" {
			continue
		}
		value, err := time.ParseDuration(str)
		if err != nil {
			return NewConfigError(this.conf, setting.key, 0, err)
		}
		setting.destination.Store(value)
	}
	return nil
}
// runShutdownableShim wraps a RunShutdownable so that it can be run with a
// context: its Run method calls the underlying Shutdown once that context is
// done.
type runShutdownableShim struct {
// underlying is the wrapped actor.
underlying RunShutdownable
// shutdownTimeout bounds the context passed to the underlying Shutdown.
shutdownTimeout time.Duration
}
// Type returns the type of the wrapped actor. The underlying value is asserted
// to Actor, so this panics if it does not implement that interface.
func (this *runShutdownableShim) Type() string {
return this.underlying.(Actor).Type()
}
// Run calls the underlying Run and returns its result. A background goroutine
// waits for the context to be done and then calls the underlying Shutdown with
// a fresh context limited by shutdownTimeout.
//
// NOTE(review): the watched context is also canceled when the underlying Run
// returns on its own, so Shutdown is invoked in that case too, and it may
// overlap with a later call to Run. Confirm that RunShutdownable
// implementations tolerate this.
func (this *runShutdownableShim) Run(ctx context.Context) error {
	runCtx, cancel := context.WithCancel(ctx)
	defer cancel()

	shutdownWhenDone := func() {
		<-runCtx.Done()
		// the run context is already dead, so shutdown gets its own
		shutdownCtx, release := context.WithTimeout(context.Background(), this.shutdownTimeout)
		defer release()
		this.underlying.Shutdown(shutdownCtx)
	}
	go shutdownWhenDone()

	return this.underlying.Run()
}