390 lines
12 KiB
Go
390 lines
12 KiB
Go
package camfish
|
|
|
|
import "os"
|
|
import "fmt"
|
|
import "log"
|
|
import "iter"
|
|
import "sync"
|
|
import "time"
|
|
import "errors"
|
|
import "context"
|
|
import "sync/atomic"
|
|
import "golang.org/x/sync/errgroup"
|
|
import "git.tebibyte.media/sashakoshka/go-util/sync"
|
|
import "git.tebibyte.media/sashakoshka/go-service/daemon"
|
|
|
|
// Fallback timing values. Each one is paired with a field of
// environment.timing through defaul(...), which presumably substitutes the
// fallback when the configured value is zero — TODO confirm.
//
// NOTE(review): defaultRestartInitialIncrement and defaultRestartInitialMaximum
// back the restartIntervalIncrement and restartIntervalMaximum settings; the
// names are kept as-is because code outside this view may reference them.
// defaultTrimInterval is not used in the visible code (presumably by cron).
const (
	defaultInitTimeout            = 8 * time.Minute
	defaultRestartThreshold       = 16 * time.Second
	defaultRestartInitialInterval = 8 * time.Second
	defaultRestartInitialIncrement = 8 * time.Second
	defaultRestartInitialMaximum  = 1 * time.Hour
	defaultResetTimeout           = 8 * time.Minute
	defaultTrimInterval           = 1 * time.Minute
	defaultTrimTimeout            = 1 * time.Minute
	defaultShutdownTimeout        = 8 * time.Minute
)
|
|
|
|
// environment is an object which handles requests by package-level functions.
|
|
// It is only a separate object for testing purposes.
|
|
type environment struct {
|
|
name string
|
|
description string
|
|
actors usync.RWMonitor[*actorSets]
|
|
ctx context.Context
|
|
done context.CancelCauseFunc
|
|
group sync.WaitGroup
|
|
conf MutableConfig
|
|
|
|
// flags stores information from built-in flags.
|
|
flags struct {
|
|
pidFile string
|
|
user string
|
|
logDirectory string
|
|
configFile string
|
|
verbose bool
|
|
}
|
|
|
|
// running stores whether the environment is currently running.
|
|
// updated by environment.phase7Running.
|
|
// ATOMICALLY VOLATILE
|
|
running atomic.Bool
|
|
|
|
// timing stores configurable timing information.
|
|
// updated by environment.phase5ConfigurationApplication.
|
|
// ATOMICALLY VOLATILE
|
|
timing struct {
|
|
initTimeout atomicDuration
|
|
restartThreshold atomicDuration
|
|
restartInitialInterval atomicDuration
|
|
restartIntervalIncrement atomicDuration
|
|
restartIntervalMaximum atomicDuration
|
|
resetTimeout atomicDuration
|
|
trimTimeout atomicDuration
|
|
shutdownTimeout atomicDuration
|
|
}
|
|
}
|
|
|
|
// Run implements the package-level function [Run].
|
|
func (this *environment) Run(name, description string, actors ...Actor) {
|
|
if len(actors) == 0 {
|
|
log.Println("XXX cannot start with no actors")
|
|
os.Exit(2)
|
|
}
|
|
|
|
this.ctx, this.done = context.WithCancelCause(context.Background())
|
|
defer this.done(nil)
|
|
daemon.OnSigint(func() { this.done(ErrProcessKilled) })
|
|
defer log.SetOutput(os.Stderr)
|
|
|
|
this.name = name
|
|
this.description = description
|
|
this.actors = usync.NewRWMonitor(&actorSets { })
|
|
this.addToSets(actors...)
|
|
this.addToSets(&cron {
|
|
trimFunc: this.phase70_5Trimming,
|
|
})
|
|
|
|
if !this.phase10FlagParsing() { os.Exit(2) }
|
|
if !this.phase13PidFileCreation() { os.Exit(1) }
|
|
if !this.phase17PrivilegeDropping() { os.Exit(1) }
|
|
if !this.phase20LogSwitching() { os.Exit(1) }
|
|
if !this.phase30ConfigurationParsing() { os.Exit(1) }
|
|
if !this.phase40ConfigurationProcessing() { os.Exit(1) }
|
|
if !this.phase50ConfigurationApplication() { os.Exit(1) }
|
|
if !this.phase60Initialization() { os.Exit(1) }
|
|
if !this.phase70Running() { os.Exit(1) }
|
|
if !this.phase80Shutdown() { os.Exit(1) }
|
|
}
|
|
|
|
// Done implements the package-level function [Done].
|
|
func (this *environment) Done(cause error) {
|
|
this.done(cause)
|
|
}
|
|
|
|
// Add implements the package-level function [Add].
|
|
func (this *environment) Add(ctx context.Context, actors ...Actor) error {
|
|
this.addToSets(actors...)
|
|
initializable := make([]Initializable, 0, len(actors))
|
|
for _, actor := range actors {
|
|
if actor, ok := actor.(Initializable); ok {
|
|
initializable = append(initializable, actor)
|
|
}
|
|
}
|
|
err := this.initializeActors(ctx, initializable...)
|
|
if err != nil { return err }
|
|
for _, actor := range actors {
|
|
if actor, ok := actor.(Configurable); ok {
|
|
err := actor.Configure(this.conf)
|
|
if err != nil {
|
|
return fmt.Errorf (
|
|
"could not apply configuration to %s: %w",
|
|
actor.(Actor).Type(), err)
|
|
}
|
|
}
|
|
}
|
|
for _, actor := range actors {
|
|
if actor, ok := actor.(Runnable); ok {
|
|
this.start(actor)
|
|
}
|
|
}
|
|
return ctx.Err()
|
|
}
|
|
|
|
// Del implements the package-level function [Del].
|
|
func (this *environment) Del(ctx context.Context, actors ...Actor) error {
|
|
channels := []<- chan struct { } { }
|
|
for _, actor := range actors {
|
|
info := this.info(actor)
|
|
if info.done != nil {
|
|
channels = append(channels, info.stopped)
|
|
}
|
|
}
|
|
for _, channel := range channels {
|
|
select {
|
|
case <- channel:
|
|
case <- ctx.Done():
|
|
return ctx.Err()
|
|
}
|
|
}
|
|
return ctx.Err()
|
|
}
|
|
|
|
// Find implements the package-level function [Find].
|
|
func (this *environment) Find(typ string) Actor {
|
|
actors, done := this.actors.RBorrow()
|
|
defer done()
|
|
return actors.all.find(typ)
|
|
}
|
|
|
|
// FindAll implements the package-level function [FindAll].
|
|
func (this *environment) FindAll(typ string) iter.Seq[Actor] {
|
|
actors, done := this.actors.RBorrow()
|
|
defer done()
|
|
slice := actors.all.findAll(typ)
|
|
return func (yield func(Actor) bool) {
|
|
for _, actor := range slice {
|
|
if !yield(actor) { return }
|
|
}
|
|
}
|
|
}
|
|
|
|
// All implements the package-level function [All].
|
|
func (this *environment) All() iter.Seq[Actor] {
|
|
actors, done := this.actors.RBorrow()
|
|
defer done()
|
|
slice := actors.all.all()
|
|
return func (yield func(Actor) bool) {
|
|
for _, actor := range slice {
|
|
if !yield(actor) { return }
|
|
}
|
|
}
|
|
}
|
|
|
|
// Verb implements the package-level function [Verb].
|
|
func (this *environment) Verb() bool {
|
|
return this.flags.verbose
|
|
}
|
|
|
|
// addToSets adds the actors to the actorSets in a thread-safe manner.
|
|
func (this *environment) addToSets(actors ...Actor) {
|
|
thisActors, done := this.actors.Borrow()
|
|
defer done()
|
|
for _, actor := range actors {
|
|
thisActors.add(this.ctx, actor)
|
|
}
|
|
}
|
|
|
|
// delFromSets deletes the actors from the actorSets in a thread-safe manner.
|
|
func (this *environment) delFromSets(actors ...Actor) {
|
|
thisActors, done := this.actors.Borrow()
|
|
defer done()
|
|
for _, actor := range actors {
|
|
thisActors.del(actor)
|
|
}
|
|
}
|
|
|
|
// info retrieves information about an actor from the actorSets in thread-safe
|
|
// manner.
|
|
func (this *environment) info(actor Actor) actorInfo {
|
|
thisActors, done := this.actors.RBorrow()
|
|
defer done()
|
|
return thisActors.info(actor)
|
|
}
|
|
|
|
// start increments the wait group by one and starts the given actor in the
|
|
// background, restarting it if it fails. this function will exit immediately.
|
|
// see the documentation for run for details.
|
|
func (this *environment) start(actor Runnable) {
|
|
this.group.Add(1)
|
|
go this.run(actor)
|
|
}
|
|
|
|
// run runs the given actor, restarting it if it fails. This function will exit
|
|
// when the actor's context is canceled. The actor will be removed from the
|
|
// environment once this function exits, and the environment's wait group
|
|
// counter will be decremented. note that this function will never increment the
|
|
// wait group counter, so start should usually be used instead.
|
|
func (this *environment) run(actor Runnable) {
|
|
// clean up when done
|
|
defer this.group.Done()
|
|
|
|
// logging
|
|
acto, ok := actor.(Actor)
|
|
if !ok { return }
|
|
typ := acto.Type()
|
|
if this.Verb() { log.Printf("(i) [%s] running", typ) }
|
|
var stopErr error
|
|
var exited bool
|
|
defer func() {
|
|
if stopErr == nil {
|
|
if exited {
|
|
if this.Verb() { log.Printf("(i) [%s] exited", typ) }
|
|
} else {
|
|
if this.Verb() { log.Printf("(i) [%s] stopped", typ) }
|
|
}
|
|
} else {
|
|
log.Printf("!!! [%s] stopped with error: %v", typ, stopErr)
|
|
}
|
|
}()
|
|
|
|
// contains context information
|
|
info := this.info(acto)
|
|
ctx := info.ctx
|
|
defer close(info.stopped)
|
|
|
|
// timing
|
|
restartThreshold := defaul(this.timing.restartThreshold.Load(), defaultRestartThreshold)
|
|
restartInitialInterval := defaul(this.timing.restartInitialInterval.Load(), defaultRestartInitialInterval)
|
|
restartIntervalIncrement := defaul(this.timing.restartIntervalIncrement.Load(), defaultRestartInitialIncrement)
|
|
restartIntervalMaximum := defaul(this.timing.restartIntervalMaximum.Load(), defaultRestartInitialMaximum)
|
|
resetTimeout := defaul(this.timing.resetTimeout.Load(), defaultResetTimeout)
|
|
restartInterval := restartInitialInterval
|
|
|
|
// main loop
|
|
for {
|
|
// run actor
|
|
lastStart := time.Now()
|
|
err := panicWrap(ctx, actor.Run)
|
|
|
|
// detect context cancellation
|
|
if ctxErr := ctx.Err(); ctxErr != nil {
|
|
if err != nil && !errors.Is(err, context.Canceled) {
|
|
stopErr = err
|
|
}
|
|
return
|
|
}
|
|
|
|
if err == nil {
|
|
// normal exit
|
|
exited = true
|
|
return
|
|
} else {
|
|
// failure
|
|
log.Printf("XXX [%s] failed", typ)
|
|
}
|
|
|
|
// restart logic
|
|
if time.Since(lastStart) < restartThreshold {
|
|
log.Printf("!!! [%s] failed too soon, restarting in %v", typ, restartInterval)
|
|
timer := time.NewTimer(restartInterval)
|
|
select {
|
|
case <- timer.C:
|
|
// ok
|
|
case <- ctx.Done():
|
|
if this.Verb() { log.Printf("(i) [%s] canceled while dormant", typ) }
|
|
return
|
|
}
|
|
restartInterval += restartIntervalIncrement
|
|
if restartInterval > restartIntervalMaximum {
|
|
restartInterval = restartIntervalMaximum
|
|
}
|
|
} else {
|
|
restartInterval = restartInitialInterval
|
|
}
|
|
|
|
// reset if needed
|
|
if actor, ok := actor.(Resettable); ok {
|
|
if this.Verb() { log.Printf("... [%s] resetting", typ) }
|
|
func() {
|
|
ctx, done := context.WithTimeout(ctx, resetTimeout)
|
|
defer done()
|
|
err := actor.Reset(ctx)
|
|
if err != nil {
|
|
log.Printf("XXX [%s] failed to reset", typ)
|
|
}
|
|
}()
|
|
if this.Verb() { log.Printf(".// [%s] reset", typ) }
|
|
}
|
|
|
|
log.Printf("(i) [%s] restarting", typ)
|
|
}
|
|
}
|
|
|
|
// initializeActors spawns initialization functions for the given actors and
|
|
// blocks until all of them have exited.
|
|
func (this *environment) initializeActors(ctx context.Context, actors ...Initializable) error {
|
|
ctx, done := context.WithTimeout(
|
|
ctx, defaul(this.timing.initTimeout.Load(), defaultInitTimeout))
|
|
defer done()
|
|
group, ctx := errgroup.WithContext(ctx)
|
|
for _, actor := range actors {
|
|
actor := actor
|
|
acto := actor.(Actor)
|
|
typ := acto.Type()
|
|
group.Go(func() error {
|
|
err := actor.Init(ctx)
|
|
if err != nil { return fmt.Errorf("%s: %w", typ, err) }
|
|
return nil
|
|
})
|
|
}
|
|
return group.Wait()
|
|
}
|
|
|
|
// trimActors spawns actor trimming functions, which can be waited upon via the
|
|
// returned errgroup.
|
|
func (this *environment) trimActors(ctx context.Context, actors ...Trimmable) error {
|
|
ctx, done := context.WithTimeout(
|
|
ctx, defaul(this.timing.trimTimeout.Load(), defaultTrimTimeout))
|
|
defer done()
|
|
group, ctx := errgroup.WithContext(ctx)
|
|
for _, actor := range actors {
|
|
actor := actor
|
|
acto := actor.(Actor)
|
|
typ := acto.Type()
|
|
group.Go(func() error {
|
|
err := actor.Trim(ctx)
|
|
if err != nil { return fmt.Errorf("%s: %w", typ, err) }
|
|
return nil
|
|
})
|
|
}
|
|
return group.Wait()
|
|
}
|
|
|
|
// applyConfig reads and applies environment-specific values from this.conf.
|
|
func (this *environment) applyConfig() error {
|
|
parseDuration := func(key string, destination interface { Store(time.Duration) }) error {
|
|
if str := this.conf.Get(key); str != "" {
|
|
value, err := time.ParseDuration(str)
|
|
if err != nil { return NewConfigError(this.conf, key, 0, err) }
|
|
this.timing.initTimeout.Store(value)
|
|
}
|
|
return nil
|
|
}
|
|
err := parseDuration("init-timeout", &this.timing.initTimeout)
|
|
if err != nil { return err }
|
|
err = parseDuration("restart-threshold", &this.timing.restartThreshold)
|
|
if err != nil { return err }
|
|
err = parseDuration("restart-initial-interval", &this.timing.restartInitialInterval)
|
|
if err != nil { return err }
|
|
err = parseDuration("restart-interval-increment", &this.timing.restartIntervalIncrement)
|
|
if err != nil { return err }
|
|
err = parseDuration("restart-interval-maximum", &this.timing.restartIntervalMaximum)
|
|
if err != nil { return err }
|
|
err = parseDuration("reset-timeout", &this.timing.resetTimeout)
|
|
if err != nil { return err }
|
|
err = parseDuration("trim-timeout", &this.timing.trimTimeout)
|
|
if err != nil { return err }
|
|
err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout)
|
|
if err != nil { return err }
|
|
return nil
|
|
}
|