Initial commit

This commit is contained in:
Sasha Koshka 2024-12-31 01:27:53 -05:00
commit b3913d8078
25 changed files with 2477 additions and 0 deletions

7
README.md Normal file
View File

@ -0,0 +1,7 @@
# CAMFISH
*Concurrent Actor Model Framework for Internet Services on Holanet*
CAMFISH is a Go framework for creating daemons using an actor-model
architecture. It is designed to be fault-tolerant, and is capable of restarting
individual parts of your program (actors) if they fail.

90
actor.go Normal file
View File

@ -0,0 +1,90 @@
package camfish
import "context"
// Actor is a participant in the environment. All public methods on an actor
// must be safe for concurrent use by multiple goroutines. Additionally, any
// type which explicitly implements Actor should:
// - Treat all public fields, values, indices, etc. as immutable
// - Satisfy Actor as a pointer, not a value
// - Not have a constructor
type Actor interface {
// Type returns the type name of the actor. The value returned from this
// is used to locate actors capable of performing a specific task, so it
// absolutely must return the same string every time. Actors implemented
// in packages besides this one (i.e. not camfish) must not return the
// string "cron".
Type() string
}
// FlagAdder is any object that can add [Flag]s to a [FlagSet]. Actors which
// implement this interface will be called upon to add flags during and only
// during the flag parsing phase.
type FlagAdder interface {
// AddFlags adds flags to set. The object must not retain or distribute
// any reference to set.
AddFlags(set FlagSet)
}
// ConfigProcessor is any object that can read and modify a configuration before
// it is used. Actors which implement this interface will be called upon to
// process the config during and only during the configuration processing phase.
type ConfigProcessor interface {
// Process processes the config.
ProcessConfig(conf MutableConfig) error
}
// Configurable is any object that must be configured before use or
// initialization (if applicable). Actors which implement this interface will be
// configured during the configuration phase, or when they are added.
type Configurable interface {
// Configure configures the object. It must not make any attempt to
// modify conf, and it must not retain or distribute any reference to
// conf.
Configure(conf Config) error
}
// Initializable is any object that must be initialized before use. Actors which
// implement this interface will be initialized during the initialization phase,
// or when they are added.
type Initializable interface {
// Init initializes the object. It must return before the context
// expires, and must return ctx.Err if there is no other error to be
// returned. If Init returns an error, the object must be treated as
// invalid and any process which depends on it should be shut down.
Init(ctx context.Context) error
}
// Runnable is any object with a central, long-running routine. Actors which
// implement this interface will be run after they have been initialized,
// configured, etc. (if applicable). The environment will attempt to restart
// actors if their run method fails, see the documentation for this package's
// [Run] function for details.
type Runnable interface {
// Run runs the object. It must return when or before the context
// expires, and must return ctx.Err if there is no other error to be
// returned.
Run(ctx context.Context) error
}
// Trimmable is any object that needs to have a task run every so often. This
// can be garbage collecting, sanity checking, etc. Actors which implement this
// interface will be routinely trimmed while running. See the documentation for
// this package's [Run] function for details.
type Trimmable interface {
// Trim trims the object. It must return when or before the context
// expires, and must return ctx.Err if there is no other error to be
// returned.
Trim(ctx context.Context) error
}
// Resettable is any object that must be reset after failure and before re-use.
// Actors which implement this interface will be reset after their Run method
// (if applicable) has failed and is about to be called again.
type Resettable interface {
// Reset resets the object. It must return when or before the context
// expires, and must return ctx.Err if there is no other error to be
// returned. If Reset returns an error, the object must be treated as
// invalid and any process which depends on it should be shut down.
Reset(ctx context.Context) error
}

178
actorset.go Normal file
View File

@ -0,0 +1,178 @@
package camfish
import "iter"
import "slices"
import "context"
// actorSets stores a sorted set of all actors, as well as information about
// their state.
type actorSets struct {
all actorSet[Actor]
inf map[Actor] actorInfo
nextOrder int
flagAdder actorSet[FlagAdder]
configProcessor actorSet[ConfigProcessor]
configurable actorSet[Configurable]
initializable actorSet[Initializable]
runnable actorSet[Runnable]
trimmable actorSet[Trimmable]
}
// actorInfo holds information about a running actor.
type actorInfo struct {
// ctx is the context of the actor. It is passed to the actor's Run
// method.
ctx context.Context
// done is used to stop the actor. It is created when the actor is added
// to the actorSets.
done func()
// stopped is closed once the actor's run method exits. If the actor
// does not have a run method, it will be nil.
stopped chan struct { }
// order is the semantic order of the actor within the sets. This is
// incremented automatically for each actor that is added. It is never
// less than one.
order int
}
// actorSetIface holds only the add/del/clear methods of actorSet.
type actorSetIface interface {
add(actor Actor)
del(actor Actor)
clear()
}
// All returns an iterator over all sets in the collection.
func (sets *actorSets) All() iter.Seq[actorSetIface] {
return func(yield func(actorSetIface) bool) {
yield(&sets.all)
yield(&sets.flagAdder)
yield(&sets.configProcessor)
yield(&sets.configurable)
yield(&sets.initializable)
yield(&sets.runnable)
yield(&sets.trimmable)
}
}
// add adds an actor under the given parent context. This is a write operation.
func (this *actorSets) add(ctx context.Context, actor Actor) {
if this.inf == nil { this.inf = make(map[Actor] actorInfo)}
actorCtx, done := context.WithCancel(ctx)
this.nextOrder ++
info := actorInfo {
ctx: actorCtx,
done: done,
order: this.nextOrder,
}
if _, ok := actor.(Runnable); ok {
info.stopped = make(chan struct { })
}
this.inf[actor] = info
for set := range this.All() {
set.add(actor)
}
}
// del removes an actor. This is a write operation.
func (this *actorSets) del(actor Actor) {
delete(this.inf, actor)
for set := range this.All() {
set.del(actor)
}
}
// clear removes all actors.
func (this *actorSets) clear() {
clear(this.inf)
for set := range this.All() {
set.clear()
}
}
// info gets information about an actor.
func (this *actorSets) info(actor Actor) actorInfo {
if this.inf == nil { return actorInfo { } }
return this.inf[actor]
}
// sortActors sorts actors according to the order in which they were added.
func sortActors[T comparable] (sets *actorSets, actors []T) []T {
slices.SortFunc(actors, func (left, right T) int {
// :3
leftOrder := sets.info(any(left).(Actor)).order
rightOrder := sets.info(any(right).(Actor)).order
if leftOrder < 1 || rightOrder < 1 {
panic("could not sort actors, some were invalid")
}
switch {
case leftOrder < rightOrder: return -1
case rightOrder > leftOrder: return 1
default: return 0
}
})
return actors
}
// actorSet is a list that holds all actors of a type. it must not return any
// references to data that is referenced elsewehre, except for references to
// actors.
type actorSet[T comparable] struct {
actors map[T] string
}
// add adds an actor, if it is of the set's type
func (this *actorSet[T]) add(actor Actor) {
if this.actors == nil { this.actors = make(map[T] string) }
if item, ok := actor.(T); ok {
this.actors[item] = actor.Type()
}
}
// add removes an actor, if it is of the set's type
func (this *actorSet[T]) del(actor Actor) {
if this.actors == nil { return }
if item, ok := actor.(T); ok {
delete(this.actors, item)
}
}
// clear removes all actors.
func (this *actorSet[T]) clear() {
if this.actors == nil { return }
clear(this.actors)
}
// find returns the first actor with the given type.
func (this *actorSet[T]) find(typ string) T {
for actor, actorType := range this.actors {
if actorType == typ { return actor }
}
var zero T
return zero
}
// findAll returns all actors with the given type. This method allocates and
// returns a slice instead of an iterator for concurrency reasons.
func (this *actorSet[T]) findAll(typ string) []T {
if this.actors == nil { return nil }
slice := []T { }
for actor, actorType := range this.actors {
if actorType == typ {
slice = append(slice, actor)
}
}
return slice
}
// all returns all actors. This method allocates and returns a slice instead of
// an iterator for concurrency reasons.
func (this *actorSet[T]) all() []T {
if this.actors == nil { return nil }
slice := []T { }
for actor := range this.actors {
slice = append(slice, actor)
}
return slice
}

282
actorset_test.go Normal file
View File

@ -0,0 +1,282 @@
package camfish
import "context"
import "testing"
type mockInitializable string
func (this *mockInitializable) Type() string { return string(*this) }
func (*mockInitializable) Init(ctx context.Context) error { return ctx.Err() }
func TestActorSet(test *testing.T) {
set := actorSet[Initializable] { }
// add actors
actor0 := mockInitializable("type0")
test.Logf("actor0: %p", &actor0)
set.add(&actor0)
if len(set.actors) != 1 {
test.Fatalf("wrong length: %d", len(set.actors))
}
if _, ok := set.actors[&actor0]; !ok {
test.Fatal("missing item")
}
actor1 := mockInitializable("type0")
test.Logf("actor1: %p", &actor1)
set.add(&actor1)
if len(set.actors) != 2 {
test.Fatalf("wrong length: %d", len(set.actors))
}
if _, ok := set.actors[&actor1]; !ok {
test.Fatal("missing item")
}
actor2 := mockInitializable("type1")
test.Logf("actor2: %p", &actor2)
set.add(&actor2)
if len(set.actors) != 3 {
test.Fatalf("wrong length: %d", len(set.actors))
}
if _, ok := set.actors[&actor2]; !ok {
test.Fatal("missing item")
}
// find
found := set.find("type0")
test.Log("find:", found)
if found != &actor0 && found != &actor1 {
test.Fatalf("not equal: %p", found)
}
found = set.find("type1")
test.Log("find:", found)
if found != &actor2 {
test.Fatalf("not equal: %p", found)
}
// findAll
foundAll := set.findAll("type0")
test.Log("findAll:", foundAll)
if len(foundAll) != 2 {
test.Fatalf("wrong length: %d", len(foundAll))
}
if !mockInitializableIn(foundAll, &actor0) {
test.Fatal("missing item")
}
if !mockInitializableIn(foundAll, &actor1) {
test.Fatal("missing item")
}
foundAll = set.findAll("type1")
test.Log("findAll:", foundAll)
if len(foundAll) != 1 {
test.Fatalf("wrong length: %d", len(foundAll))
}
if !mockInitializableIn(foundAll, &actor2) {
test.Fatal("missing item")
}
// all
all := set.all()
test.Log("all:", all)
if len(all) != 3 {
test.Fatalf("wrong length: %d", len(all))
}
if !mockInitializableIn(all, &actor0) {
test.Fatal("missing item")
}
if !mockInitializableIn(all, &actor1) {
test.Fatal("missing item")
}
if !mockInitializableIn(all, &actor2) {
test.Fatal("missing item")
}
// del
set.del(&actor1)
test.Log("del")
if len(set.actors) != 2 {
test.Fatalf("wrong length: %d", len(set.actors))
}
if _, ok := set.actors[&actor1]; ok {
test.Fatal("leaked item")
}
// find
found = set.find("type0")
test.Log("find:", found)
if found != &actor0 {
test.Fatalf("not equal: %p", found)
}
found = set.find("type1")
test.Log("find:", found)
if found != &actor2 {
test.Fatalf("not equal: %p", found)
}
// findAll
foundAll = set.findAll("type0")
test.Log("findAll:", foundAll)
if len(foundAll) != 1 {
test.Fatalf("wrong length: %d", len(foundAll))
}
if !mockInitializableIn(foundAll, &actor0) {
test.Fatal("missing item")
}
if mockInitializableIn(foundAll, &actor1) {
test.Fatal("leaked item")
}
foundAll = set.findAll("type1")
test.Log("findAll:", foundAll)
if len(foundAll) != 1 {
test.Fatalf("wrong length: %d", len(foundAll))
}
if !mockInitializableIn(foundAll, &actor2) {
test.Fatal("missing item")
}
// all
all = set.all()
test.Log("all:", all)
if len(all) != 2 {
test.Fatalf("wrong length: %d", len(all))
}
if !mockInitializableIn(all, &actor0) {
test.Fatal("missing item")
}
if mockInitializableIn(all, &actor1) {
test.Fatal("leaked item")
}
if !mockInitializableIn(all, &actor2) {
test.Fatal("missing item")
}
}
func TestActorSets(test *testing.T) {
sets := actorSets { }
actor0 := mockInitializable("type0")
actor1 := mockInitializable("type1")
sets.add(context.Background(), &actor0)
sets.add(context.Background(), &actor1)
test.Log("add")
if len(sets.inf) != 2 {
test.Fatalf("wrong length: %d", len(sets.inf))
}
if len(sets.all.actors) != 2 {
test.Fatalf("wrong length: %d", len(sets.all.actors))
}
if _, ok := sets.all.actors[&actor0]; !ok {
test.Fatal("missing item")
}
if _, ok := sets.all.actors[&actor1]; !ok {
test.Fatal("missing item")
}
if len(sets.initializable.actors) != 2 {
test.Fatalf("wrong length: %d", len(sets.initializable.actors))
}
if _, ok := sets.initializable.actors[&actor0]; !ok {
test.Fatal("missing item")
}
if _, ok := sets.initializable.actors[&actor1]; !ok {
test.Fatal("missing item")
}
if len(sets.configProcessor.actors) != 0 {
test.Fatalf("wrong length: %d", len(sets.configProcessor.actors))
}
if len(sets.configurable.actors) != 0 {
test.Fatalf("wrong length: %d", len(sets.configProcessor.actors))
}
if len(sets.runnable.actors) != 0 {
test.Fatalf("wrong length: %d", len(sets.runnable.actors))
}
if len(sets.trimmable.actors) != 0 {
test.Fatalf("wrong length: %d", len(sets.trimmable.actors))
}
info := sets.info(&actor0)
test.Log("info:", info)
if info.ctx == nil { test.Fatal("value is nil") }
if info.done == nil { test.Fatal("value is nil") }
if info.order != 1 { test.Fatal("not equal") }
info = sets.info(&actor1)
test.Log("info:", info)
if info.ctx == nil { test.Fatal("value is nil") }
if info.done == nil { test.Fatal("value is nil") }
if info.order != 2 { test.Fatal("not equal") }
sets.del(&actor0)
test.Log("del")
if len(sets.inf) != 1 {
test.Fatalf("wrong length: %d", len(sets.inf))
}
if len(sets.all.actors) != 1 {
test.Fatalf("wrong length: %d", len(sets.all.actors))
}
if _, ok := sets.all.actors[&actor1]; !ok {
test.Fatal("missing item")
}
if len(sets.initializable.actors) != 1 {
test.Fatalf("wrong length: %d", len(sets.initializable.actors))
}
if _, ok := sets.initializable.actors[&actor1]; !ok {
test.Fatal("missing item")
}
info = sets.info(&actor0)
test.Log("info:", info)
if info.ctx != nil { test.Fatal("value is non-nil") }
if info.done != nil { test.Fatal("value is non-nil") }
info = sets.info(&actor1)
test.Log("info:", info)
if info.ctx == nil { test.Fatal("value is nil") }
if info.done == nil { test.Fatal("value is nil") }
sets.del(&actor1)
test.Log("del")
if len(sets.configProcessor.actors) != 0 {
test.Fatalf("wrong length: %d", len(sets.configProcessor.actors))
}
if len(sets.configurable.actors) != 0 {
test.Fatalf("wrong length: %d", len(sets.configProcessor.actors))
}
if len(sets.initializable.actors) != 0 {
test.Fatalf("wrong length: %d", len(sets.configProcessor.actors))
}
if len(sets.runnable.actors) != 0 {
test.Fatalf("wrong length: %d", len(sets.runnable.actors))
}
if len(sets.trimmable.actors) != 0 {
test.Fatalf("wrong length: %d", len(sets.trimmable.actors))
}
}
func TestSortActors(test *testing.T) {
sets := actorSets { }
actor0 := mockInitializable("type0")
actor1 := mockInitializable("type1")
actor2 := mockInitializable("type2")
actor3 := mockInitializable("type3")
sets.add(context.Background(), &actor0)
sets.add(context.Background(), &actor1)
sets.add(context.Background(), &actor2)
sets.add(context.Background(), &actor3)
test.Log("add")
sorted := sortActors(&sets, []Actor { &actor1, &actor3, &actor2, &actor0 })
test.Log("sort:", sorted)
if len(sorted) != 4 { test.Fatalf("wrong length: %d", len(sorted)) }
if sorted[0] != &actor0 { test.Fatal("wrong position") }
if sorted[1] != &actor1 { test.Fatal("wrong position") }
if sorted[2] != &actor2 { test.Fatal("wrong position") }
if sorted[3] != &actor3 { test.Fatal("wrong position") }
}
func mockInitializableIn(haystack []Initializable, needle *mockInitializable) bool {
for _, initializable := range haystack {
if initializable == needle { return true }
}
return false
}

BIN
assets/icon.png Normal file

Binary file not shown.

After

Width:  |  Height:  |  Size: 8.2 KiB

65
assets/icon.svg Normal file
View File

@ -0,0 +1,65 @@
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
<!-- Created with Inkscape (http://www.inkscape.org/) -->
<svg
width="48"
height="48"
viewBox="0 0 12.7 12.7"
version="1.1"
id="svg5"
inkscape:version="1.2.2 (b0a8486541, 2022-12-01)"
sodipodi:docname="icon.svg"
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
xmlns="http://www.w3.org/2000/svg"
xmlns:svg="http://www.w3.org/2000/svg">
<sodipodi:namedview
id="namedview7"
pagecolor="#ffffff"
bordercolor="#000000"
borderopacity="0.25"
inkscape:showpageshadow="2"
inkscape:pageopacity="0.0"
inkscape:pagecheckerboard="0"
inkscape:deskcolor="#d1d1d1"
inkscape:document-units="mm"
showgrid="false"
inkscape:zoom="11.313709"
inkscape:cx="16.617009"
inkscape:cy="25.102291"
inkscape:window-width="1920"
inkscape:window-height="981"
inkscape:window-x="0"
inkscape:window-y="663"
inkscape:window-maximized="1"
inkscape:current-layer="layer1" />
<defs
id="defs2" />
<g
inkscape:label="Layer 1"
inkscape:groupmode="layer"
id="layer1">
<rect
style="display:none;fill:#280b0b;stroke-width:0.830186"
id="rect681"
width="7.2150984"
height="3.8033857"
x="2.275116"
y="4.448307" />
<circle
style="display:none;fill:#ff00ff;fill-opacity:1;stroke-width:8.69018"
id="path683-5"
cx="6.1561918"
cy="6.3499999"
r="3.6360548" />
<path
id="path245"
style="display:none;fill:#ff8800;stroke-width:0.717"
d="m 5.4640685,4.4483074 c -2.3440145,0 -3.7889527,1.9016923 -3.7889527,1.9016923 0,0 1.4449381,1.901693 3.7889527,1.901693 2.3440146,0 3.7727875,-1.901693 3.7727875,-1.901693 0,0 -1.4287729,-1.9016923 -3.7727875,-1.9016923 z m 3.7727875,1.9016923 2.147745,1.384929 v -1.384929 -1.384928 z"
sodipodi:nodetypes="scscsccccc" />
<path
id="path245-7"
style="fill:#e2c558;stroke-width:1.2114;fill-opacity:1"
d="M 5.6567647,3.1370206 C 2.0915963,3.1370199 0,6.35 0,6.35 c 0,0 2.0915963,3.2129796 5.6567647,3.2129794 1.9867423,0 3.5188832,-1.6036824 4.5557953,-3.208614 C 9.1734328,4.7464456 7.6386787,3.1370209 5.6567647,3.1370206 Z M 10.21256,6.3543654 c 1.037275,1.6050535 1.580297,3.208614 1.580297,3.208614 0,0 0.907143,-1.307781 0.907143,-3.2129794 0,-1.9051984 -0.907143,-3.2129794 -0.907143,-3.2129794 0,0 -0.541375,1.609303 -1.580297,3.2173448 z M 6.5569228,4.5243641 A 1.8253549,1.8253549 0 0 1 8.3825587,6.35 1.8253549,1.8253549 0 0 1 6.5569228,8.1756359 1.8253549,1.8253549 0 0 1 4.7312869,6.35 1.8253549,1.8253549 0 0 1 6.5569228,4.5243641 Z M 2.9170012,5.0848893 A 0.4305839,0.43058384 0 0 1 3.3474356,5.516197 0.4305839,0.43058384 0 0 1 2.9170012,5.9466314 0.4305839,0.43058384 0 0 1 2.4865667,5.516197 0.4305839,0.43058384 0 0 1 2.9170012,5.0848893 Z" />
</g>
</svg>

After

Width:  |  Height:  |  Size: 2.9 KiB

20
config.go Normal file
View File

@ -0,0 +1,20 @@
package camfish
import "iter"
// Config represents key/value data.
type Config interface {
Get(key string) string
GetAll(key string) iter.Seq2[int, string]
}
// MutableConfig is like Config, but can be changed. These methods should be
// assumed unsafe for use by multiple concurrent goroutines. A MutableConfig
// must not be retained nor shared. These methods must not be called while
// iterating over All or GetAll.
type MutableConfig interface {
Config
Add(key, value string)
Del(key string)
Set(key, value string)
}

48
cron.go Normal file
View File

@ -0,0 +1,48 @@
package camfish
import "time"
import "context"
// cron is a built-in actor present in every environment that triggers routine
// tasks on time intervals.
type cron struct {
trimFunc func() bool
timing struct {
trimInterval time.Duration
}
}
var _ Actor = new(cron)
var _ Runnable = new(cron)
var _ Configurable = new(cron)
func (this *cron) Type () string {
return "cron"
}
func (this *cron) Configure (config Config) error {
if str := config.Get("trim-interval"); str != "" {
value, err := time.ParseDuration(str)
if err != nil {
return NewConfigError(config, "trim-interval", 0, err)
}
this.timing.trimInterval = value
}
return nil
}
func (this *cron) Run (ctx context.Context) error {
trimTicker := time.NewTicker(defaul(
this.timing.trimInterval,
defaultTrimInterval))
defer trimTicker.Stop()
for {
select {
case <- trimTicker.C:
if this.trimFunc != nil {
this.trimFunc()
}
case <- ctx.Done(): return ctx.Err()
}
}
}

7
doc.go Normal file
View File

@ -0,0 +1,7 @@
// Package camfish provides an actor-model oriented framework for daemons.
//
// The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL
// NOT", "SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY", and
// "OPTIONAL" in this documentation are to be interpreted as described in
// RFC 2119.
package camfish

389
environment.go Normal file
View File

@ -0,0 +1,389 @@
package camfish
import "os"
import "fmt"
import "log"
import "iter"
import "sync"
import "time"
import "errors"
import "context"
import "sync/atomic"
import "golang.org/x/sync/errgroup"
import "git.tebibyte.media/sashakoshka/go-util/sync"
import "git.tebibyte.media/sashakoshka/go-service/daemon"
const defaultInitTimeout = 8 * time.Minute
const defaultRestartThreshold = 16 * time.Second
const defaultRestartInitialInterval = 8 * time.Second
const defaultRestartInitialIncrement = 8 * time.Second
const defaultRestartInitialMaximum = 1 * time.Hour
const defaultResetTimeout = 8 * time.Minute
const defaultTrimInterval = 1 * time.Minute
const defaultTrimTimeout = 1 * time.Minute
const defaultShutdownTimeout = 8 * time.Minute
// environment is an object which handles requests by package-level functions.
// It is only a separate object for testing purposes.
type environment struct {
name string
description string
actors usync.RWLocker[*actorSets]
ctx context.Context
done context.CancelCauseFunc
group sync.WaitGroup
conf MutableConfig
// flags stores information from built-in flags.
flags struct {
pidFile string
user string
logDirectory string
configFile string
verbose bool
}
// running stores whether the environment is currently running.
// updated by environment.phase7Running.
// ATOMICALLY VOLATILE
running atomic.Bool
// timing stores configurable timing information.
// updated by environment.phase5ConfigurationApplication.
// ATOMICALLY VOLATILE
timing struct {
initTimeout atomicDuration
restartThreshold atomicDuration
restartInitialInterval atomicDuration
restartIntervalIncrement atomicDuration
restartIntervalMaximum atomicDuration
resetTimeout atomicDuration
trimTimeout atomicDuration
shutdownTimeout atomicDuration
}
}
// Run implements the package-level function [Run].
func (this *environment) Run(name, description string, actors ...Actor) {
if len(actors) == 0 {
log.Println("XXX cannot start with no actors")
os.Exit(2)
}
this.ctx, this.done = context.WithCancelCause(context.Background())
defer this.done(nil)
daemon.OnSigint(func() { this.done(ErrProcessKilled) })
defer log.SetOutput(os.Stderr)
this.name = name
this.description = description
this.actors = usync.NewRWLocker(&actorSets { })
this.addToSets(actors...)
this.addToSets(&cron {
trimFunc: this.phase70_5Trimming,
})
if !this.phase10FlagParsing() { os.Exit(2) }
if !this.phase13PidFileCreation() { os.Exit(2) }
if !this.phase17PrivilegeDropping() { os.Exit(2) }
if !this.phase20LogSwitching() { os.Exit(2) }
if !this.phase30ConfigurationParsing() { os.Exit(2) }
if !this.phase40ConfigurationProcessing() { os.Exit(2) }
if !this.phase50ConfigurationApplication() { os.Exit(2) }
if !this.phase60Initialization() { os.Exit(2) }
if !this.phase70Running() { os.Exit(1) }
if !this.phase80Shutdown() { os.Exit(1) }
}
// Done implements the package-level function [Done].
func (this *environment) Done(cause error) {
this.done(cause)
}
// Add implements the package-level function [Add].
func (this *environment) Add(ctx context.Context, actors ...Actor) error {
this.addToSets(actors...)
initializable := make([]Initializable, 0, len(actors))
for _, actor := range actors {
if actor, ok := actor.(Initializable); ok {
initializable = append(initializable, actor)
}
}
err := this.initializeActors(ctx, initializable...)
if err != nil { return err }
for _, actor := range actors {
if actor, ok := actor.(Configurable); ok {
err := actor.Configure(this.conf)
if err != nil {
return fmt.Errorf (
"could not apply configuration to %s: %w",
actor.(Actor).Type(), err)
}
}
}
for _, actor := range actors {
if actor, ok := actor.(Runnable); ok {
this.start(actor)
}
}
return ctx.Err()
}
// Del implements the package-level function [Del].
func (this *environment) Del(ctx context.Context, actors ...Actor) error {
channels := []<- chan struct { } { }
for _, actor := range actors {
info := this.info(actor)
if info.done != nil {
channels = append(channels, info.stopped)
}
}
for _, channel := range channels {
select {
case <- channel:
case <- ctx.Done():
return ctx.Err()
}
}
return ctx.Err()
}
// Find implements the package-level function [Find].
func (this *environment) Find(typ string) Actor {
actors, done := this.actors.RBorrow()
defer done()
return actors.all.find(typ)
}
// FindAll implements the package-level function [FindAll].
func (this *environment) FindAll(typ string) iter.Seq[Actor] {
actors, done := this.actors.RBorrow()
defer done()
slice := actors.all.findAll(typ)
return func (yield func(Actor) bool) {
for _, actor := range slice {
if !yield(actor) { return }
}
}
}
// All implements the package-level function [All].
func (this *environment) All() iter.Seq[Actor] {
actors, done := this.actors.RBorrow()
defer done()
slice := actors.all.all()
return func (yield func(Actor) bool) {
for _, actor := range slice {
if !yield(actor) { return }
}
}
}
// Verb implements the package-level function [Verb].
func (this *environment) Verb() bool {
return this.flags.verbose
}
// addToSets adds the actors to the actorSets in a thread-safe manner.
func (this *environment) addToSets(actors ...Actor) {
thisActors, done := this.actors.Borrow()
defer done()
for _, actor := range actors {
thisActors.add(this.ctx, actor)
}
}
// delFromSets deletes the actors from the actorSets in a thread-safe manner.
func (this *environment) delFromSets(actors ...Actor) {
thisActors, done := this.actors.Borrow()
defer done()
for _, actor := range actors {
thisActors.del(actor)
}
}
// info retrieves information about an actor from the actorSets in thread-safe
// manner.
func (this *environment) info(actor Actor) actorInfo {
thisActors, done := this.actors.RBorrow()
defer done()
return thisActors.info(actor)
}
// start increments the wait group by one and starts the given actor in the
// background, restarting it if it fails. this function will exit immediately.
// see the documentation for run for details.
func (this *environment) start(actor Runnable) {
this.group.Add(1)
go this.run(actor)
}
// run runs the given actor, restarting it if it fails. This function will exit
// when the actor's context is canceled. The actor will be removed from the
// environment once this function exits, and the environment's wait group
// counter will be decremented. note that this function will never increment the
// wait group counter, so start should usually be used instead.
func (this *environment) run(actor Runnable) {
// clean up when done
defer this.group.Done()
// logging
acto, ok := actor.(Actor)
if !ok { return }
typ := acto.Type()
if this.Verb() { log.Printf("(i) [%s] running", typ) }
var stopErr error
var exited bool
defer func() {
if stopErr == nil {
if exited {
if this.Verb() { log.Printf("(i) [%s] exited", typ) }
} else {
if this.Verb() { log.Printf("(i) [%s] stopped", typ) }
}
} else {
log.Printf("!!! [%s] stopped with error: %v", typ, stopErr)
}
}()
// contains context information
info := this.info(acto)
ctx := info.ctx
defer close(info.stopped)
// timing
restartThreshold := defaul(this.timing.restartThreshold.Load(), defaultRestartThreshold)
restartInitialInterval := defaul(this.timing.restartInitialInterval.Load(), defaultRestartInitialInterval)
restartIntervalIncrement := defaul(this.timing.restartIntervalIncrement.Load(), defaultRestartInitialIncrement)
restartIntervalMaximum := defaul(this.timing.restartIntervalMaximum.Load(), defaultRestartInitialMaximum)
resetTimeout := defaul(this.timing.resetTimeout.Load(), defaultResetTimeout)
restartInterval := restartInitialInterval
// main loop
for {
// run actor
lastStart := time.Now()
err := panicWrap(ctx, actor.Run)
// detect context cancellation
if ctxErr := ctx.Err(); ctxErr != nil {
if err != nil && !errors.Is(err, context.Canceled) {
stopErr = err
}
return
}
if err == nil {
// normal exit
exited = true
return
} else {
// failure
log.Printf("XXX [%s] failed", typ)
}
// restart logic
if time.Since(lastStart) < restartThreshold {
log.Printf("!!! [%s] failed too soon, restarting in %v", typ, restartInterval)
timer := time.NewTimer(restartInterval)
select {
case <- timer.C:
// ok
case <- ctx.Done():
if this.Verb() { log.Printf("(i) [%s] canceled while dormant", typ) }
return
}
restartInterval += restartIntervalIncrement
if restartInterval > restartIntervalMaximum {
restartInterval = restartIntervalMaximum
}
} else {
restartInterval = restartInitialInterval
}
// reset if needed
if actor, ok := actor.(Resettable); ok {
if this.Verb() { log.Printf("... [%s] resetting", typ) }
func() {
ctx, done := context.WithTimeout(ctx, resetTimeout)
defer done()
err := actor.Reset(ctx)
if err != nil {
log.Printf("XXX [%s] failed to reset", typ)
}
}()
if this.Verb() { log.Printf(".// [%s] reset", typ) }
}
log.Printf("(i) [%s] restarting", typ)
}
}
// initializeActors spawns initialization functions for the given actors and
// blocks until all of them have exited.
func (this *environment) initializeActors(ctx context.Context, actors ...Initializable) error {
ctx, done := context.WithTimeout(
ctx, defaul(this.timing.initTimeout.Load(), defaultInitTimeout))
defer done()
group, ctx := errgroup.WithContext(ctx)
for _, actor := range actors {
actor := actor
acto := actor.(Actor)
typ := acto.Type()
group.Go(func() error {
err := actor.Init(ctx)
if err != nil { return fmt.Errorf("%s: %w", typ, err) }
return nil
})
}
return group.Wait()
}
// trimActors spawns actor trimming functions, which can be waited upon via the
// returned errgroup.
func (this *environment) trimActors(ctx context.Context, actors ...Trimmable) error {
ctx, done := context.WithTimeout(
ctx, defaul(this.timing.trimTimeout.Load(), defaultTrimTimeout))
defer done()
group, ctx := errgroup.WithContext(ctx)
for _, actor := range actors {
actor := actor
acto := actor.(Actor)
typ := acto.Type()
group.Go(func() error {
err := actor.Trim(ctx)
if err != nil { return fmt.Errorf("%s: %w", typ, err) }
return nil
})
}
return group.Wait()
}
// applyConfig reads and applies environment-specific values from this.conf.
func (this *environment) applyConfig() error {
parseDuration := func(key string, destination interface { Store(time.Duration) }) error {
if str := this.conf.Get(key); str != "" {
value, err := time.ParseDuration(str)
if err != nil { return NewConfigError(this.conf, key, 0, err) }
this.timing.initTimeout.Store(value)
}
return nil
}
err := parseDuration("init-timeout", &this.timing.initTimeout)
if err != nil { return err }
err = parseDuration("restart-threshold", &this.timing.restartThreshold)
if err != nil { return err }
err = parseDuration("restart-initial-interval", &this.timing.restartInitialInterval)
if err != nil { return err }
err = parseDuration("restart-interval-increment", &this.timing.restartIntervalIncrement)
if err != nil { return err }
err = parseDuration("restart-interval-maximum", &this.timing.restartIntervalMaximum)
if err != nil { return err }
err = parseDuration("reset-timeout", &this.timing.resetTimeout)
if err != nil { return err }
err = parseDuration("trim-timeout", &this.timing.trimTimeout)
if err != nil { return err }
err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout)
if err != nil { return err }
return nil
}

7
environment_test.go Normal file
View File

@ -0,0 +1,7 @@
package camfish
import "testing"
func TestEnvironment(test *testing.T) {
// TODO
}

103
error.go Normal file
View File

@ -0,0 +1,103 @@
package camfish
import "fmt"
import "strings"
// Error enumerates common errors in this package.
type Error string; const (
ErrNotFound Error = "not found"
ErrNotRunning Error = "not running"
ErrProcessKilled Error = "process killed"
ErrExtraneousValues Error = "extraneous value(s)"
ErrSectionHeadingMalformed Error = "section heading malformed"
ErrPairMalformed Error = "key/value pair malformed"
ErrKeyEmpty Error = "key empty"
)
// Error implements the error interface.
func (err Error) Error() string {
return string(err)
}
// ConfigError pairs an error with a location in a config file.
type ConfigError struct {
File string
Key string
Line int
Column int
Err error
}
// Error implements the error interface.
func (err ConfigError) Error() string {
out := strings.Builder { }
if err.File != "" {
out.WriteString(err.File)
}
if err.Key != "" {
fmt.Fprintf(&out, "[%s]", err.Key)
}
switch {
case err.Line != 0 && err.Column != 0:
if out.Len() > 0 { out.WriteRune(':') }
fmt.Fprintf(&out, "%d:%d", err.Line, err.Column)
case err.Line != 0:
if out.Len() > 0 { out.WriteRune(':') }
fmt.Fprintf(&out, "%d", err.Line)
}
if out.Len() > 0 { out.WriteString(": ") }
if err.Err == nil {
out.WriteString("configuration error")
} else {
fmt.Fprint(&out, err.Err)
}
return out.String()
}
// Unwrap returns err.Err.
func (err ConfigError) Unwrap() error {
return err.Err
}
// NewConfigError creates a new config error with the given key and config. It
// will attempt to fill in as many details as possible with the information
// given. If the config has a method with an identical name and signature to
// this function, it will be called and its value will be returned. Otherwise,
// an error is returned containing only the provided information.
func NewConfigError(config Config, key string, index int, wrapped error) ConfigError {
type configErrorFactory interface {
NewConfigError(string, int, error) ConfigError
}
if config, ok := config.(configErrorFactory); ok {
return config.NewConfigError(key, index, wrapped)
}
return ConfigError {
Key: key,
Err: wrapped,
}
}
// FlagError pairs a flag with a long flag name.
type FlagError struct {
Long string
Err error
}
// Error implements the error interface.
func (err FlagError) Error() string {
output := err.Long
if output != "" {
output = fmt.Sprintf("--%s: ", output)
}
if err.Err == nil {
output += "argument parsing error"
} else {
output += err.Unwrap().Error()
}
return output
}
// Unwrap returns err.Err
func (err FlagError) Unwrap() error {
return err.Err
}

74
error_test.go Normal file
View File

@ -0,0 +1,74 @@
package camfish
import "testing"
func TestConfigError (test *testing.T) {
err := ConfigError {
File: "example.conf",
Key: "some-key",
Line: 6,
Column: 20,
Err: ErrNotFound,
}
str := err.Error()
test.Log(str)
if str != "example.conf[some-key]:6:20: not found" {
test.Fatal("not equal")
}
unwrapped := err.Unwrap()
test.Log(unwrapped)
if unwrapped!= ErrNotFound {
test.Fatal("not equal")
}
err = ConfigError {
File: "example.conf",
Line: 6,
Column: 20,
Err: ErrNotFound,
}
str = err.Error()
test.Log(str)
if str != "example.conf:6:20: not found" {
test.Fatal("not equal")
}
err = ConfigError {
Key: "some-key",
Column: 20,
Err: ErrNotFound,
}
str = err.Error()
test.Log(str)
if str != "[some-key]: not found" {
test.Fatal("not equal")
}
err = ConfigError {
File: "example.conf",
Key: "some-key",
Line: 20,
Err: ErrNotFound,
}
str = err.Error()
test.Log(str)
if str != "example.conf[some-key]:20: not found" {
test.Fatal("not equal")
}
err = ConfigError {
Err: ErrNotFound,
}
str = err.Error()
test.Log(str)
if str != "not found" {
test.Fatal("not equal")
}
err = ConfigError { }
str = err.Error()
test.Log(str)
if str != "configuration error" {
test.Fatal("not equal")
}
}

93
examples/pipeline/main.go Normal file
View File

@ -0,0 +1,93 @@
// Example pipeline demonstrates a three-stage pipeline with an actor for each
// stage.
package main
import "log"
import "time"
import "context"
import "math/rand"
import "git.tebibyte.media/sashakoshka/camfish"
func main() {
camfish.Run("pipeline",
"Example pipeline demonstrates a three-stage pipeline with " +
"an actor for each stage",
new(generator),
new(reverser),
new(printer))
}
// generator produces strings randomly.
type generator struct { }
var _ camfish.Runnable = new(generator)
func (this *generator) Type() string { return "generator" }
func (this *generator) Run(ctx context.Context) error {
reverser := camfish.Find("reverser").(*reverser)
timer := time.NewTimer(0)
for {
select {
case <- timer.C:
timer.Reset(time.Duration(float64(time.Second) * (rand.Float64() + 0.2)))
reverser.Send([]string {
"red", "yellow", "green", "blue",
}[rand.Int() % 4])
case <- ctx.Done(): return ctx.Err()
}
}
}
// reverser reverses strings.
type reverser struct { input chan string }
var _ camfish.Runnable = new(reverser)
var _ camfish.Initializable = new(reverser)
func (this *reverser) Type() string { return "reverser" }
func (this *reverser) Init(ctx context.Context) error {
this.input = make(chan string)
return ctx.Err()
}
func (this *reverser) Run(ctx context.Context) error {
printer := camfish.Find("printer").(*printer)
for {
select {
case str := <- this.input:
runes := []rune(str)
for i, j := 0, len(runes) - 1; i < j; i, j = i + 1, j - 1 {
runes[i], runes[j] = runes[j], runes[i]
}
printer.Print(string(runes))
case <- ctx.Done(): return ctx.Err()
}
}
}
func (this *reverser) Send(str string) {
this.input <- str
}
// printer prints strings.
type printer struct { input chan string }
var _ camfish.Runnable = new(printer)
var _ camfish.Initializable = new(printer)
func (this *printer) Type() string { return "printer" }
func (this *printer) Init(ctx context.Context) error {
this.input = make(chan string)
return ctx.Err()
}
func (this *printer) Run(ctx context.Context) error {
for {
select {
case str := <- this.input:
log.Println("(i) [printer]", str)
case <- ctx.Done(): return ctx.Err()
}
}
}
func (this *printer) Print(str string) {
this.input <- str
}

142
flag.go Normal file
View File

@ -0,0 +1,142 @@
package camfish
import "fmt"
import "iter"
import "git.tebibyte.media/sashakoshka/go-cli"
// FlagSet holds command-line flags to be parsed.
type FlagSet interface {
// Flag creates a new flag. If there are naming collisions between
// flags, flags specified later take precedence over those specified
// earlier.
//
// A short and long form of the flag may be specified. The short form of
// a flag is invoked with a single dash and one or more runes, each rune
// invoking its corresponding flag exactly once. The long form is
// invoked with two dashes followed by the text of the long form, and it
// should be in lower kebab case. If short is zero, the flag will not
// have a short form. All flags must have a long form.
Flag(short rune, long string, help string, validate func(string) error) Flag
}
// Flag represents the result of parsing a command-line flag. It is filled in
// automatically during argument parsing, and the values within it can be
// accessed afterwards.
type Flag interface {
// First returns the value where the flag was first found. If the flag
// was never specified, false is returned for found. If the flag either
// was never specified or does not take input, the string "true" will be
// returned. If at least one value is given but this function is never
// called to recieve it, the environment will exit with an error.
First() (value string, found bool)
// All returns an iterator over all instances of this flag that were
// specified. If this flag takes no input, all returned values will be
// the string "true". If multiple instances of a flag are given but this
// function is never called to receive them, will the environment exit
// with an error.
All() iter.Seq2[int, string]
}
// flagSet is an implementation of [FlagSet] that wraps [cli.Cli].
type flagSet struct {
name string
description string
cli cli.Cli
short map[rune ] int
long map[string] int
flags []*flag
}
func (this *flagSet) Flag(short rune, long string, help string, validate func(string) error) Flag {
if this.short == nil { this.short = make(map[rune ] int) }
if this.long == nil { this.long = make(map[string] int) }
fla := &flag {
short: short,
long: long,
help: help,
validate: validate,
}
if short != 0 { this.short[short] = len(this.flags) }
if long != "" { this.long[long] = len(this.flags) }
this.flags = append(this.flags, fla)
return fla
}
// parse parses the arguments using the [flag]s contained within the set.
func (this *flagSet) parse(args []string) error {
cliFlags := make([]*cli.Flag, len(this.flags))
for index, fla := range this.flags {
index := index
cliFlags[index] = &cli.Flag {
Short: fla.short,
Long: fla.long,
Help: fla.help,
Validate: fla.validate,
Found: func(_ *cli.Cli, value string) {
this.flags[index].values = append(this.flags[index].values, value)
},
}
}
this.cli.Description = this.description
this.cli.Flags = cliFlags
_, err := this.cli.Parse(args)
return err
}
// fullyReceived returns an error if not all specified values were used. This
// behavior is currently unused.
func (this *flagSet) fullyReceived() error {
for _, fla := range this.flags {
if !fla.fullyReceived() {
return FlagError {
Long: fla.long,
Err: fmt.Errorf (
"%w: %v",
ErrExtraneousValues,
fla.values[fla.received:]),
}
}
}
if len(this.cli.Args) > 0 {
return ErrExtraneousValues
}
return nil
}
// usage prints usage/help information to [os.Stderr]. it only works properly
// after parse has been run.
func (this *flagSet) usage() {
this.cli.Usage()
}
// flag is an implementation of [Flag] that
type flag struct {
short rune
long string
help string
validate func(string) error
values []string
received int
}
func (this *flag) First() (value string, found bool) {
if len(this.values) < 1 { return "", false }
if this.received < 1 { this.received = 1 }
return this.values[0], true
}
func (this *flag) All() iter.Seq2[int, string] {
return func(yield func(int, string) bool) {
for index, value := range this.values {
if this.received <= index { this.received = index + 1 }
if !yield(index, value) { return }
}
}
}
// fullyReceived returns whether all values were used. If this flag does not
// take in a value, it just returns true.
func (this *flag) fullyReceived() bool {
return this.validate == nil || this.received >= len(this.values)
}

161
flag_test.go Normal file
View File

@ -0,0 +1,161 @@
package camfish
import "errors"
import "testing"
import "git.tebibyte.media/sashakoshka/go-cli"
func TestFlagSet(test *testing.T) {
set := flagSet { }
flag0 := set.Flag('a', "flag-0", "some help", nil)
flag1 := set.Flag('b', "flag-1", "some help", nil)
flag2 := set.Flag('c', "flag-2", "some help", nil)
flag6 := set.Flag('d', "flag-3", "some help", nil)
flag3 := set.Flag('d', "flag-3", "some help", nil)
flag4v := set.Flag('e', "flag-4v", "some help", cli.ValString)
flag5 := set.Flag('f', "flag-5", "some help", nil)
err := set.parse([]string {
"thing",
"-baf",
"-e", "value 4 a",
"-d", "-ff",
"--flag-4v", "value 4 b",
})
if err != nil { test.Fatal(err) }
if value, ok := flag0.First(); ok {
if value != "true" { test.Fatalf("not equal: %s", value) }
} else { test.Fatal("not found") }
for index, value := range flag0.All() {
if index != 0 { test.Fatalf("extra value: %s", value) }
if value != "true" { test.Fatalf("not equal: %s", value) }
}
if value, ok := flag1.First(); ok {
if value != "true" { test.Fatalf("not equal: %s", value) }
} else { test.Fatal("not found") }
for index, value := range flag1.All() {
if index != 0 { test.Fatalf("extra value: %s", value) }
if value != "true" { test.Fatalf("not equal: %s", value) }
}
if value, ok := flag2.First(); ok {
if value != "" { test.Fatalf("found: %s", value) }
}
for _, value := range flag2.All() {
test.Fatalf("extra value: %s", value)
}
if value, ok := flag3.First(); ok {
if value != "true" { test.Fatalf("not equal: %s", value) }
} else { test.Fatal("not found") }
for index, value := range flag3.All() {
if index != 0 { test.Fatalf("extra value: %s", value) }
if value != "true" { test.Fatalf("not equal: %s", value) }
}
if value, ok := flag4v.First(); ok {
if value != "value 4 a" { test.Fatalf("not equal: %s", value) }
} else { test.Fatal("not found") }
for index, value := range flag4v.All() {
switch index {
case 0: if value != "value 4 a" { test.Fatalf("not equal: %s", value) }
case 1: if value != "value 4 b" { test.Fatalf("not equal: %s", value) }
default: test.Fatalf("extra value: %s", value)
}
}
if value, ok := flag5.First(); ok {
if value != "true" { test.Fatalf("not equal: %s", value) }
} else { test.Fatal("not found") }
for index, value := range flag5.All() {
if index > 3 { test.Fatalf("extra value: %s", value) }
if value != "true" { test.Fatalf("not equal: %s", value) }
}
if value, ok := flag6.First(); ok {
if value != "" { test.Fatalf("found: %s", value) }
}
for _, value := range flag6.All() {
test.Fatalf("extra value: %s", value)
}
err = set.fullyReceived()
if err != nil { test.Fatal(err) }
}
func TestFlagSetErrExtraneousValuesA(test *testing.T) {
set := flagSet { }
flag0 := set.Flag('a', "flag-0", "some help", cli.ValString)
err := set.parse([]string {
"thing",
"--flag-0", "value 0",
"--flag-0", "value 1",
"--flag-0", "value 2",
})
if err != nil { test.Fatal(err) }
// access the first value
flag0.First()
err = set.fullyReceived()
test.Log(err)
if !errors.Is(err, ErrExtraneousValues) {
test.Fatal("error is not ErrExtraneousValues")
}
var flagErr FlagError
if !errors.As(err, &flagErr) {
test.Fatal("error is not FlagError")
}
if flagErr.Long != "flag-0" {
test.Fatal("not equal")
}
}
func TestFlagSetErrExtraneousValuesB(test *testing.T) {
set := flagSet { }
flag0 := set.Flag('a', "flag-0", "some help", cli.ValString)
err := set.parse([]string {
"thing",
"--flag-0", "value 0",
"--flag-0", "value 1",
"--flag-0", "value 2",
})
if err != nil { test.Fatal(err) }
// access the first two values
for index, _ := range flag0.All() {
if index >= 1 { break }
}
err = set.fullyReceived()
test.Log(err)
if !errors.Is(err, ErrExtraneousValues) {
test.Fatal("error is not ErrExtraneousValues")
}
var flagErr FlagError
if !errors.As(err, &flagErr) {
test.Fatal("error is not FlagError")
}
if flagErr.Long != "flag-0" {
test.Fatal("not equal")
}
}
func TestFlagSetErrExtraneousValuesC(test *testing.T) {
set := flagSet { }
flag0 := set.Flag('a', "flag-0", "some help", cli.ValString)
err := set.parse([]string {
"thing",
"--flag-0", "value 0", "value 1", "value 2",
})
if err != nil { test.Fatal(err) }
// access the value
flag0.First()
err = set.fullyReceived()
test.Log(err)
if !errors.Is(err, ErrExtraneousValues) {
test.Fatal("error is not ErrExtraneousValues")
}
}

10
go.mod Normal file
View File

@ -0,0 +1,10 @@
module git.tebibyte.media/sashakoshka/camfish
go 1.23.0
require (
git.tebibyte.media/sashakoshka/go-cli v0.1.3
git.tebibyte.media/sashakoshka/go-service v0.1.1
git.tebibyte.media/sashakoshka/go-util v0.8.0
golang.org/x/sync v0.10.0
)

8
go.sum Normal file
View File

@ -0,0 +1,8 @@
git.tebibyte.media/sashakoshka/go-cli v0.1.3 h1:tSkWjyx2JrGu6KotbXWSTKSYGGS1D4O3qwCrRoZuwbs=
git.tebibyte.media/sashakoshka/go-cli v0.1.3/go.mod h1:JFA3wSdRkXxa4iQJWHfe3DokiG7Dh2XUJBzPmuVlbuY=
git.tebibyte.media/sashakoshka/go-service v0.1.1 h1:WhDK532iB3hrVILih2+rJmRtCctXIoj2uEWMm8tU4+E=
git.tebibyte.media/sashakoshka/go-service v0.1.1/go.mod h1:qPtzuqB1psUWZrmy3XTU1dZHHhVNHHP2pSBkpzlTazk=
git.tebibyte.media/sashakoshka/go-util v0.8.0 h1:XFuZ8HQkrnibrV016rso00geCFPatKpX4jxkIVhZPaQ=
git.tebibyte.media/sashakoshka/go-util v0.8.0/go.mod h1:0Q1t+PePdx6tFYkRuJNcpM1Mru7wE6X+it1kwuOH+6Y=
golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ=
golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=

168
ini.go Normal file
View File

@ -0,0 +1,168 @@
package camfish
import "os"
import "io"
import "fmt"
import "iter"
import "slices"
import "strings"
import "strconv"
import "path/filepath"
type iniConfig map[string] []iniValue
type iniValue struct {
value string
file string
line int
column int
}
// ParseINI parses a string containing INI configuration data.
func ParseINI(filename, input string) (MutableConfig, error) {
ini := make(iniConfig)
configErr := ConfigError {
File: filename,
}
section := ""
for index, line := range strings.Split(input, "\n") {
configErr.Line = index + 1
configErr.Key = ""
line = strings.TrimSpace(line)
if line == "" { continue }
if strings.HasPrefix(line, "#") { continue }
if strings.HasPrefix(line, "[") {
// section heading
if !strings.HasSuffix(line, "]") {
configErr.Err = ErrSectionHeadingMalformed
return nil, configErr
}
section = strings.TrimSpace(strings.TrimSuffix(strings.TrimPrefix(line, "["), "]"))
if section == "" {
configErr.Err = ErrSectionHeadingMalformed
return nil, configErr
}
continue
}
// split key/value
key, value, ok := strings.Cut(line, "=")
if !ok {
configErr.Err = ErrPairMalformed
return nil, configErr
}
// key
key = strings.TrimSpace(key)
if key == "" {
configErr.Err = ErrKeyEmpty
return nil, configErr
}
configErr.Key = key
if section != "" {
key = fmt.Sprintf("%s.%s", section, key)
}
// value
value = strings.TrimSpace(value)
if strings.HasPrefix(value, "\"") || strings.HasPrefix(value, "'") {
unquoted, err := strconv.Unquote(value)
if err != nil {
configErr.Column = strings.Index(line, "=") + 2
configErr.Err = err
return nil, configErr
}
value = unquoted
}
ini.Add(key, value)
}
return ini, nil
}
// DecodeINI decodes INI data from an io.Reader. The entire reader is consumed.
func DecodeINI(filename string, input io.Reader) (MutableConfig, error) {
buffer, err := io.ReadAll(input)
if err != nil { return nil, err }
return ParseINI(filename, string(buffer))
}
func (ini iniConfig) Add(key, value string) {
key = canonicalINIKey(key)
ini[key] = append(ini[key], iniValue {
value: value,
})
}
func (ini iniConfig) Del(key string) {
key = canonicalINIKey(key)
delete(ini, key)
}
func (ini iniConfig) Get(key string) string {
key = canonicalINIKey(key)
slice, ok := ini[key]
if !ok { return "" }
if len(slice) == 0 { return "" }
return slice[0].value
}
func (ini iniConfig) GetAll(key string) iter.Seq2[int, string] {
return func(yield func(int, string) bool) {
for index, value := range ini[key] {
if !yield(index, value.value) { return }
}
}
}
func (ini iniConfig) Set(key, value string) {
key = canonicalINIKey(key)
valueInfo := iniValue { }
if prevValues := ini[key]; len(prevValues) > 0 {
valueInfo = prevValues[0]
}
valueInfo.value = value
ini[key] = []iniValue { valueInfo }
}
func (ini iniConfig) NewConfigError(key string, index int, wrapped error) ConfigError {
if values, ok := ini[key]; ok {
if index > 0 && index < len(values) {
value := values[index]
return ConfigError {
File: value.file,
Key: key,
Line: value.line,
Column: value.column,
Err: wrapped,
}
}
}
return ConfigError {
Key: key,
Err: wrapped,
}
}
func canonicalINIKey(key string) string {
return strings.ToLower(key)
}
func mergeINI(inis ...iniConfig) iniConfig {
ini := make(iniConfig)
for index := len(inis) - 1; index >= 0; index -- {
for key, values := range inis[index] {
if _, exists := ini[key]; exists { continue }
ini[key] = slices.Clone(values)
}
}
return ini
}
func configFiles(program string) ([]string, error) {
userConfig, err := os.UserConfigDir()
if err != nil { return nil, err }
return []string {
filepath.Join("/etc", program),
filepath.Join(userConfig, program),
}, nil
}

93
ini_test.go Normal file
View File

@ -0,0 +1,93 @@
package camfish
import "reflect"
import "testing"
func TestParseINI_LF(test *testing.T) {
ini, err := ParseINI("input",
`thing= " Quoted string!!!!! "
Other-Thing = askdjlksajd
number = 3849
# comment
#also a comment
[section0]
value=1
[section1.a]
#dkjsaf
value=7
`)
if err != nil {
test.Fatal(err)
}
test.Log("INI:")
test.Log(ini)
if ini.Get("thing") != "\tQuoted string!!!!! " {
test.Fatal("value is not correct")
}
if ini.Get("other-thing") != "askdjlksajd" {
test.Fatal("value is not correct")
}
if ini.Get("number") != "3849" {
test.Fatal("value is not correct")
}
if ini.Get("section0.value") != "1" {
test.Fatal("value is not correct")
}
if ini.Get("section1.a.value") != "7" {
test.Fatal("value is not correct")
}
}
func TestParseINI_CRLF(test *testing.T) {
ini, err := ParseINI("input", "thing= \"\tQuoted string!!!!! \"\r\nOther-Thing = askdjlksajd\r\n\r\nnumber = 3849\r\n# comment\r\n #also a comment\r\n[section0]\r\nvalue=1\r\n\r\n[section1.a]\r\n#dkjsaf\r\nvalue=7\r\n")
if err != nil {
test.Fatal(err)
}
test.Log("INI:")
test.Log(ini)
if ini.Get("thing") != "\tQuoted string!!!!! " {
test.Fatal("value is not correct")
}
if ini.Get("other-thing") != "askdjlksajd" {
test.Fatal("value is not correct")
}
if ini.Get("number") != "3849" {
test.Fatal("value is not correct")
}
if ini.Get("section0.value") != "1" {
test.Fatal("value is not correct")
}
if ini.Get("section1.a.value") != "7" {
test.Fatal("value is not correct")
}
}
func TestMergeINI(test *testing.T) {
iv := func(value string) iniValue {
return iniValue { value: value }
}
ini := mergeINI(iniConfig {
"foo": []iniValue { iv("bar") },
"baz": []iniValue { iv("something") },
},
iniConfig {
"baz": []iniValue { iv("value 1"), iv("value 2") },
},
iniConfig {
"thing": []iniValue { iv("????") },
});
test.Log(ini)
if !reflect.DeepEqual(ini, iniConfig {
"foo": []iniValue { iv("bar") },
"baz": []iniValue { iv("value 1"), iv("value 2") },
"thing": []iniValue { iv("????") },
}) {
test.Fatal("not equal")
}
}

266
phases.go Normal file
View File

@ -0,0 +1,266 @@
package camfish
import "os"
import "fmt"
import "log"
import "io/fs"
import "errors"
import "context"
import "strings"
import "path/filepath"
import "git.tebibyte.media/sashakoshka/go-cli"
import "git.tebibyte.media/sashakoshka/go-service/daemon"
import "git.tebibyte.media/sashakoshka/go-service/rotate"
func (this *environment) phase10FlagParsing() bool {
// create flag set and specify built-in flags
set := flagSet {
name: this.name,
description: this.description,
}
flagHelp := set.Flag('h', "help", "Display usage information and exit", nil)
flagPidFile := set.Flag('p', "pid-file", "Write the PID to the specified file", cli.ValString)
flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString)
flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString)
flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString)
flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil)
// ask actors to add flags
actors, done := this.actors.RBorrow()
defer done()
for _, actor := range sortActors(actors, actors.flagAdder.all()) {
actor.AddFlags(&set)
}
// parse flags
err := set.parse(os.Args)
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err)
set.usage()
return false
}
// handle built-in flags
if _, ok := flagHelp.First(); ok {
set.usage()
return false
}
if pidFile, ok := flagPidFile.First(); ok {
this.flags.pidFile = pidFile
}
if user, ok := flagUser.First(); ok {
this.flags.user = user
}
if logDirectory, ok := flagLogDirectory.First(); ok {
this.flags.logDirectory = logDirectory
}
if configFile, ok := flagConfigFile.First(); ok {
this.flags.configFile = configFile
}
if _, ok := flagVerbose.First(); ok {
this.flags.verbose = true
}
return true
}
func (this *environment) phase13PidFileCreation() bool {
if this.flags.pidFile != "" {
err := daemon.PidFile(this.flags.pidFile).Start()
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err)
return false
}
}
return true
}
func (this *environment) phase17PrivilegeDropping() bool {
if this.flags.user != "" {
if this.Verb() {
fmt.Fprintf(os.Stderr,
"%s: dropping privilege to %s\n",
os.Args[0], this.flags.user)
}
user, group, _ := strings.Cut(this.flags.user, ":")
err := dropPrivelege(user, group)
fmt.Fprintf(os.Stderr, "%s: could not drop privilege %v\n", os.Args[0], err)
}
return true
}
func (this *environment) phase20LogSwitching() bool {
if this.flags.logDirectory != "" {
directory := this.flags.logDirectory
if this.Verb() {
fmt.Fprintf(os.Stderr,
"%s: logging to %s\n",
os.Args[0], directory)
}
directory, err := filepath.Abs(directory)
if err != nil {
fmt.Fprintf(os.Stderr, "%s: could not rotate logs: %v\n", os.Args[0], err)
return false
}
logger, err := rotate.New(directory)
if err != nil {
fmt.Fprintf(os.Stderr, "%s: could not rotate logs: %v\n", os.Args[0], err)
return false
}
defer logger.Close()
log.SetOutput(logger)
}
log.Printf("====== [%s] START =======", this.name)
log.Printf("(i) (20) CAMFISH environment %s", version)
logActors(All())
return true
}
func (this *environment) phase30ConfigurationParsing() bool {
if this.Verb() { log.Println("... (30) parsing configuration") }
// blank config if nothing happens
this.conf = make(iniConfig)
// get list of config files
paths, err := configFiles(this.name)
if err != nil {
log.Println("!!! (30) could not determine location of file(s):", err)
return true
}
if this.flags.configFile != "" {
paths = append(paths, this.flags.configFile)
}
// parse every config and merge them all
configs := make([]iniConfig, 0, len(paths))
for _, path := range paths {
file, err := os.Open(path)
if err != nil {
if !errors.Is(err, fs.ErrNotExist) {
log.Println("!!! (30) file present but inaccessible:", err)
} else if path == this.flags.configFile {
log.Println("!!! (30)", err)
}
continue
}
defer file.Close()
config, err := DecodeINI(path, file)
if err != nil {
log.Println("!!! (30) could not parse:", err)
continue
}
configs = append(configs, config.(iniConfig))
}
this.conf = mergeINI(configs...)
if this.Verb() { log.Println(".// (30) parsed configuration") }
return true
}
func (this *environment) phase40ConfigurationProcessing() bool {
if this.Verb() { log.Println("... (40) processing configuration") }
actors, done := this.actors.RBorrow()
defer done()
for _, actor := range sortActors(actors, actors.configProcessor.all()) {
err := actor.ProcessConfig(this.conf)
if err != nil {
log.Println("XXX (50) could not process configuration:", err)
return false
}
}
if this.Verb() { log.Println(".// (40) processed configuration") }
return true
}
func (this *environment) phase50ConfigurationApplication() bool {
if this.Verb() { log.Println("... (50) applying configuration") }
err := this.applyConfig()
if err != nil {
log.Println("XXX (50) could not apply configuration:", err)
return false
}
actors, done := this.actors.RBorrow()
defer done()
for _, actor := range sortActors(actors, actors.configurable.all()) {
err := actor.Configure(this.conf)
if err != nil {
log.Printf (
"XXX (50) could not apply configuration to %s: %v",
actor.(Actor).Type(), err)
return false
}
}
if this.Verb() { log.Println(".// (50) applied configuration") }
return true
}
func (this *environment) phase60Initialization() bool {
if this.Verb() { log.Println("... (60) initializing") }
var initializable []Initializable
func() {
actors, done := this.actors.RBorrow()
defer done()
initializable = actors.initializable.all()
}()
if err := this.initializeActors(this.ctx, initializable...); err != nil {
log.Println(".// (60) failed to initialize:", err)
return false
}
if this.Verb() { log.Println(".// (60) initialized") }
return true
}
func (this *environment) phase70Running() bool {
defer this.Done(nil)
if this.Verb() { log.Println("... (70) starting up") }
this.running.Store(true)
defer this.running.Store(false)
func() {
actors, done := this.actors.RBorrow()
defer done()
for _, actor := range actors.runnable.all() {
this.start(actor)
}
}()
log.Println(".// (70) startup sequence complete")
// await context cancellation or waitgroup completion
wgChannel := make(chan struct { }, 1)
go func() {
this.group.Wait()
wgChannel <- struct { } { }
}()
select {
case <- this.ctx.Done():
if this.Verb() { log.Println("(i) (70) canceled") }
case <- wgChannel:
if this.Verb() { log.Println("(i) (70) all actors have finished") }
}
return true
}
func (this *environment) phase70_5Trimming() bool {
if this.Verb() { log.Println("... (70.5) trimming") }
var trimmable []Trimmable
func() {
actors, done := this.actors.RBorrow()
defer done()
trimmable = actors.trimmable.all()
}()
if err := this.trimActors(this.ctx, trimmable...); err != nil {
log.Println(".// (70.5) failed to trim:", err)
return false
}
if this.Verb() { log.Println(".// (70.5) trimmed") }
return true
}
func (this *environment) phase80Shutdown() bool {
cause := context.Cause(this.ctx)
if cause != nil {
log.Println("XXX (80) shutting down because:", cause)
}
log.Println("... (80) waiting for actors to shut down")
defer func() {
log.Println(".// (80) shutdown succeeded, goodbye")
log.Printf("====== [%s] END =======", this.name)
}()
this.group.Wait()
return cause == nil
}

124
run.go Normal file
View File

@ -0,0 +1,124 @@
package camfish
import "iter"
import "context"
var env environment
// Run runs the daemon given the slice of actors, and shuts down the program
// when all running actors have stopped. Error and log messages will be printed.
// The correct way to use this function is to have it be the only thing in main:
//
// func main () {
// camfish.Run("name", "what it does", new(SomeActor), new(AnotherActor))
// }
//
// Run operates in several phases. In each phase that involves actors, the
// actors are operated on in the order they are specified in the variadic actors
// argument. This as well as the order of the phases is considered part of the
// API and are stable for versions above v1.0.0, but some exact details such as
// timing are not and may change in the future. The phases are as follows:
//
// 10. Flag parsing: Actors which implement [FlagAdder] are given an object
// which will allow them to add command line flags. Actors are given the
// object one after the other in the order that they are specified in the
// vararg list for Run. The flags are then parsed, giving values to the
// actors that requested them.
//
// 20. Log switching: The environment begins redirecting all logging output to
// a file if specified in the flags. After Run exits, logging will be
// redirected back to [os.Stderr].
//
// 30. Configuration parsing: The configuration file is parsed. If a specific
// file was not specified by a flag, it is loaded from
// /etc/<name>/<name>.conf
//
// 40. Configuration processing: Actors which implement [ConfigProcessor] are
// given a MutableConfig to read and modify. Actors are given the config
// one after the other in the order that they are specified in the vararg
// list for Run. Actors can use flag values they have received to override
// config values, apply macros, etc.
//
// 50. Configuration application: Actors which implement [Configurable] are
// given a [Config] to read. The order is not guaranteed.
//
// 60. Initialization: Actors which implement [Initializable] are initialized
// in parallel. During this time, actors may do things like establish
// network connections, start up servers, initialize data structures, etc.
// Actors may establish references to each-other during this time, but
// they must not interact yet. The amount of time actors have to do this
// is configurable, but by default it is 8 minutes. The vast majority of
// actors should initialize in under 100 milliseconds.
//
// 70. Running: Actors which implement [Runnable] are run, each in their own
// goroutine. The environment is able to restart actors which have failed,
// which entails resetting the actor if it implements [Resettable], and
// running the actor again within the same goroutine. If an actor does not
// run for a meaningful amount of time after resetting/initialization
// before failing, it is considered erratic and further attempts to restart
// it will be spaced by a limited, constantly increasing time interval. The
// timing is configurable, but by default the threshold for a meaningful
// amount of runtime is 16 seconds, the initial delay interval is 8
// seconds, the interval increase per attempt is 8 seconds, and the maximum
// interval is one hour. Additionally, programs which implement [Trimmable]
// will be trimmed regularly whenever they are running. The trimming
// interval is also configurable, but by default it is once every minute.
// When an actor which implements [Resettable] is reset, it is given a
// configurable timeout, which is 8 minutes by default.
//
// 80. Shutdown: This can be triggered by all actors being removed from the
// environment, a catastrophic error, [Done] being called, or the program
// recieving SIGINT. If necessary, the environment shuts down all running
// actors and waits for them to stop. If they do not all stop in time, an
// error message is printed and the program will exit with a non-zero code.
// Otherwise, it will exit with a code of 0. The amount of time actors
// have to shut down is configurable, but by default it is 8 minutes.
func Run(name, description string, actors ...Actor) {
env.Run(name, description, actors...)
}
// Done sends a shutdown signal to the environment with the given "cause" error.
// This will be logged as the reason for the shutdown.
func Done(cause error) {
env.Done(cause)
}
// Add adds an actor to the environment, starting it if necessary. If the
// environment is not running, it does nothing. Note that this function will
// block the current goroutine while the actors are initializing.
func Add(ctx context.Context, actors ...Actor) error {
return env.Add(ctx, actors...)
}
// Del removes an actor from the environment, stopping it if necessary. If the
// environment is not running, it does nothing. Note that this function will
// block the current goroutine while the actors are shutting down.
func Del(ctx context.Context, actors ...Actor) error {
return env.Del(ctx , actors...)
}
// Find finds an actor in the environment with the given type name. If no actor
// is found or the environment is not running, it returns nil.
func Find(typ string) Actor {
return env.Find(typ)
}
// FindAll returns an iterator over all actors in the environment with the given
// type name. If the environment is not running, it returns an empty iterator.
func FindAll(typ string) iter.Seq[Actor] {
return env.FindAll(typ)
}
// All returns an iterator over all actors in the environment. If the
// environment is not running, it returns an empty iterator.
func All() iter.Seq[Actor] {
return env.All()
}
// Verb returns true if verbose output is permitted. Actors should log less
// information when this is false.
func Verb() bool {
return env.Verb()
}
// tell me how tf its snowiung outside if its 33° f

104
util.go Normal file
View File

@ -0,0 +1,104 @@
package camfish
import "fmt"
import "log"
import "time"
import "iter"
import "errors"
import "strconv"
import "syscall"
import "os/user"
import "strings"
import "context"
import "sync/atomic"
import "unicode/utf8"
func defaul[T comparable](value, def T) T {
var zero T
if value == zero { return def }
return value
}
func panicWrap(ctx context.Context, f func (context.Context) error) (err error) {
defer func () {
if pan := recover(); pan != nil {
if panErr, ok := pan.(error); ok {
err = panErr
} else {
err = errors.New(fmt.Sprint(pan))
}
}
} ()
err = f(ctx)
return
}
type atomicDuration atomic.Int64
type atomicInt64Ptr = *atomic.Int64
func (duration* atomicDuration) Load() time.Duration {
return time.Duration((*atomic.Int64)(duration).Load())
}
func (duration *atomicDuration) Store(newDuration time.Duration) {
(*atomic.Int64)(duration).Store(int64(newDuration))
}
func logActors (actors iter.Seq[Actor]) {
output := "actors: "
x := utf8.RuneCountInString(output)
first := true
line := func () {
if output == "" { return }
if first {
first = false
log.Println("(i)", output)
} else {
log.Println(" ", output)
}
output = ""
x = 0
}
types := make(map[string] int)
for actor := range actors {
types[actor.Type()] += 1
}
for typ, count := range types {
if count > 1 {
typ += fmt.Sprintf("(%d)", count)
}
typ += ", "
x += 2
typeLen := utf8.RuneCountInString(typ)
if x + typeLen >= 60 {
line()
}
output += typ
x += typeLen
}
if output != "" {
output = strings.TrimSuffix(output, ", ")
}
line()
}
func dropPrivelege (usr, group string) error {
if group != "" {
groupInfo, err := user.LookupGroup(group)
if err != nil { return err }
gid, err := strconv.Atoi(groupInfo.Gid)
if err != nil { return err }
err = syscall.Setgid(gid)
if err != nil { return err }
}
if usr != "" {
usrInfo, err := user.Lookup(usr)
if err != nil { return err }
uid, err := strconv.Atoi(usrInfo.Uid)
if err != nil { return err }
err = syscall.Setuid(uid)
if err != nil { return err }
}
return nil
}

35
util_test.go Normal file
View File

@ -0,0 +1,35 @@
package camfish
import "errors"
import "testing"
import "context"
func TestDefaul(test *testing.T) {
value := defaul("", "askjd")
test.Log(value)
if value != "askjd" { test.Fatal("not equal") }
value1 := defaul("zzzz", "askjd")
test.Log(value1)
if value1 != "zzzz" { test.Fatal("not equal") }
value2 := defaul(0, 3)
test.Log(value2)
if value2 != 3 { test.Fatal("not equal") }
}
func TestPanicWrap(test *testing.T) {
err := panicWrap(context.Background(), func (ctx context.Context) error {
return errors.New("test case 0")
})
test.Log(err)
if err.Error() != "test case 0" { test.Fatal("not equal") }
err = panicWrap(context.Background(), func (ctx context.Context) error {
panic(errors.New("test case 1"))
})
test.Log(err)
if err.Error() != "test case 1" { test.Fatal("not equal") }
err = panicWrap(context.Background(), func (ctx context.Context) error {
return nil
})
test.Log(err)
if err != nil { test.Fatal("not equal") }
}

3
version.go Normal file
View File

@ -0,0 +1,3 @@
package camfish
const version = "v0.0.0"