Compare commits
4 Commits
613e21597b
...
v0.4.0
| Author | SHA1 | Date | |
|---|---|---|---|
| eca2b35057 | |||
| d34af2c4ee | |||
| e21cd9ed11 | |||
| 70dc9702bd |
7
actor.go
7
actor.go
@@ -127,3 +127,10 @@ type RunShutdownable interface {
|
|||||||
// should be shut down.
|
// should be shut down.
|
||||||
Shutdown(ctx context.Context) error
|
Shutdown(ctx context.Context) error
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Cleanupable is any object that must be cleaned up after it has stopped for
|
||||||
|
// good. Actors which implement this interface will be cleaned up after they
|
||||||
|
// are deleted from the environment.
|
||||||
|
type Cleanupable interface {
|
||||||
|
Cleanup(ctx context.Context) error
|
||||||
|
}
|
||||||
|
|||||||
4
cron.go
4
cron.go
@@ -7,6 +7,7 @@ import "context"
|
|||||||
// tasks on time intervals.
|
// tasks on time intervals.
|
||||||
type cron struct {
|
type cron struct {
|
||||||
trimFunc func() bool
|
trimFunc func() bool
|
||||||
|
fastTiming bool
|
||||||
timing struct {
|
timing struct {
|
||||||
trimInterval time.Duration
|
trimInterval time.Duration
|
||||||
}
|
}
|
||||||
@@ -28,6 +29,9 @@ func (this *cron) Configure (config Config) error {
|
|||||||
}
|
}
|
||||||
this.timing.trimInterval = value
|
this.timing.trimInterval = value
|
||||||
}
|
}
|
||||||
|
if this.fastTiming {
|
||||||
|
this.timing.trimInterval = time.Second * 10
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
@@ -19,6 +19,7 @@ const defaultRestartInitialInterval = 8 * time.Second
|
|||||||
const defaultRestartInitialIncrement = 8 * time.Second
|
const defaultRestartInitialIncrement = 8 * time.Second
|
||||||
const defaultRestartInitialMaximum = 1 * time.Hour
|
const defaultRestartInitialMaximum = 1 * time.Hour
|
||||||
const defaultResetTimeout = 8 * time.Minute
|
const defaultResetTimeout = 8 * time.Minute
|
||||||
|
const defaultCleanupTimeout = 1 * time.Minute
|
||||||
const defaultTrimInterval = 1 * time.Minute
|
const defaultTrimInterval = 1 * time.Minute
|
||||||
const defaultTrimTimeout = 1 * time.Minute
|
const defaultTrimTimeout = 1 * time.Minute
|
||||||
const defaultShutdownTimeout = 8 * time.Minute
|
const defaultShutdownTimeout = 8 * time.Minute
|
||||||
@@ -33,6 +34,7 @@ type environment struct {
|
|||||||
done context.CancelCauseFunc
|
done context.CancelCauseFunc
|
||||||
group sync.WaitGroup
|
group sync.WaitGroup
|
||||||
conf MutableConfig
|
conf MutableConfig
|
||||||
|
cron *cron
|
||||||
|
|
||||||
// flags stores information from built-in flags.
|
// flags stores information from built-in flags.
|
||||||
flags struct {
|
flags struct {
|
||||||
@@ -43,6 +45,7 @@ type environment struct {
|
|||||||
verbose bool
|
verbose bool
|
||||||
crash bool
|
crash bool
|
||||||
crashOnError bool
|
crashOnError bool
|
||||||
|
fastTiming bool
|
||||||
}
|
}
|
||||||
|
|
||||||
// running stores whether the environment is currently running.
|
// running stores whether the environment is currently running.
|
||||||
@@ -60,6 +63,7 @@ type environment struct {
|
|||||||
restartIntervalIncrement atomicDuration
|
restartIntervalIncrement atomicDuration
|
||||||
restartIntervalMaximum atomicDuration
|
restartIntervalMaximum atomicDuration
|
||||||
resetTimeout atomicDuration
|
resetTimeout atomicDuration
|
||||||
|
cleanupTimeout atomicDuration
|
||||||
trimTimeout atomicDuration
|
trimTimeout atomicDuration
|
||||||
shutdownTimeout atomicDuration
|
shutdownTimeout atomicDuration
|
||||||
}
|
}
|
||||||
@@ -80,10 +84,11 @@ func (this *environment) Run(name, description string, actors ...Actor) {
|
|||||||
this.name = name
|
this.name = name
|
||||||
this.description = description
|
this.description = description
|
||||||
this.actors = usync.NewRWMonitor(&actorSets { })
|
this.actors = usync.NewRWMonitor(&actorSets { })
|
||||||
this.addToSets(actors...)
|
this.cron = &cron {
|
||||||
this.addToSets(&cron {
|
|
||||||
trimFunc: this.phase70_5Trimming,
|
trimFunc: this.phase70_5Trimming,
|
||||||
})
|
}
|
||||||
|
this.addToSets(actors...)
|
||||||
|
this.addToSets(this.cron)
|
||||||
|
|
||||||
if !this.phase10FlagParsing() { os.Exit(2) }
|
if !this.phase10FlagParsing() { os.Exit(2) }
|
||||||
if !this.phase13PidFileCreation() { os.Exit(1) }
|
if !this.phase13PidFileCreation() { os.Exit(1) }
|
||||||
@@ -146,8 +151,10 @@ func (this *environment) Del(ctx context.Context, actors ...Actor) error {
|
|||||||
if info.stopped != nil {
|
if info.stopped != nil {
|
||||||
channels = append(channels, info.stopped)
|
channels = append(channels, info.stopped)
|
||||||
}
|
}
|
||||||
|
if info.done != nil {
|
||||||
info.done()
|
info.done()
|
||||||
}
|
}
|
||||||
|
}
|
||||||
for _, channel := range channels {
|
for _, channel := range channels {
|
||||||
if channel == nil { continue }
|
if channel == nil { continue }
|
||||||
select {
|
select {
|
||||||
@@ -235,11 +242,30 @@ func (this *environment) start(actor Actor) {
|
|||||||
// counter will be decremented. note that this function will never increment the
|
// counter will be decremented. note that this function will never increment the
|
||||||
// wait group counter, so start should usually be used instead.
|
// wait group counter, so start should usually be used instead.
|
||||||
func (this *environment) run(actor Actor) {
|
func (this *environment) run(actor Actor) {
|
||||||
|
typ := actor.Type()
|
||||||
|
|
||||||
// clean up when done
|
// clean up when done
|
||||||
defer this.group.Done()
|
defer func() {
|
||||||
|
this.group.Done()
|
||||||
|
this.delFromSets(actor)
|
||||||
|
if actor, ok := actor.(Cleanupable); ok {
|
||||||
|
ctx, done := context.WithTimeout(
|
||||||
|
context.Background(),
|
||||||
|
defaul(
|
||||||
|
this.timing.cleanupTimeout.Load(),
|
||||||
|
defaultCleanupTimeout))
|
||||||
|
defer done()
|
||||||
|
err := actor.Cleanup(ctx)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("XXX [%s] failed to cleanup: %v", typ, err)
|
||||||
|
if this.flags.crashOnError {
|
||||||
|
panic(fmt.Sprint(err))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
// logging
|
// logging
|
||||||
typ := actor.Type()
|
|
||||||
if this.Verb() { log.Printf("(i) [%s] running", typ) }
|
if this.Verb() { log.Printf("(i) [%s] running", typ) }
|
||||||
var stopErr error
|
var stopErr error
|
||||||
var exited bool
|
var exited bool
|
||||||
@@ -418,6 +444,7 @@ func (this *environment) applyConfig() error {
|
|||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
// TODO: trim interval
|
||||||
err := parseDuration("init-timeout", &this.timing.initTimeout)
|
err := parseDuration("init-timeout", &this.timing.initTimeout)
|
||||||
if err != nil { return err }
|
if err != nil { return err }
|
||||||
err = parseDuration("restart-threshold", &this.timing.restartThreshold)
|
err = parseDuration("restart-threshold", &this.timing.restartThreshold)
|
||||||
@@ -430,10 +457,17 @@ func (this *environment) applyConfig() error {
|
|||||||
if err != nil { return err }
|
if err != nil { return err }
|
||||||
err = parseDuration("reset-timeout", &this.timing.resetTimeout)
|
err = parseDuration("reset-timeout", &this.timing.resetTimeout)
|
||||||
if err != nil { return err }
|
if err != nil { return err }
|
||||||
|
err = parseDuration("cleanup-timeout", &this.timing.cleanupTimeout)
|
||||||
|
if err != nil { return err }
|
||||||
err = parseDuration("trim-timeout", &this.timing.trimTimeout)
|
err = parseDuration("trim-timeout", &this.timing.trimTimeout)
|
||||||
if err != nil { return err }
|
if err != nil { return err }
|
||||||
err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout)
|
err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout)
|
||||||
if err != nil { return err }
|
if err != nil { return err }
|
||||||
|
|
||||||
|
if this.flags.fastTiming {
|
||||||
|
this.timing.shutdownTimeout.Store(time.Second * 10)
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
12
phases.go
12
phases.go
@@ -24,9 +24,10 @@ func (this *environment) phase10FlagParsing() bool {
|
|||||||
flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString)
|
flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString)
|
||||||
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
|
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
|
||||||
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
|
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
|
||||||
flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil)
|
flagVerbose := set.Flag('v', "verbose", "(debug) Enable verbose output/logging", nil)
|
||||||
flagCrash := set.Flag(0, "crash", "Crash when an actor panics", nil)
|
flagCrash := set.Flag(0, "crash", "(debug) Crash when an actor panics", nil)
|
||||||
flagCrashOnError := set.Flag(0, "crash-on-error", "Crash when an actor experiences any error", nil)
|
flagCrashOnError := set.Flag(0, "crash-on-error", "(debug) Crash when an actor experiences any error", nil)
|
||||||
|
flagFastTiming := set.Flag(0, "fast-timing", "(debug) Make timed things happen faster/more often", nil)
|
||||||
|
|
||||||
// ask actors to add flags
|
// ask actors to add flags
|
||||||
actors, done := this.actors.RBorrow()
|
actors, done := this.actors.RBorrow()
|
||||||
@@ -70,6 +71,10 @@ func (this *environment) phase10FlagParsing() bool {
|
|||||||
this.flags.crash = true
|
this.flags.crash = true
|
||||||
this.flags.crashOnError = true
|
this.flags.crashOnError = true
|
||||||
}
|
}
|
||||||
|
if _, ok := flagFastTiming.First(); ok {
|
||||||
|
this.flags.fastTiming = true
|
||||||
|
this.cron.fastTiming = true
|
||||||
|
}
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -274,6 +279,7 @@ func (this *environment) phase70_5Trimming() bool {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *environment) phase80Shutdown() bool {
|
func (this *environment) phase80Shutdown() bool {
|
||||||
|
logActors(All())
|
||||||
ctx, done := context.WithTimeout(
|
ctx, done := context.WithTimeout(
|
||||||
context.Background(),
|
context.Background(),
|
||||||
defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout))
|
defaul(this.timing.shutdownTimeout.Load(), defaultShutdownTimeout))
|
||||||
|
|||||||
Reference in New Issue
Block a user