package camfish import "os" import "fmt" import "log" import "iter" import "sync" import "time" import "errors" import "context" import "sync/atomic" import "golang.org/x/sync/errgroup" import "git.tebibyte.media/sashakoshka/go-util/sync" import "git.tebibyte.media/sashakoshka/go-service/daemon" const defaultInitTimeout = 8 * time.Minute const defaultRestartThreshold = 16 * time.Second const defaultRestartInitialInterval = 8 * time.Second const defaultRestartInitialIncrement = 8 * time.Second const defaultRestartInitialMaximum = 1 * time.Hour const defaultResetTimeout = 8 * time.Minute const defaultTrimInterval = 1 * time.Minute const defaultTrimTimeout = 1 * time.Minute const defaultShutdownTimeout = 8 * time.Minute // environment is an object which handles requests by package-level functions. // It is only a separate object for testing purposes. type environment struct { name string description string actors usync.RWMonitor[*actorSets] ctx context.Context done context.CancelCauseFunc group sync.WaitGroup conf MutableConfig // flags stores information from built-in flags. flags struct { pidFile string user string logDirectory string configFile string verbose bool } // running stores whether the environment is currently running. // updated by environment.phase7Running. // ATOMICALLY VOLATILE running atomic.Bool // timing stores configurable timing information. // updated by environment.phase5ConfigurationApplication. // ATOMICALLY VOLATILE timing struct { initTimeout atomicDuration restartThreshold atomicDuration restartInitialInterval atomicDuration restartIntervalIncrement atomicDuration restartIntervalMaximum atomicDuration resetTimeout atomicDuration trimTimeout atomicDuration shutdownTimeout atomicDuration } } // Run implements the package-level function [Run]. func (this *environment) Run(name, description string, actors ...Actor) { if len(actors) == 0 { log.Println("XXX cannot start with no actors") os.Exit(2) } this.ctx, this.done = context.WithCancelCause(context.Background()) defer this.done(nil) daemon.OnSigint(func() { this.done(ErrProcessKilled) }) defer log.SetOutput(os.Stderr) this.name = name this.description = description this.actors = usync.NewRWMonitor(&actorSets { }) this.addToSets(actors...) this.addToSets(&cron { trimFunc: this.phase70_5Trimming, }) if !this.phase10FlagParsing() { os.Exit(2) } if !this.phase13PidFileCreation() { os.Exit(1) } if !this.phase17PrivilegeDropping() { os.Exit(1) } if !this.phase20LogSwitching() { os.Exit(1) } if !this.phase30ConfigurationParsing() { os.Exit(1) } if !this.phase40ConfigurationProcessing() { os.Exit(1) } if !this.phase50ConfigurationApplication() { os.Exit(1) } if !this.phase60Initialization() { os.Exit(1) } if !this.phase70Running() { os.Exit(1) } if !this.phase80Shutdown() { os.Exit(1) } } // Done implements the package-level function [Done]. func (this *environment) Done(cause error) { this.done(cause) } // Add implements the package-level function [Add]. func (this *environment) Add(ctx context.Context, actors ...Actor) error { this.addToSets(actors...) initializable := make([]Initializable, 0, len(actors)) for _, actor := range actors { if actor, ok := actor.(Initializable); ok { initializable = append(initializable, actor) } } err := this.initializeActors(ctx, initializable...) if err != nil { return err } for _, actor := range actors { if actor, ok := actor.(Configurable); ok { err := actor.Configure(this.conf) if err != nil { return fmt.Errorf ( "could not apply configuration to %s: %w", actor.(Actor).Type(), err) } } } for _, actor := range actors { _, isRunnable := actor.(Runnable) _, isRunShutdownable := actor.(RunShutdownable) if isRunnable || isRunShutdownable { this.start(actor) } } return ctx.Err() } // Del implements the package-level function [Del]. func (this *environment) Del(ctx context.Context, actors ...Actor) error { channels := []<- chan struct { } { } for _, actor := range actors { info := this.info(actor) if info.done != nil { channels = append(channels, info.stopped) } } for _, channel := range channels { select { case <- channel: case <- ctx.Done(): return ctx.Err() } } return ctx.Err() } // Find implements the package-level function [Find]. func (this *environment) Find(typ string) Actor { actors, done := this.actors.RBorrow() defer done() return actors.all.find(typ) } // FindAll implements the package-level function [FindAll]. func (this *environment) FindAll(typ string) iter.Seq[Actor] { actors, done := this.actors.RBorrow() defer done() slice := actors.all.findAll(typ) return func (yield func(Actor) bool) { for _, actor := range slice { if !yield(actor) { return } } } } // All implements the package-level function [All]. func (this *environment) All() iter.Seq[Actor] { actors, done := this.actors.RBorrow() defer done() slice := actors.all.all() return func (yield func(Actor) bool) { for _, actor := range slice { if !yield(actor) { return } } } } // Verb implements the package-level function [Verb]. func (this *environment) Verb() bool { return this.flags.verbose } // addToSets adds the actors to the actorSets in a thread-safe manner. func (this *environment) addToSets(actors ...Actor) { thisActors, done := this.actors.Borrow() defer done() for _, actor := range actors { thisActors.add(this.ctx, actor) } } // delFromSets deletes the actors from the actorSets in a thread-safe manner. func (this *environment) delFromSets(actors ...Actor) { thisActors, done := this.actors.Borrow() defer done() for _, actor := range actors { thisActors.del(actor) } } // info retrieves information about an actor from the actorSets in thread-safe // manner. func (this *environment) info(actor Actor) actorInfo { thisActors, done := this.actors.RBorrow() defer done() return thisActors.info(actor) } // start increments the wait group by one and starts the given actor in the // background, restarting it if it fails. this function will exit immediately. // see the documentation for run for details. func (this *environment) start(actor Actor) { this.group.Add(1) go this.run(actor) } // run runs the given actor, restarting it if it fails. This function will exit // when the actor's context is canceled. The actor will be removed from the // environment once this function exits, and the environment's wait group // counter will be decremented. note that this function will never increment the // wait group counter, so start should usually be used instead. func (this *environment) run(actor Actor) { // clean up when done defer this.group.Done() // logging typ := actor.Type() if this.Verb() { log.Printf("(i) [%s] running", typ) } var stopErr error var exited bool defer func() { if stopErr == nil { if exited { if this.Verb() { log.Printf("(i) [%s] exited", typ) } } else { if this.Verb() { log.Printf("(i) [%s] stopped", typ) } } } else { log.Printf("!!! [%s] stopped with error: %v", typ, stopErr) } }() // contains context information info := this.info(actor) ctx := info.ctx defer close(info.stopped) switch actor := actor.(type) { case Runnable: stopErr, exited = this.runRunnable(ctx, actor) case RunShutdownable: stopErr, exited = this.runRunnable(ctx, &runShutdownableShim { shutdownTimeout: defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout), underlying: actor, }) default: panic("actor was neither Runnable or RunShutdownable") } } // runRunnable runs an actor implementing [Runnable]. this should only be called // from within [environment.run]. func (this *environment) runRunnable(ctx context.Context, actor Runnable) (stopErr error, exited bool) { // timing restartThreshold := defaul(this.timing.restartThreshold.Load(), defaultRestartThreshold) restartInitialInterval := defaul(this.timing.restartInitialInterval.Load(), defaultRestartInitialInterval) restartIntervalIncrement := defaul(this.timing.restartIntervalIncrement.Load(), defaultRestartInitialIncrement) restartIntervalMaximum := defaul(this.timing.restartIntervalMaximum.Load(), defaultRestartInitialMaximum) resetTimeout := defaul(this.timing.resetTimeout.Load(), defaultResetTimeout) restartInterval := restartInitialInterval acto, ok := actor.(Actor) if !ok { return } typ := acto.Type() for { // run actor lastStart := time.Now() err := panicWrapCtx(ctx, actor.Run) // detect context cancellation if ctxErr := ctx.Err(); ctxErr != nil { if err != nil && !errors.Is(err, context.Canceled) { stopErr = err } return } if err == nil { // normal exit exited = true return } else { // failure log.Printf("XXX [%s] failed: %v", typ, err) } // restart logic if time.Since(lastStart) < restartThreshold { log.Printf("!!! [%s] failed too soon, restarting in %v", typ, restartInterval) timer := time.NewTimer(restartInterval) select { case <- timer.C: // ok case <- ctx.Done(): if this.Verb() { log.Printf("(i) [%s] canceled while dormant", typ) } return } restartInterval += restartIntervalIncrement if restartInterval > restartIntervalMaximum { restartInterval = restartIntervalMaximum } } else { restartInterval = restartInitialInterval } // reset if needed if actor, ok := actor.(Resettable); ok { if this.Verb() { log.Printf("... [%s] resetting", typ) } func() { ctx, done := context.WithTimeout(ctx, resetTimeout) defer done() err := actor.Reset(ctx) if err != nil { log.Printf("XXX [%s] failed to reset", typ) } }() if this.Verb() { log.Printf(".// [%s] reset", typ) } } log.Printf("(i) [%s] restarting", typ) } } // initializeActors spawns initialization functions for the given actors and // blocks until all of them have exited. func (this *environment) initializeActors(ctx context.Context, actors ...Initializable) error { ctx, done := context.WithTimeout( ctx, defaul(this.timing.initTimeout.Load(), defaultInitTimeout)) defer done() group, ctx := errgroup.WithContext(ctx) for _, actor := range actors { actor := actor acto := actor.(Actor) typ := acto.Type() group.Go(func() error { err := actor.Init(ctx) if err != nil { return fmt.Errorf("%s: %w", typ, err) } return nil }) } return group.Wait() } // trimActors spawns actor trimming functions, which can be waited upon via the // returned errgroup. func (this *environment) trimActors(ctx context.Context, actors ...Trimmable) error { ctx, done := context.WithTimeout( ctx, defaul(this.timing.trimTimeout.Load(), defaultTrimTimeout)) defer done() group, ctx := errgroup.WithContext(ctx) for _, actor := range actors { actor := actor acto := actor.(Actor) typ := acto.Type() group.Go(func() error { err := actor.Trim(ctx) if err != nil { return fmt.Errorf("%s: %w", typ, err) } return nil }) } return group.Wait() } // applyConfig reads and applies environment-specific values from this.conf. func (this *environment) applyConfig() error { parseDuration := func(key string, destination interface { Store(time.Duration) }) error { if str := this.conf.Get(key); str != "" { value, err := time.ParseDuration(str) if err != nil { return NewConfigError(this.conf, key, 0, err) } this.timing.initTimeout.Store(value) } return nil } err := parseDuration("init-timeout", &this.timing.initTimeout) if err != nil { return err } err = parseDuration("restart-threshold", &this.timing.restartThreshold) if err != nil { return err } err = parseDuration("restart-initial-interval", &this.timing.restartInitialInterval) if err != nil { return err } err = parseDuration("restart-interval-increment", &this.timing.restartIntervalIncrement) if err != nil { return err } err = parseDuration("restart-interval-maximum", &this.timing.restartIntervalMaximum) if err != nil { return err } err = parseDuration("reset-timeout", &this.timing.resetTimeout) if err != nil { return err } err = parseDuration("trim-timeout", &this.timing.trimTimeout) if err != nil { return err } err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout) if err != nil { return err } return nil } type runShutdownableShim struct { underlying RunShutdownable shutdownTimeout time.Duration } func (this *runShutdownableShim) Type() string { return this.underlying.(Actor).Type() } func (this *runShutdownableShim) Run(ctx context.Context) error { ctx, done := context.WithCancel(ctx) defer done() go func() { <- ctx.Done() shutdownCtx, done := context.WithTimeout(context.Background(), this.shutdownTimeout) defer done() this.underlying.Shutdown(shutdownCtx) }() return this.underlying.Run() }