From b3913d8078fef88fa4e3c429b4472d912c9d53c2 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Tue, 31 Dec 2024 01:27:53 -0500 Subject: [PATCH] Initial commit --- README.md | 7 + actor.go | 90 +++++++++ actorset.go | 178 +++++++++++++++++ actorset_test.go | 282 +++++++++++++++++++++++++++ assets/icon.png | Bin 0 -> 8440 bytes assets/icon.svg | 65 +++++++ config.go | 20 ++ cron.go | 48 +++++ doc.go | 7 + environment.go | 389 ++++++++++++++++++++++++++++++++++++++ environment_test.go | 7 + error.go | 103 ++++++++++ error_test.go | 74 ++++++++ examples/pipeline/main.go | 93 +++++++++ flag.go | 142 ++++++++++++++ flag_test.go | 161 ++++++++++++++++ go.mod | 10 + go.sum | 8 + ini.go | 168 ++++++++++++++++ ini_test.go | 93 +++++++++ phases.go | 266 ++++++++++++++++++++++++++ run.go | 124 ++++++++++++ util.go | 104 ++++++++++ util_test.go | 35 ++++ version.go | 3 + 25 files changed, 2477 insertions(+) create mode 100644 README.md create mode 100644 actor.go create mode 100644 actorset.go create mode 100644 actorset_test.go create mode 100644 assets/icon.png create mode 100644 assets/icon.svg create mode 100644 config.go create mode 100644 cron.go create mode 100644 doc.go create mode 100644 environment.go create mode 100644 environment_test.go create mode 100644 error.go create mode 100644 error_test.go create mode 100644 examples/pipeline/main.go create mode 100644 flag.go create mode 100644 flag_test.go create mode 100644 go.mod create mode 100644 go.sum create mode 100644 ini.go create mode 100644 ini_test.go create mode 100644 phases.go create mode 100644 run.go create mode 100644 util.go create mode 100644 util_test.go create mode 100644 version.go diff --git a/README.md b/README.md new file mode 100644 index 0000000..ee7c87a --- /dev/null +++ b/README.md @@ -0,0 +1,7 @@ +# CAMFISH + +*Concurrent Actor Model Framework for Internet Services on Holanet* + +CAMFISH is a Go framework for creating daemons using an actor-model +architecture. It is designed to be fault-tolerant, and is capable of restarting +individual parts of your program (actors) if they fail. diff --git a/actor.go b/actor.go new file mode 100644 index 0000000..52c26e7 --- /dev/null +++ b/actor.go @@ -0,0 +1,90 @@ +package camfish + +import "context" + +// Actor is a participant in the environment. All public methods on an actor +// must be safe for concurrent use by multiple goroutines. Additionally, any +// type which explicitly implements Actor should: +// - Treat all public fields, values, indices, etc. as immutable +// - Satisfy Actor as a pointer, not a value +// - Not have a constructor +type Actor interface { + // Type returns the type name of the actor. The value returned from this + // is used to locate actors capable of performing a specific task, so it + // absolutely must return the same string every time. Actors implemented + // in packages besides this one (i.e. not camfish) must not return the + // string "cron". + Type() string +} + +// FlagAdder is any object that can add [Flag]s to a [FlagSet]. Actors which +// implement this interface will be called upon to add flags during and only +// during the flag parsing phase. +type FlagAdder interface { + // AddFlags adds flags to set. The object must not retain or distribute + // any reference to set. + AddFlags(set FlagSet) +} + +// ConfigProcessor is any object that can read and modify a configuration before +// it is used. Actors which implement this interface will be called upon to +// process the config during and only during the configuration processing phase. +type ConfigProcessor interface { + // Process processes the config. + ProcessConfig(conf MutableConfig) error +} + +// Configurable is any object that must be configured before use or +// initialization (if applicable). Actors which implement this interface will be +// configured during the configuration phase, or when they are added. +type Configurable interface { + // Configure configures the object. It must not make any attempt to + // modify conf, and it must not retain or distribute any reference to + // conf. + Configure(conf Config) error +} + +// Initializable is any object that must be initialized before use. Actors which +// implement this interface will be initialized during the initialization phase, +// or when they are added. +type Initializable interface { + // Init initializes the object. It must return before the context + // expires, and must return ctx.Err if there is no other error to be + // returned. If Init returns an error, the object must be treated as + // invalid and any process which depends on it should be shut down. + Init(ctx context.Context) error +} + +// Runnable is any object with a central, long-running routine. Actors which +// implement this interface will be run after they have been initialized, +// configured, etc. (if applicable). The environment will attempt to restart +// actors if their run method fails, see the documentation for this package's +// [Run] function for details. +type Runnable interface { + // Run runs the object. It must return when or before the context + // expires, and must return ctx.Err if there is no other error to be + // returned. + Run(ctx context.Context) error +} + +// Trimmable is any object that needs to have a task run every so often. This +// can be garbage collecting, sanity checking, etc. Actors which implement this +// interface will be routinely trimmed while running. See the documentation for +// this package's [Run] function for details. +type Trimmable interface { + // Trim trims the object. It must return when or before the context + // expires, and must return ctx.Err if there is no other error to be + // returned. + Trim(ctx context.Context) error +} + +// Resettable is any object that must be reset after failure and before re-use. +// Actors which implement this interface will be reset after their Run method +// (if applicable) has failed and is about to be called again. +type Resettable interface { + // Reset resets the object. It must return when or before the context + // expires, and must return ctx.Err if there is no other error to be + // returned. If Reset returns an error, the object must be treated as + // invalid and any process which depends on it should be shut down. + Reset(ctx context.Context) error +} diff --git a/actorset.go b/actorset.go new file mode 100644 index 0000000..eacced9 --- /dev/null +++ b/actorset.go @@ -0,0 +1,178 @@ +package camfish + +import "iter" +import "slices" +import "context" + +// actorSets stores a sorted set of all actors, as well as information about +// their state. +type actorSets struct { + all actorSet[Actor] + inf map[Actor] actorInfo + nextOrder int + + flagAdder actorSet[FlagAdder] + configProcessor actorSet[ConfigProcessor] + configurable actorSet[Configurable] + initializable actorSet[Initializable] + runnable actorSet[Runnable] + trimmable actorSet[Trimmable] +} + +// actorInfo holds information about a running actor. +type actorInfo struct { + // ctx is the context of the actor. It is passed to the actor's Run + // method. + ctx context.Context + // done is used to stop the actor. It is created when the actor is added + // to the actorSets. + done func() + // stopped is closed once the actor's run method exits. If the actor + // does not have a run method, it will be nil. + stopped chan struct { } + // order is the semantic order of the actor within the sets. This is + // incremented automatically for each actor that is added. It is never + // less than one. + order int +} + +// actorSetIface holds only the add/del/clear methods of actorSet. +type actorSetIface interface { + add(actor Actor) + del(actor Actor) + clear() +} + +// All returns an iterator over all sets in the collection. +func (sets *actorSets) All() iter.Seq[actorSetIface] { + return func(yield func(actorSetIface) bool) { + yield(&sets.all) + yield(&sets.flagAdder) + yield(&sets.configProcessor) + yield(&sets.configurable) + yield(&sets.initializable) + yield(&sets.runnable) + yield(&sets.trimmable) + } +} + +// add adds an actor under the given parent context. This is a write operation. +func (this *actorSets) add(ctx context.Context, actor Actor) { + if this.inf == nil { this.inf = make(map[Actor] actorInfo)} + actorCtx, done := context.WithCancel(ctx) + this.nextOrder ++ + info := actorInfo { + ctx: actorCtx, + done: done, + order: this.nextOrder, + } + if _, ok := actor.(Runnable); ok { + info.stopped = make(chan struct { }) + } + this.inf[actor] = info + for set := range this.All() { + set.add(actor) + } +} + +// del removes an actor. This is a write operation. +func (this *actorSets) del(actor Actor) { + delete(this.inf, actor) + for set := range this.All() { + set.del(actor) + } +} + +// clear removes all actors. +func (this *actorSets) clear() { + clear(this.inf) + for set := range this.All() { + set.clear() + } +} + +// info gets information about an actor. +func (this *actorSets) info(actor Actor) actorInfo { + if this.inf == nil { return actorInfo { } } + return this.inf[actor] +} + +// sortActors sorts actors according to the order in which they were added. +func sortActors[T comparable] (sets *actorSets, actors []T) []T { + slices.SortFunc(actors, func (left, right T) int { + // :3 + leftOrder := sets.info(any(left).(Actor)).order + rightOrder := sets.info(any(right).(Actor)).order + if leftOrder < 1 || rightOrder < 1 { + panic("could not sort actors, some were invalid") + } + switch { + case leftOrder < rightOrder: return -1 + case rightOrder > leftOrder: return 1 + default: return 0 + } + }) + return actors +} + +// actorSet is a list that holds all actors of a type. it must not return any +// references to data that is referenced elsewehre, except for references to +// actors. +type actorSet[T comparable] struct { + actors map[T] string +} + +// add adds an actor, if it is of the set's type +func (this *actorSet[T]) add(actor Actor) { + if this.actors == nil { this.actors = make(map[T] string) } + if item, ok := actor.(T); ok { + this.actors[item] = actor.Type() + } +} + +// add removes an actor, if it is of the set's type +func (this *actorSet[T]) del(actor Actor) { + if this.actors == nil { return } + if item, ok := actor.(T); ok { + delete(this.actors, item) + } +} + +// clear removes all actors. +func (this *actorSet[T]) clear() { + if this.actors == nil { return } + clear(this.actors) +} + +// find returns the first actor with the given type. +func (this *actorSet[T]) find(typ string) T { + for actor, actorType := range this.actors { + if actorType == typ { return actor } + } + var zero T + return zero +} + +// findAll returns all actors with the given type. This method allocates and +// returns a slice instead of an iterator for concurrency reasons. +func (this *actorSet[T]) findAll(typ string) []T { + if this.actors == nil { return nil } + slice := []T { } + for actor, actorType := range this.actors { + if actorType == typ { + slice = append(slice, actor) + } + } + return slice +} + +// all returns all actors. This method allocates and returns a slice instead of +// an iterator for concurrency reasons. +func (this *actorSet[T]) all() []T { + if this.actors == nil { return nil } + slice := []T { } + for actor := range this.actors { + slice = append(slice, actor) + } + return slice +} diff --git a/actorset_test.go b/actorset_test.go new file mode 100644 index 0000000..b382d94 --- /dev/null +++ b/actorset_test.go @@ -0,0 +1,282 @@ +package camfish + +import "context" +import "testing" + +type mockInitializable string +func (this *mockInitializable) Type() string { return string(*this) } +func (*mockInitializable) Init(ctx context.Context) error { return ctx.Err() } + +func TestActorSet(test *testing.T) { + set := actorSet[Initializable] { } + + // add actors + actor0 := mockInitializable("type0") + test.Logf("actor0: %p", &actor0) + set.add(&actor0) + if len(set.actors) != 1 { + test.Fatalf("wrong length: %d", len(set.actors)) + } + if _, ok := set.actors[&actor0]; !ok { + test.Fatal("missing item") + } + actor1 := mockInitializable("type0") + test.Logf("actor1: %p", &actor1) + set.add(&actor1) + if len(set.actors) != 2 { + test.Fatalf("wrong length: %d", len(set.actors)) + } + if _, ok := set.actors[&actor1]; !ok { + test.Fatal("missing item") + } + actor2 := mockInitializable("type1") + test.Logf("actor2: %p", &actor2) + set.add(&actor2) + if len(set.actors) != 3 { + test.Fatalf("wrong length: %d", len(set.actors)) + } + if _, ok := set.actors[&actor2]; !ok { + test.Fatal("missing item") + } + + // find + found := set.find("type0") + test.Log("find:", found) + if found != &actor0 && found != &actor1 { + test.Fatalf("not equal: %p", found) + } + found = set.find("type1") + test.Log("find:", found) + if found != &actor2 { + test.Fatalf("not equal: %p", found) + } + + // findAll + foundAll := set.findAll("type0") + test.Log("findAll:", foundAll) + if len(foundAll) != 2 { + test.Fatalf("wrong length: %d", len(foundAll)) + } + if !mockInitializableIn(foundAll, &actor0) { + test.Fatal("missing item") + } + if !mockInitializableIn(foundAll, &actor1) { + test.Fatal("missing item") + } + foundAll = set.findAll("type1") + test.Log("findAll:", foundAll) + if len(foundAll) != 1 { + test.Fatalf("wrong length: %d", len(foundAll)) + } + if !mockInitializableIn(foundAll, &actor2) { + test.Fatal("missing item") + } + + // all + all := set.all() + test.Log("all:", all) + if len(all) != 3 { + test.Fatalf("wrong length: %d", len(all)) + } + if !mockInitializableIn(all, &actor0) { + test.Fatal("missing item") + } + if !mockInitializableIn(all, &actor1) { + test.Fatal("missing item") + } + if !mockInitializableIn(all, &actor2) { + test.Fatal("missing item") + } + + // del + set.del(&actor1) + test.Log("del") + if len(set.actors) != 2 { + test.Fatalf("wrong length: %d", len(set.actors)) + } + if _, ok := set.actors[&actor1]; ok { + test.Fatal("leaked item") + } + + // find + found = set.find("type0") + test.Log("find:", found) + if found != &actor0 { + test.Fatalf("not equal: %p", found) + } + found = set.find("type1") + test.Log("find:", found) + if found != &actor2 { + test.Fatalf("not equal: %p", found) + } + + // findAll + foundAll = set.findAll("type0") + test.Log("findAll:", foundAll) + if len(foundAll) != 1 { + test.Fatalf("wrong length: %d", len(foundAll)) + } + if !mockInitializableIn(foundAll, &actor0) { + test.Fatal("missing item") + } + if mockInitializableIn(foundAll, &actor1) { + test.Fatal("leaked item") + } + foundAll = set.findAll("type1") + test.Log("findAll:", foundAll) + if len(foundAll) != 1 { + test.Fatalf("wrong length: %d", len(foundAll)) + } + if !mockInitializableIn(foundAll, &actor2) { + test.Fatal("missing item") + } + + // all + all = set.all() + test.Log("all:", all) + if len(all) != 2 { + test.Fatalf("wrong length: %d", len(all)) + } + if !mockInitializableIn(all, &actor0) { + test.Fatal("missing item") + } + if mockInitializableIn(all, &actor1) { + test.Fatal("leaked item") + } + if !mockInitializableIn(all, &actor2) { + test.Fatal("missing item") + } +} + +func TestActorSets(test *testing.T) { + sets := actorSets { } + + actor0 := mockInitializable("type0") + actor1 := mockInitializable("type1") + sets.add(context.Background(), &actor0) + sets.add(context.Background(), &actor1) + test.Log("add") + + if len(sets.inf) != 2 { + test.Fatalf("wrong length: %d", len(sets.inf)) + } + if len(sets.all.actors) != 2 { + test.Fatalf("wrong length: %d", len(sets.all.actors)) + } + if _, ok := sets.all.actors[&actor0]; !ok { + test.Fatal("missing item") + } + if _, ok := sets.all.actors[&actor1]; !ok { + test.Fatal("missing item") + } + if len(sets.initializable.actors) != 2 { + test.Fatalf("wrong length: %d", len(sets.initializable.actors)) + } + if _, ok := sets.initializable.actors[&actor0]; !ok { + test.Fatal("missing item") + } + if _, ok := sets.initializable.actors[&actor1]; !ok { + test.Fatal("missing item") + } + if len(sets.configProcessor.actors) != 0 { + test.Fatalf("wrong length: %d", len(sets.configProcessor.actors)) + } + if len(sets.configurable.actors) != 0 { + test.Fatalf("wrong length: %d", len(sets.configProcessor.actors)) + } + if len(sets.runnable.actors) != 0 { + test.Fatalf("wrong length: %d", len(sets.runnable.actors)) + } + if len(sets.trimmable.actors) != 0 { + test.Fatalf("wrong length: %d", len(sets.trimmable.actors)) + } + + info := sets.info(&actor0) + test.Log("info:", info) + if info.ctx == nil { test.Fatal("value is nil") } + if info.done == nil { test.Fatal("value is nil") } + if info.order != 1 { test.Fatal("not equal") } + + info = sets.info(&actor1) + test.Log("info:", info) + if info.ctx == nil { test.Fatal("value is nil") } + if info.done == nil { test.Fatal("value is nil") } + if info.order != 2 { test.Fatal("not equal") } + + sets.del(&actor0) + test.Log("del") + + if len(sets.inf) != 1 { + test.Fatalf("wrong length: %d", len(sets.inf)) + } + if len(sets.all.actors) != 1 { + test.Fatalf("wrong length: %d", len(sets.all.actors)) + } + if _, ok := sets.all.actors[&actor1]; !ok { + test.Fatal("missing item") + } + if len(sets.initializable.actors) != 1 { + test.Fatalf("wrong length: %d", len(sets.initializable.actors)) + } + if _, ok := sets.initializable.actors[&actor1]; !ok { + test.Fatal("missing item") + } + + info = sets.info(&actor0) + test.Log("info:", info) + if info.ctx != nil { test.Fatal("value is non-nil") } + if info.done != nil { test.Fatal("value is non-nil") } + + info = sets.info(&actor1) + test.Log("info:", info) + if info.ctx == nil { test.Fatal("value is nil") } + if info.done == nil { test.Fatal("value is nil") } + + sets.del(&actor1) + test.Log("del") + + if len(sets.configProcessor.actors) != 0 { + test.Fatalf("wrong length: %d", len(sets.configProcessor.actors)) + } + if len(sets.configurable.actors) != 0 { + test.Fatalf("wrong length: %d", len(sets.configProcessor.actors)) + } + if len(sets.initializable.actors) != 0 { + test.Fatalf("wrong length: %d", len(sets.configProcessor.actors)) + } + if len(sets.runnable.actors) != 0 { + test.Fatalf("wrong length: %d", len(sets.runnable.actors)) + } + if len(sets.trimmable.actors) != 0 { + test.Fatalf("wrong length: %d", len(sets.trimmable.actors)) + } +} + +func TestSortActors(test *testing.T) { + sets := actorSets { } + + actor0 := mockInitializable("type0") + actor1 := mockInitializable("type1") + actor2 := mockInitializable("type2") + actor3 := mockInitializable("type3") + sets.add(context.Background(), &actor0) + sets.add(context.Background(), &actor1) + sets.add(context.Background(), &actor2) + sets.add(context.Background(), &actor3) + test.Log("add") + + sorted := sortActors(&sets, []Actor { &actor1, &actor3, &actor2, &actor0 }) + test.Log("sort:", sorted) + if len(sorted) != 4 { test.Fatalf("wrong length: %d", len(sorted)) } + if sorted[0] != &actor0 { test.Fatal("wrong position") } + if sorted[1] != &actor1 { test.Fatal("wrong position") } + if sorted[2] != &actor2 { test.Fatal("wrong position") } + if sorted[3] != &actor3 { test.Fatal("wrong position") } +} + +func mockInitializableIn(haystack []Initializable, needle *mockInitializable) bool { + for _, initializable := range haystack { + if initializable == needle { return true } + } + return false +} diff --git a/assets/icon.png b/assets/icon.png new file mode 100644 index 0000000000000000000000000000000000000000..9ec021b6df0f251c6847e526d10d6139fa073889 GIT binary patch literal 8440 zcmd^Fs!@k?kLKE$sh>v}RH=Vuk&OtENkL`h(x~*qE{H&-K@5z>_C0x|aSR%) z)_>X%NNycrlXRbVMJi?G__S3d;qkyhNdM{bVw&Au+i+?Ht?+e%Q0nz(0O@C8GT^Q# z4~R3g{XP&nm`sGGW=R036|89ih1co?5Yo8k{~upE$Pa03}c-Zl~?+z&!-VJt+b8(+xQhAE8e<)V*TRZ0en~$PLz!1rg-R)i;wq z3r)$p!?ZHP=)|Ll_vfkzH3CQ!F<15W$imwW5_p*%AWMjKCHfSUx9ncdst>^EK)o=a zO-{b{IuP3sxbFWsKCpq9`ixy!G9URdQKQ57sn@%wLabGn_d-UA??oYO&U$V}(Cmth zH&z0TFWgLeBd!R@NW_HxE-7nfAXbDsIG=rU4WT+yWzFH#w=0d_LRL`V&0UAY9su$pn7% zdm0;E6f(tyJ&1wef(8)FFK`>Dq-GC`YlnQZr{HW7&0qXgYRHs)I3vNth#yr)3_^<_ zi?lXr_1V;Z`i+nQUYsnDpg*?wr4L}gU$%W5@qQ~mX6U)N*|8=1YW&!j2>!R)9^>j7gtug@%i=r2~l7GfS|VCRY{N3)H&BZLVz=OIapRTGw22hTC~OOyB%$ zA>?NT9|)X9%#|>CTRf6cxP%CMu5Y;bJoYxLpB>olT49_1L&uwuk8#^9!wnQz8j>bu z$W+nZ=w7)UZ{1CVp%VeS^2X%2`*1ri=r^5Z8FZ18l*TIh)ny~ahv@05ugj4UuYLrT zW7<80h&g=r3FRbQI<`fhfNy}jUj%ii!Dc#SCHaMs(Y`;=S+qSiJWa8ZCv-t)6Je(=t~ zOpUvz^3f%A`Qg}8s#m5Qk@ua1z9A5VSzEPgya_25%D4@Hr5KZ88Mk)Kd44O%Ww<9_ zEf1RFK5a&X@gR+;+F^HQGwEyqwg@<@1q1G_obphAdE4VBfeLZWDPEpl3Nz1Mho7I_ zQeKi#Spn*uBjsOkkn*xKIh;@S68Fk7rl+fQuv^7klPlbD+pj|qkxDGes5E^rsylf! z@>4H*yJ$#F_-_IS(H|mF^3!_uBST>7#ZU@EN)PB!xN^{nYcICJ!>})I*l}kz55pwX zlWipw>|%9SUa@!OgiAwL{YF#xeZSMSTmAT3?eCcU*6`ppH7!ss*tbcm-!Guxh zWZJ35!J_(iQ=qJ>FgdoyHj>1k5&rO~scbE7BiTjh@2BL)aB43elX}5nT8nH9gHVZp zBeN<-r|+<^?=YlawQw~a!1osX_1P%M{fNeTkSE&Oar0jm122Qb6Fm zZ{bX+pl4P~kO;j^dWayKg@XYK{tAbDn-J)A*YuSE>h<|MSg$5hSXJ&6gsutLReZR4 z^FMRWYJ24bFyAG+D{DuXs!EZ zLU>Bx4Y&ObD{_eZK=y)#o>`J&B6T}VZz{2Hi(ka};@UmEgTg{lP?~M}g>0ewTYvZj%KZ4UP2msP| zPQ&VzMR6L2W~x%-;d?fce)t)-|2*3a%^*h~xy$MyOakx1umeI0Y8+sM>{wORHql6! zWFGJDllN{FebAvR^MjWFwaqYUYzk+Gl~hKFwUl3kYYbEr7Ro50?rQE$oDg(NC?1Af z-pIDw233fU#Y+?%O;f#9r{pVk0#YGfiEtl!Twhgl9gJ1R(%82vTTt99t-1Z>xNP*( zVHnN}Y)c^9V;z%IPuPAM^lV;gB<#bzF_816qf2)Hv6b^FsV% ze%-6wVT*>yiTdn6NJAz+O0*&YG~&D!{H`VHWQ!inWjs?Jeh>yBEpO{b-R4;260jFr-ER?sq7vMUF>|qeQ|PlD zjaoIywtIN$eGC{L*NfgV}cVgVUm+F>FHu$7-z|5-=b zcCG+G#+gk9@zl+6XROE!v}iof!If>t$=gJC)huqdN+l66(}RzMh%2s3Hdczf#zTbG z>jy=ENV#p7Y4zD7y+-8k5tq|N5*)oDzNxsuORu6iMse1B;UQtl$&!4+U!@K8F6-1pFNt*84kvXJw%UpO;7*fS6S z7SR0YvT4GXO#u^;QR#Kt340fBo^P@{o9Xm244Rh*ZgyO{?C>=kTO8Ke`Bb%Qvv9wqYamO5Q??U2%Z(ER;z8rzF-5`D?}K)Kd~cn@ zk2vtbP^bFGmInSH@6-MvvLDe8Eu;l!g?*TBXIub~#_i+nILQE`nMqTN$5oRTH_-7i z(;gtEP!+P}zM#PYbLo?x|MnDXk54gCgSf-zDnzm)r3dLf=q0E0M@6K{ED5!G(>m&Z z{Xb&rIc0vHSOPVZ->)c#4kiLC-Va>))4Zlc^8yjYKvi)Q>_c(>0$Itggr*QuHNwwr z*LxhxizMiY@&vPHm-)F*nj28+fH{vKMJ4PlvHIgL%pBJmIqSyV%^db;AYuQhO3J7t zn_t20r$ZI2#MG}+B261-RiQZFyxTzN8I#rA1Q1cbGoblR6Uw~K|JG< zr`QUxOq*-B{n)Od!JbcVB7tEM$E12H7ft|=={izJ*qTVd#fxRC2g}a_j%9vczy>nh z{mNSpKg>5mYVw|=Xq-|!CR?W{mibLL_eoo`JAAc*-SAad7hBUG*BBnD-knMQfeDxz zvjBJhSogX^e#lis%CA;yy6tCY$OQ<&HBYOGV=7YbuEmSWJ#=yMVa7wAd+kw{{er0% z@m(T#^cO#0SLVf>k-WZXh={J%rO}(H>z)zKs3)0fBLaMe2jWB3b6q+mdic+oQu4pb zReM-e+kd#q?OmoMQRQP_NSZsUZ_?(H+2R0BB2xWDhT?l%b-}6T^+2dq^@(8hJOO9w z%n6}T?5<4m!2GYwVwP`i+4DU?&vbvP@A7?&s_c>@su7dS9b;f%oz_d%GLB95VgG8~ zIGbOrAHC+6y{;XqtEOxz_V;pgp9Dc*<;msu;}y_?*Z1fGeQxyTXWbz&eOLiHhQ2-toQ=)~ha3Z^C^ z?JWfj?k^k_PdEfl%&}L$4NssUA1=y#jliNC_Q2LnOwrw`*N=_@Am^Cx0R|Q0DxZ_R z=h2|SGrzyDMfxmSe;)o(U6eRjrb7&vNqgtYzkS{zXo(|PdgMSoMfi?qX@RQVuC@=x zO;<@OTaHxs)G1(`NIGnAbKal3O~5hY48X^!3-?i}@+9EthWB@{$&SgV^2;w!j8yI~ zZTsFI;|WML-P_Rf9>e3O1D$7AUTwFide47`}07N^yF5@NAA1pyg=F7i5L;4dn>pbcOR{JFLh?BBKquN z?3SNLqY;Y2e2I#Cwm;ii2qgJjhl~zV!;uwF;gOh>mvLcab(LijCKW_=Ov733;YtLP zFmxW7XykiGb!CR4F;~V~(0_FC5Z6CG4Wb;!2tJ7#H{UjRO3C620sd82XjSP=Re8FN zP>GEve?7tK+8l2~oo!z11h5!;Oz|k|$w*KD`tOT6h@!kQsUCi(fX*aA5DQZ$#9-rNNW)!Glh46=lE& ze$ABe+Kscu+0II+O4-~g$c=EVmf*Eo#{jS!NYD6{<7-P2uOYUiZkGAtfhV;Ka@HuM zcX_NN(7-G#fA0`J(s!&l@|7WdBI9LH=kTLsbK~2U?Imk=8)HJL615AN9l^aq`GUF7 zktgN(3o6wKq;mdsYBDcK%!@jzWa4B^a0U=B#<%b zT=G)cGY&-o`Yx2ieX~7QINxy>8Vv&q|osiR+`Q4v1Y+TcG4 z6;dRCfq5PY-2hnO7E*Wj$DIOm_n4{KUzlk}NmBsHEJmlIbeB2n4i&AknkX`JJ65jx z1LwArq2Ha|$2<>CrPqjo#QI=lhp*>V$mbLah`pL)OpD9+z6w1wVv04jnzm}x-Na)1 zyzsK8I`FxUlIH5_!K!m@y2RQGAY{Og_6;Y_YedW|V=gL?D;07mC-aQ$0kxCQTXc@z zZGDk1{e&5p9C-LcX=_Xld%3B&a`ZAO^9vme2dxwfMeQaF%BJHWjB zMls+t_+7gg`&0*9vcMiOu^8qlIbel@{=9K9vd%7rD{9oe*QG(;*jo7hj^a3K+%pq$ zf6p3x-L~c9{tC#!ulE8sfgvuC-!n_H^u@1gIb$>OtKq_2b(#7=)RCndiflJbo598-V3$3VFC6DM(Y*^w@D%scI! zzz~8^3k9YbGrW} z2GrS3Z=(S2u987~@2-HGlEU(Xix5_gev|f8&O*x(A@ACoKz{MVN{JzsQ_hrY2Z{2( zPsnMCpiA-oNB4;t5CQvXs+mU153F9$N6#Lqe9CzAJxEYb44ARlXbvB`mP3b}Dn==T z8x8t{MUNWuT^GLO$LvNQ$(vsVpw%WfN*{zSL+TFWUZhG1UJSAVv+L03ON-?-g;_E` z@T?&f@b>Omh*rWHzS7pF;-5tc;Lf0VY$c^cG4GS;;%FwSL7mb=1}U4VWiSKZ<|Y?B z5GJM$Fn$v)>Zu@JxhFfKlv`zeX!DiqB(`UI5no4>0$zmvZn=p}5Z9Do?KGkyxOhLjCA}z?HU5+# z=KW~Kp2r>Btd#M4N$%mYyv55k{*fv>Yfw#tq`<-u*zl*Xi#wy8PTe+Y&RpnWB0N z$XY>adZMlffI#8hZ zs7qS@!5RdU5CNq|MKT_&=~nu-dDGW{gGq!>wh?)&>LaKR79qkd^Yol|w#+mA%x1qg zm_8jf1U6BX16K%&ev3UVY;%f1LVv)jS^3%YE#QYozD8eMX{TxJKLV@A_(C(~FaN9+ zPJK-J{)Hs96K1ON>gWJ3V-ejJt<8m#2CiLx{TiRfGvM#Hv%NQZ3S9r{@(}ZQQHhWn z#iQb{n73}7*$ZRToeg!1p}d6Y#TFaJff0Y5r*8_8)7V}icR1Jrmt}oD${dfqd`n~L zTu8r;{#bwAlAoK0xy@^%Tly2IJAHGK^GTCGdNSug@W>mF$xp>0-NgiveT+WWrb$nE z|H)awW-{woy|s;O-=ARrNQvXX0RJUvjBorcjc~)~* zf(p=vIf0U=K0Ayyny_V>n{Pl@1^go(5IV zoHFmwGOuVx-Iu8SHA~IcaFbJZ>{!k4Dt=dmR{qAsig2fziJT!%`ybg9p;spJ*F~5Q zpB;R~eR_4w>N2cLQVTRZX<~EbD_W2tzkP}bTs}rOGW%gW&Bk~#h!pq(?SdH~;_R+1 z?_gI-Roq9H5~*J^W-P!{wDHVO76xEx>sHm?R&P$Z_yZQKGH={1bBYKq08%OMV6;pd zEe9hKHGXD;_ln8f%L>@2ET)oA_?rj2BYLE=8??6^*WXlA{JAlxVzxbbZ@K649gk34 zz041|w8t&n`aPM6f8@49Wh^J-pP=h8))^*}Fet1Wxz=oo**@M{^LmC7{)dXBQuFJk z#aC@^@HX|SXXb_`aA>>HyL5iL&y5_wZ^2EE2c0t2u%4V7mZC6L+r^S840w-UPh6*r_2Az^?R%F`Cmmy z=_>@fFTJYkhDCYr$e!zX0|3O=r>jeRQw6w_gGsVn_qOu6Nt}_l-CNA(1L@7ronQcK ziok!Hn{CC5&J@(V`C?-{UrmcNs%Ewu9By`5U?vHirNcWhCZj4H zajPObgbP+9mE=foAQkJCNLoAzH36L?2zIPm*hBoM#h2p2kEC0~!Qo6Gg1%QRd$3*t zu;!sfR=gmOISaX1;R~~8eE692dY2hKxVqdHo?VR>Wh?6swIj$P(aE;iTZvpVsF8S~ zu9w}wc|H$$y;M?@lBUtILs-*jx@7|8RmYbR*^_g26ZCs2CI?Ic{1H&1gcp+vym(oU zCR&&9thZ`^R&l~9@<1&@<`gt=144zDJTUL-k6!^)xkXA|{gi~uEx7tbKD<7~=Vk0h z2x<~kTYz2$K=)q(05-JKx{h!TDiS~>mao_cZngWm^jX%TlA93&r@V^6kp}2(KPm z{L}k_cDv(4cFEA5`9&%9QCyI5t7@!tb{z$6M$=RatU$Y8Sh z7?!zcS<+03P}(t>O1sRlQ!d^MS}Z@MnB6(X9&0&7e;bg?{+HLB?`xQ#u7J|koM3%? zkOXWVahC~!U2_daZKMeXBInJ3ne#Saz(bwNH~{BqRGjuhxl$9bcg2?!ZPh_5C7E;t&8mZf zjKVIHFH1l7Comm>1`D<(R)w<%E<^}3(L_Pgv12JN2ymJg6vdaw`7!_ek?zxT()v53 z66un^`SUH^sbH{ShWpiLtx+~`Bi`U$PPE{Nn0>qLZUbIjCUe`TGNU+>izFDOcs0Mn zu@%S)rv3xz3Ren&gp zATTsFQ#J=nH+p_ zlf}7#=s|qBUSKVCzDt)*g?Ft$BVx5-XmMNFHn=QxT9X2U6H3Q7-zh`GefL|nA;H#Y zM0CU0JrE9^5vTD3dF-4-)3Q6Yv?@S+{oe9^c9@+wVUJq!w|lDZG&;mVl8wiXRcpcj zWLEmA3?g)^I6TrI?lfnI3wpE$q*E%_D98)Ki5EQKxFJ}-LsH%8%v+6)Mq(U8mL07= z_>a32-_~>BgUNw{MU}&^N5p-1TkZfzY%-zqRtX`%4Df+7Y{U(E@UHYfiCqE#2MsD^ z@7=x5qSW|;xP@fO@iz-IzzAnGnY z#3LJzmHjfm;X+~#qN(=haLJXv@USykNgSG!PK z_51Ba_hL(ebcl05rT7DEyDuXnWq`>J+%$PW53Js%)vFce)N?*4-LP<&CYxUXC%?#f zn#84LzIYRFQ` + + + + + + + + + + + + diff --git a/config.go b/config.go new file mode 100644 index 0000000..124a0c1 --- /dev/null +++ b/config.go @@ -0,0 +1,20 @@ +package camfish + +import "iter" + +// Config represents key/value data. +type Config interface { + Get(key string) string + GetAll(key string) iter.Seq2[int, string] +} + +// MutableConfig is like Config, but can be changed. These methods should be +// assumed unsafe for use by multiple concurrent goroutines. A MutableConfig +// must not be retained nor shared. These methods must not be called while +// iterating over All or GetAll. +type MutableConfig interface { + Config + Add(key, value string) + Del(key string) + Set(key, value string) +} diff --git a/cron.go b/cron.go new file mode 100644 index 0000000..d59f9fe --- /dev/null +++ b/cron.go @@ -0,0 +1,48 @@ +package camfish + +import "time" +import "context" + +// cron is a built-in actor present in every environment that triggers routine +// tasks on time intervals. +type cron struct { + trimFunc func() bool + timing struct { + trimInterval time.Duration + } +} + +var _ Actor = new(cron) +var _ Runnable = new(cron) +var _ Configurable = new(cron) + +func (this *cron) Type () string { + return "cron" +} + +func (this *cron) Configure (config Config) error { + if str := config.Get("trim-interval"); str != "" { + value, err := time.ParseDuration(str) + if err != nil { + return NewConfigError(config, "trim-interval", 0, err) + } + this.timing.trimInterval = value + } + return nil +} + +func (this *cron) Run (ctx context.Context) error { + trimTicker := time.NewTicker(defaul( + this.timing.trimInterval, + defaultTrimInterval)) + defer trimTicker.Stop() + for { + select { + case <- trimTicker.C: + if this.trimFunc != nil { + this.trimFunc() + } + case <- ctx.Done(): return ctx.Err() + } + } +} diff --git a/doc.go b/doc.go new file mode 100644 index 0000000..656e296 --- /dev/null +++ b/doc.go @@ -0,0 +1,7 @@ +// Package camfish provides an actor-model oriented framework for daemons. +// +// The key words "MUST", "MUST NOT", "REQUIRED", "SHALL", "SHALL +// NOT", "SHOULD", "SHOULD NOT", "RECOMMENDED", "MAY", and +// "OPTIONAL" in this documentation are to be interpreted as described in +// RFC 2119. +package camfish diff --git a/environment.go b/environment.go new file mode 100644 index 0000000..c52d9b5 --- /dev/null +++ b/environment.go @@ -0,0 +1,389 @@ +package camfish + +import "os" +import "fmt" +import "log" +import "iter" +import "sync" +import "time" +import "errors" +import "context" +import "sync/atomic" +import "golang.org/x/sync/errgroup" +import "git.tebibyte.media/sashakoshka/go-util/sync" +import "git.tebibyte.media/sashakoshka/go-service/daemon" + +const defaultInitTimeout = 8 * time.Minute +const defaultRestartThreshold = 16 * time.Second +const defaultRestartInitialInterval = 8 * time.Second +const defaultRestartInitialIncrement = 8 * time.Second +const defaultRestartInitialMaximum = 1 * time.Hour +const defaultResetTimeout = 8 * time.Minute +const defaultTrimInterval = 1 * time.Minute +const defaultTrimTimeout = 1 * time.Minute +const defaultShutdownTimeout = 8 * time.Minute + +// environment is an object which handles requests by package-level functions. +// It is only a separate object for testing purposes. +type environment struct { + name string + description string + actors usync.RWLocker[*actorSets] + ctx context.Context + done context.CancelCauseFunc + group sync.WaitGroup + conf MutableConfig + + // flags stores information from built-in flags. + flags struct { + pidFile string + user string + logDirectory string + configFile string + verbose bool + } + + // running stores whether the environment is currently running. + // updated by environment.phase7Running. + // ATOMICALLY VOLATILE + running atomic.Bool + + // timing stores configurable timing information. + // updated by environment.phase5ConfigurationApplication. + // ATOMICALLY VOLATILE + timing struct { + initTimeout atomicDuration + restartThreshold atomicDuration + restartInitialInterval atomicDuration + restartIntervalIncrement atomicDuration + restartIntervalMaximum atomicDuration + resetTimeout atomicDuration + trimTimeout atomicDuration + shutdownTimeout atomicDuration + } +} + +// Run implements the package-level function [Run]. +func (this *environment) Run(name, description string, actors ...Actor) { + if len(actors) == 0 { + log.Println("XXX cannot start with no actors") + os.Exit(2) + } + + this.ctx, this.done = context.WithCancelCause(context.Background()) + defer this.done(nil) + daemon.OnSigint(func() { this.done(ErrProcessKilled) }) + defer log.SetOutput(os.Stderr) + + this.name = name + this.description = description + this.actors = usync.NewRWLocker(&actorSets { }) + this.addToSets(actors...) + this.addToSets(&cron { + trimFunc: this.phase70_5Trimming, + }) + + if !this.phase10FlagParsing() { os.Exit(2) } + if !this.phase13PidFileCreation() { os.Exit(2) } + if !this.phase17PrivilegeDropping() { os.Exit(2) } + if !this.phase20LogSwitching() { os.Exit(2) } + if !this.phase30ConfigurationParsing() { os.Exit(2) } + if !this.phase40ConfigurationProcessing() { os.Exit(2) } + if !this.phase50ConfigurationApplication() { os.Exit(2) } + if !this.phase60Initialization() { os.Exit(2) } + if !this.phase70Running() { os.Exit(1) } + if !this.phase80Shutdown() { os.Exit(1) } +} + +// Done implements the package-level function [Done]. +func (this *environment) Done(cause error) { + this.done(cause) +} + +// Add implements the package-level function [Add]. +func (this *environment) Add(ctx context.Context, actors ...Actor) error { + this.addToSets(actors...) + initializable := make([]Initializable, 0, len(actors)) + for _, actor := range actors { + if actor, ok := actor.(Initializable); ok { + initializable = append(initializable, actor) + } + } + err := this.initializeActors(ctx, initializable...) + if err != nil { return err } + for _, actor := range actors { + if actor, ok := actor.(Configurable); ok { + err := actor.Configure(this.conf) + if err != nil { + return fmt.Errorf ( + "could not apply configuration to %s: %w", + actor.(Actor).Type(), err) + } + } + } + for _, actor := range actors { + if actor, ok := actor.(Runnable); ok { + this.start(actor) + } + } + return ctx.Err() +} + +// Del implements the package-level function [Del]. +func (this *environment) Del(ctx context.Context, actors ...Actor) error { + channels := []<- chan struct { } { } + for _, actor := range actors { + info := this.info(actor) + if info.done != nil { + channels = append(channels, info.stopped) + } + } + for _, channel := range channels { + select { + case <- channel: + case <- ctx.Done(): + return ctx.Err() + } + } + return ctx.Err() +} + +// Find implements the package-level function [Find]. +func (this *environment) Find(typ string) Actor { + actors, done := this.actors.RBorrow() + defer done() + return actors.all.find(typ) +} + +// FindAll implements the package-level function [FindAll]. +func (this *environment) FindAll(typ string) iter.Seq[Actor] { + actors, done := this.actors.RBorrow() + defer done() + slice := actors.all.findAll(typ) + return func (yield func(Actor) bool) { + for _, actor := range slice { + if !yield(actor) { return } + } + } +} + +// All implements the package-level function [All]. +func (this *environment) All() iter.Seq[Actor] { + actors, done := this.actors.RBorrow() + defer done() + slice := actors.all.all() + return func (yield func(Actor) bool) { + for _, actor := range slice { + if !yield(actor) { return } + } + } +} + +// Verb implements the package-level function [Verb]. +func (this *environment) Verb() bool { + return this.flags.verbose +} + +// addToSets adds the actors to the actorSets in a thread-safe manner. +func (this *environment) addToSets(actors ...Actor) { + thisActors, done := this.actors.Borrow() + defer done() + for _, actor := range actors { + thisActors.add(this.ctx, actor) + } +} + +// delFromSets deletes the actors from the actorSets in a thread-safe manner. +func (this *environment) delFromSets(actors ...Actor) { + thisActors, done := this.actors.Borrow() + defer done() + for _, actor := range actors { + thisActors.del(actor) + } +} + +// info retrieves information about an actor from the actorSets in thread-safe +// manner. +func (this *environment) info(actor Actor) actorInfo { + thisActors, done := this.actors.RBorrow() + defer done() + return thisActors.info(actor) +} + +// start increments the wait group by one and starts the given actor in the +// background, restarting it if it fails. this function will exit immediately. +// see the documentation for run for details. +func (this *environment) start(actor Runnable) { + this.group.Add(1) + go this.run(actor) +} + +// run runs the given actor, restarting it if it fails. This function will exit +// when the actor's context is canceled. The actor will be removed from the +// environment once this function exits, and the environment's wait group +// counter will be decremented. note that this function will never increment the +// wait group counter, so start should usually be used instead. +func (this *environment) run(actor Runnable) { + // clean up when done + defer this.group.Done() + + // logging + acto, ok := actor.(Actor) + if !ok { return } + typ := acto.Type() + if this.Verb() { log.Printf("(i) [%s] running", typ) } + var stopErr error + var exited bool + defer func() { + if stopErr == nil { + if exited { + if this.Verb() { log.Printf("(i) [%s] exited", typ) } + } else { + if this.Verb() { log.Printf("(i) [%s] stopped", typ) } + } + } else { + log.Printf("!!! [%s] stopped with error: %v", typ, stopErr) + } + }() + + // contains context information + info := this.info(acto) + ctx := info.ctx + defer close(info.stopped) + + // timing + restartThreshold := defaul(this.timing.restartThreshold.Load(), defaultRestartThreshold) + restartInitialInterval := defaul(this.timing.restartInitialInterval.Load(), defaultRestartInitialInterval) + restartIntervalIncrement := defaul(this.timing.restartIntervalIncrement.Load(), defaultRestartInitialIncrement) + restartIntervalMaximum := defaul(this.timing.restartIntervalMaximum.Load(), defaultRestartInitialMaximum) + resetTimeout := defaul(this.timing.resetTimeout.Load(), defaultResetTimeout) + restartInterval := restartInitialInterval + + // main loop + for { + // run actor + lastStart := time.Now() + err := panicWrap(ctx, actor.Run) + + // detect context cancellation + if ctxErr := ctx.Err(); ctxErr != nil { + if err != nil && !errors.Is(err, context.Canceled) { + stopErr = err + } + return + } + + if err == nil { + // normal exit + exited = true + return + } else { + // failure + log.Printf("XXX [%s] failed", typ) + } + + // restart logic + if time.Since(lastStart) < restartThreshold { + log.Printf("!!! [%s] failed too soon, restarting in %v", typ, restartInterval) + timer := time.NewTimer(restartInterval) + select { + case <- timer.C: + // ok + case <- ctx.Done(): + if this.Verb() { log.Printf("(i) [%s] canceled while dormant", typ) } + return + } + restartInterval += restartIntervalIncrement + if restartInterval > restartIntervalMaximum { + restartInterval = restartIntervalMaximum + } + } else { + restartInterval = restartInitialInterval + } + + // reset if needed + if actor, ok := actor.(Resettable); ok { + if this.Verb() { log.Printf("... [%s] resetting", typ) } + func() { + ctx, done := context.WithTimeout(ctx, resetTimeout) + defer done() + err := actor.Reset(ctx) + if err != nil { + log.Printf("XXX [%s] failed to reset", typ) + } + }() + if this.Verb() { log.Printf(".// [%s] reset", typ) } + } + + log.Printf("(i) [%s] restarting", typ) + } +} + +// initializeActors spawns initialization functions for the given actors and +// blocks until all of them have exited. +func (this *environment) initializeActors(ctx context.Context, actors ...Initializable) error { + ctx, done := context.WithTimeout( + ctx, defaul(this.timing.initTimeout.Load(), defaultInitTimeout)) + defer done() + group, ctx := errgroup.WithContext(ctx) + for _, actor := range actors { + actor := actor + acto := actor.(Actor) + typ := acto.Type() + group.Go(func() error { + err := actor.Init(ctx) + if err != nil { return fmt.Errorf("%s: %w", typ, err) } + return nil + }) + } + return group.Wait() +} + +// trimActors spawns actor trimming functions, which can be waited upon via the +// returned errgroup. +func (this *environment) trimActors(ctx context.Context, actors ...Trimmable) error { + ctx, done := context.WithTimeout( + ctx, defaul(this.timing.trimTimeout.Load(), defaultTrimTimeout)) + defer done() + group, ctx := errgroup.WithContext(ctx) + for _, actor := range actors { + actor := actor + acto := actor.(Actor) + typ := acto.Type() + group.Go(func() error { + err := actor.Trim(ctx) + if err != nil { return fmt.Errorf("%s: %w", typ, err) } + return nil + }) + } + return group.Wait() +} + +// applyConfig reads and applies environment-specific values from this.conf. +func (this *environment) applyConfig() error { + parseDuration := func(key string, destination interface { Store(time.Duration) }) error { + if str := this.conf.Get(key); str != "" { + value, err := time.ParseDuration(str) + if err != nil { return NewConfigError(this.conf, key, 0, err) } + this.timing.initTimeout.Store(value) + } + return nil + } + err := parseDuration("init-timeout", &this.timing.initTimeout) + if err != nil { return err } + err = parseDuration("restart-threshold", &this.timing.restartThreshold) + if err != nil { return err } + err = parseDuration("restart-initial-interval", &this.timing.restartInitialInterval) + if err != nil { return err } + err = parseDuration("restart-interval-increment", &this.timing.restartIntervalIncrement) + if err != nil { return err } + err = parseDuration("restart-interval-maximum", &this.timing.restartIntervalMaximum) + if err != nil { return err } + err = parseDuration("reset-timeout", &this.timing.resetTimeout) + if err != nil { return err } + err = parseDuration("trim-timeout", &this.timing.trimTimeout) + if err != nil { return err } + err = parseDuration("shutdown-timeout", &this.timing.shutdownTimeout) + if err != nil { return err } + return nil +} diff --git a/environment_test.go b/environment_test.go new file mode 100644 index 0000000..9f52295 --- /dev/null +++ b/environment_test.go @@ -0,0 +1,7 @@ +package camfish + +import "testing" + +func TestEnvironment(test *testing.T) { + // TODO +} diff --git a/error.go b/error.go new file mode 100644 index 0000000..0ae0f5d --- /dev/null +++ b/error.go @@ -0,0 +1,103 @@ +package camfish + +import "fmt" +import "strings" + +// Error enumerates common errors in this package. +type Error string; const ( + ErrNotFound Error = "not found" + ErrNotRunning Error = "not running" + ErrProcessKilled Error = "process killed" + ErrExtraneousValues Error = "extraneous value(s)" + ErrSectionHeadingMalformed Error = "section heading malformed" + ErrPairMalformed Error = "key/value pair malformed" + ErrKeyEmpty Error = "key empty" +) + +// Error implements the error interface. +func (err Error) Error() string { + return string(err) +} + +// ConfigError pairs an error with a location in a config file. +type ConfigError struct { + File string + Key string + Line int + Column int + Err error +} + +// Error implements the error interface. +func (err ConfigError) Error() string { + out := strings.Builder { } + if err.File != "" { + out.WriteString(err.File) + } + if err.Key != "" { + fmt.Fprintf(&out, "[%s]", err.Key) + } + switch { + case err.Line != 0 && err.Column != 0: + if out.Len() > 0 { out.WriteRune(':') } + fmt.Fprintf(&out, "%d:%d", err.Line, err.Column) + case err.Line != 0: + if out.Len() > 0 { out.WriteRune(':') } + fmt.Fprintf(&out, "%d", err.Line) + } + if out.Len() > 0 { out.WriteString(": ") } + if err.Err == nil { + out.WriteString("configuration error") + } else { + fmt.Fprint(&out, err.Err) + } + return out.String() +} + +// Unwrap returns err.Err. +func (err ConfigError) Unwrap() error { + return err.Err +} + +// NewConfigError creates a new config error with the given key and config. It +// will attempt to fill in as many details as possible with the information +// given. If the config has a method with an identical name and signature to +// this function, it will be called and its value will be returned. Otherwise, +// an error is returned containing only the provided information. +func NewConfigError(config Config, key string, index int, wrapped error) ConfigError { + type configErrorFactory interface { + NewConfigError(string, int, error) ConfigError + } + if config, ok := config.(configErrorFactory); ok { + return config.NewConfigError(key, index, wrapped) + } + return ConfigError { + Key: key, + Err: wrapped, + } +} + +// FlagError pairs a flag with a long flag name. +type FlagError struct { + Long string + Err error +} + +// Error implements the error interface. +func (err FlagError) Error() string { + output := err.Long + if output != "" { + output = fmt.Sprintf("--%s: ", output) + } + if err.Err == nil { + output += "argument parsing error" + } else { + output += err.Unwrap().Error() + } + return output +} + +// Unwrap returns err.Err +func (err FlagError) Unwrap() error { + return err.Err +} diff --git a/error_test.go b/error_test.go new file mode 100644 index 0000000..a7c0ea7 --- /dev/null +++ b/error_test.go @@ -0,0 +1,74 @@ +package camfish + +import "testing" + +func TestConfigError (test *testing.T) { + err := ConfigError { + File: "example.conf", + Key: "some-key", + Line: 6, + Column: 20, + Err: ErrNotFound, + } + str := err.Error() + test.Log(str) + if str != "example.conf[some-key]:6:20: not found" { + test.Fatal("not equal") + } + unwrapped := err.Unwrap() + test.Log(unwrapped) + if unwrapped!= ErrNotFound { + test.Fatal("not equal") + } + + err = ConfigError { + File: "example.conf", + Line: 6, + Column: 20, + Err: ErrNotFound, + } + str = err.Error() + test.Log(str) + if str != "example.conf:6:20: not found" { + test.Fatal("not equal") + } + + err = ConfigError { + Key: "some-key", + Column: 20, + Err: ErrNotFound, + } + str = err.Error() + test.Log(str) + if str != "[some-key]: not found" { + test.Fatal("not equal") + } + + err = ConfigError { + File: "example.conf", + Key: "some-key", + Line: 20, + Err: ErrNotFound, + } + str = err.Error() + test.Log(str) + if str != "example.conf[some-key]:20: not found" { + test.Fatal("not equal") + } + + err = ConfigError { + Err: ErrNotFound, + } + str = err.Error() + test.Log(str) + if str != "not found" { + test.Fatal("not equal") + } + + err = ConfigError { } + str = err.Error() + test.Log(str) + if str != "configuration error" { + test.Fatal("not equal") + } +} diff --git a/examples/pipeline/main.go b/examples/pipeline/main.go new file mode 100644 index 0000000..6120bbc --- /dev/null +++ b/examples/pipeline/main.go @@ -0,0 +1,93 @@ +// Example pipeline demonstrates a three-stage pipeline with an actor for each +// stage. +package main + +import "log" +import "time" +import "context" +import "math/rand" +import "git.tebibyte.media/sashakoshka/camfish" + +func main() { + camfish.Run("pipeline", + "Example pipeline demonstrates a three-stage pipeline with " + + "an actor for each stage", + new(generator), + new(reverser), + new(printer)) +} + +// generator produces strings randomly. +type generator struct { } +var _ camfish.Runnable = new(generator) +func (this *generator) Type() string { return "generator" } + +func (this *generator) Run(ctx context.Context) error { + reverser := camfish.Find("reverser").(*reverser) + timer := time.NewTimer(0) + for { + select { + case <- timer.C: + timer.Reset(time.Duration(float64(time.Second) * (rand.Float64() + 0.2))) + reverser.Send([]string { + "red", "yellow", "green", "blue", + }[rand.Int() % 4]) + case <- ctx.Done(): return ctx.Err() + } + } +} + +// reverser reverses strings. +type reverser struct { input chan string } +var _ camfish.Runnable = new(reverser) +var _ camfish.Initializable = new(reverser) +func (this *reverser) Type() string { return "reverser" } + +func (this *reverser) Init(ctx context.Context) error { + this.input = make(chan string) + return ctx.Err() +} + +func (this *reverser) Run(ctx context.Context) error { + printer := camfish.Find("printer").(*printer) + for { + select { + case str := <- this.input: + runes := []rune(str) + for i, j := 0, len(runes) - 1; i < j; i, j = i + 1, j - 1 { + runes[i], runes[j] = runes[j], runes[i] + } + printer.Print(string(runes)) + case <- ctx.Done(): return ctx.Err() + } + } +} + +func (this *reverser) Send(str string) { + this.input <- str +} + +// printer prints strings. +type printer struct { input chan string } +var _ camfish.Runnable = new(printer) +var _ camfish.Initializable = new(printer) +func (this *printer) Type() string { return "printer" } + +func (this *printer) Init(ctx context.Context) error { + this.input = make(chan string) + return ctx.Err() +} + +func (this *printer) Run(ctx context.Context) error { + for { + select { + case str := <- this.input: + log.Println("(i) [printer]", str) + case <- ctx.Done(): return ctx.Err() + } + } +} + +func (this *printer) Print(str string) { + this.input <- str +} diff --git a/flag.go b/flag.go new file mode 100644 index 0000000..ca57312 --- /dev/null +++ b/flag.go @@ -0,0 +1,142 @@ +package camfish + +import "fmt" +import "iter" +import "git.tebibyte.media/sashakoshka/go-cli" + +// FlagSet holds command-line flags to be parsed. +type FlagSet interface { + // Flag creates a new flag. If there are naming collisions between + // flags, flags specified later take precedence over those specified + // earlier. + // + // A short and long form of the flag may be specified. The short form of + // a flag is invoked with a single dash and one or more runes, each rune + // invoking its corresponding flag exactly once. The long form is + // invoked with two dashes followed by the text of the long form, and it + // should be in lower kebab case. If short is zero, the flag will not + // have a short form. All flags must have a long form. + Flag(short rune, long string, help string, validate func(string) error) Flag +} + +// Flag represents the result of parsing a command-line flag. It is filled in +// automatically during argument parsing, and the values within it can be +// accessed afterwards. +type Flag interface { + // First returns the value where the flag was first found. If the flag + // was never specified, false is returned for found. If the flag either + // was never specified or does not take input, the string "true" will be + // returned. If at least one value is given but this function is never + // called to recieve it, the environment will exit with an error. + First() (value string, found bool) + // All returns an iterator over all instances of this flag that were + // specified. If this flag takes no input, all returned values will be + // the string "true". If multiple instances of a flag are given but this + // function is never called to receive them, will the environment exit + // with an error. + All() iter.Seq2[int, string] +} + +// flagSet is an implementation of [FlagSet] that wraps [cli.Cli]. +type flagSet struct { + name string + description string + cli cli.Cli + short map[rune ] int + long map[string] int + flags []*flag +} + +func (this *flagSet) Flag(short rune, long string, help string, validate func(string) error) Flag { + if this.short == nil { this.short = make(map[rune ] int) } + if this.long == nil { this.long = make(map[string] int) } + fla := &flag { + short: short, + long: long, + help: help, + validate: validate, + } + if short != 0 { this.short[short] = len(this.flags) } + if long != "" { this.long[long] = len(this.flags) } + this.flags = append(this.flags, fla) + return fla +} + +// parse parses the arguments using the [flag]s contained within the set. +func (this *flagSet) parse(args []string) error { + cliFlags := make([]*cli.Flag, len(this.flags)) + for index, fla := range this.flags { + index := index + cliFlags[index] = &cli.Flag { + Short: fla.short, + Long: fla.long, + Help: fla.help, + Validate: fla.validate, + Found: func(_ *cli.Cli, value string) { + this.flags[index].values = append(this.flags[index].values, value) + }, + } + } + + this.cli.Description = this.description + this.cli.Flags = cliFlags + _, err := this.cli.Parse(args) + return err +} + +// fullyReceived returns an error if not all specified values were used. This +// behavior is currently unused. +func (this *flagSet) fullyReceived() error { + for _, fla := range this.flags { + if !fla.fullyReceived() { + return FlagError { + Long: fla.long, + Err: fmt.Errorf ( + "%w: %v", + ErrExtraneousValues, + fla.values[fla.received:]), + } + } + } + if len(this.cli.Args) > 0 { + return ErrExtraneousValues + } + return nil +} + +// usage prints usage/help information to [os.Stderr]. it only works properly +// after parse has been run. +func (this *flagSet) usage() { + this.cli.Usage() +} + +// flag is an implementation of [Flag] that +type flag struct { + short rune + long string + help string + validate func(string) error + values []string + received int +} + +func (this *flag) First() (value string, found bool) { + if len(this.values) < 1 { return "", false } + if this.received < 1 { this.received = 1 } + return this.values[0], true +} + +func (this *flag) All() iter.Seq2[int, string] { + return func(yield func(int, string) bool) { + for index, value := range this.values { + if this.received <= index { this.received = index + 1 } + if !yield(index, value) { return } + } + } +} + +// fullyReceived returns whether all values were used. If this flag does not +// take in a value, it just returns true. +func (this *flag) fullyReceived() bool { + return this.validate == nil || this.received >= len(this.values) +} diff --git a/flag_test.go b/flag_test.go new file mode 100644 index 0000000..283dc0e --- /dev/null +++ b/flag_test.go @@ -0,0 +1,161 @@ +package camfish + +import "errors" +import "testing" +import "git.tebibyte.media/sashakoshka/go-cli" + +func TestFlagSet(test *testing.T) { + set := flagSet { } + flag0 := set.Flag('a', "flag-0", "some help", nil) + flag1 := set.Flag('b', "flag-1", "some help", nil) + flag2 := set.Flag('c', "flag-2", "some help", nil) + flag6 := set.Flag('d', "flag-3", "some help", nil) + flag3 := set.Flag('d', "flag-3", "some help", nil) + flag4v := set.Flag('e', "flag-4v", "some help", cli.ValString) + flag5 := set.Flag('f', "flag-5", "some help", nil) + err := set.parse([]string { + "thing", + "-baf", + "-e", "value 4 a", + "-d", "-ff", + "--flag-4v", "value 4 b", + }) + if err != nil { test.Fatal(err) } + + if value, ok := flag0.First(); ok { + if value != "true" { test.Fatalf("not equal: %s", value) } + } else { test.Fatal("not found") } + for index, value := range flag0.All() { + if index != 0 { test.Fatalf("extra value: %s", value) } + if value != "true" { test.Fatalf("not equal: %s", value) } + } + + if value, ok := flag1.First(); ok { + if value != "true" { test.Fatalf("not equal: %s", value) } + } else { test.Fatal("not found") } + for index, value := range flag1.All() { + if index != 0 { test.Fatalf("extra value: %s", value) } + if value != "true" { test.Fatalf("not equal: %s", value) } + } + + if value, ok := flag2.First(); ok { + if value != "" { test.Fatalf("found: %s", value) } + } + for _, value := range flag2.All() { + test.Fatalf("extra value: %s", value) + } + + if value, ok := flag3.First(); ok { + if value != "true" { test.Fatalf("not equal: %s", value) } + } else { test.Fatal("not found") } + for index, value := range flag3.All() { + if index != 0 { test.Fatalf("extra value: %s", value) } + if value != "true" { test.Fatalf("not equal: %s", value) } + } + + if value, ok := flag4v.First(); ok { + if value != "value 4 a" { test.Fatalf("not equal: %s", value) } + } else { test.Fatal("not found") } + for index, value := range flag4v.All() { + switch index { + case 0: if value != "value 4 a" { test.Fatalf("not equal: %s", value) } + case 1: if value != "value 4 b" { test.Fatalf("not equal: %s", value) } + default: test.Fatalf("extra value: %s", value) + } + } + + if value, ok := flag5.First(); ok { + if value != "true" { test.Fatalf("not equal: %s", value) } + } else { test.Fatal("not found") } + for index, value := range flag5.All() { + if index > 3 { test.Fatalf("extra value: %s", value) } + if value != "true" { test.Fatalf("not equal: %s", value) } + } + + if value, ok := flag6.First(); ok { + if value != "" { test.Fatalf("found: %s", value) } + } + for _, value := range flag6.All() { + test.Fatalf("extra value: %s", value) + } + + err = set.fullyReceived() + if err != nil { test.Fatal(err) } +} + +func TestFlagSetErrExtraneousValuesA(test *testing.T) { + set := flagSet { } + flag0 := set.Flag('a', "flag-0", "some help", cli.ValString) + err := set.parse([]string { + "thing", + "--flag-0", "value 0", + "--flag-0", "value 1", + "--flag-0", "value 2", + }) + if err != nil { test.Fatal(err) } + + // access the first value + flag0.First() + + err = set.fullyReceived() + test.Log(err) + if !errors.Is(err, ErrExtraneousValues) { + test.Fatal("error is not ErrExtraneousValues") + } + var flagErr FlagError + if !errors.As(err, &flagErr) { + test.Fatal("error is not FlagError") + } + if flagErr.Long != "flag-0" { + test.Fatal("not equal") + } +} + +func TestFlagSetErrExtraneousValuesB(test *testing.T) { + set := flagSet { } + flag0 := set.Flag('a', "flag-0", "some help", cli.ValString) + err := set.parse([]string { + "thing", + "--flag-0", "value 0", + "--flag-0", "value 1", + "--flag-0", "value 2", + }) + if err != nil { test.Fatal(err) } + + // access the first two values + for index, _ := range flag0.All() { + if index >= 1 { break } + } + + err = set.fullyReceived() + test.Log(err) + if !errors.Is(err, ErrExtraneousValues) { + test.Fatal("error is not ErrExtraneousValues") + } + var flagErr FlagError + if !errors.As(err, &flagErr) { + test.Fatal("error is not FlagError") + } + if flagErr.Long != "flag-0" { + test.Fatal("not equal") + } +} + +func TestFlagSetErrExtraneousValuesC(test *testing.T) { + set := flagSet { } + flag0 := set.Flag('a', "flag-0", "some help", cli.ValString) + err := set.parse([]string { + "thing", + "--flag-0", "value 0", "value 1", "value 2", + }) + if err != nil { test.Fatal(err) } + + // access the value + flag0.First() + + err = set.fullyReceived() + test.Log(err) + if !errors.Is(err, ErrExtraneousValues) { + test.Fatal("error is not ErrExtraneousValues") + } +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..7d0e120 --- /dev/null +++ b/go.mod @@ -0,0 +1,10 @@ +module git.tebibyte.media/sashakoshka/camfish + +go 1.23.0 + +require ( + git.tebibyte.media/sashakoshka/go-cli v0.1.3 + git.tebibyte.media/sashakoshka/go-service v0.1.1 + git.tebibyte.media/sashakoshka/go-util v0.8.0 + golang.org/x/sync v0.10.0 +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..0bc9a43 --- /dev/null +++ b/go.sum @@ -0,0 +1,8 @@ +git.tebibyte.media/sashakoshka/go-cli v0.1.3 h1:tSkWjyx2JrGu6KotbXWSTKSYGGS1D4O3qwCrRoZuwbs= +git.tebibyte.media/sashakoshka/go-cli v0.1.3/go.mod h1:JFA3wSdRkXxa4iQJWHfe3DokiG7Dh2XUJBzPmuVlbuY= +git.tebibyte.media/sashakoshka/go-service v0.1.1 h1:WhDK532iB3hrVILih2+rJmRtCctXIoj2uEWMm8tU4+E= +git.tebibyte.media/sashakoshka/go-service v0.1.1/go.mod h1:qPtzuqB1psUWZrmy3XTU1dZHHhVNHHP2pSBkpzlTazk= +git.tebibyte.media/sashakoshka/go-util v0.8.0 h1:XFuZ8HQkrnibrV016rso00geCFPatKpX4jxkIVhZPaQ= +git.tebibyte.media/sashakoshka/go-util v0.8.0/go.mod h1:0Q1t+PePdx6tFYkRuJNcpM1Mru7wE6X+it1kwuOH+6Y= +golang.org/x/sync v0.10.0 h1:3NQrjDixjgGwUOCaF8w2+VYHv0Ve/vGYSbdkTa98gmQ= +golang.org/x/sync v0.10.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= diff --git a/ini.go b/ini.go new file mode 100644 index 0000000..958ffe9 --- /dev/null +++ b/ini.go @@ -0,0 +1,168 @@ +package camfish + +import "os" +import "io" +import "fmt" +import "iter" +import "slices" +import "strings" +import "strconv" +import "path/filepath" + +type iniConfig map[string] []iniValue + +type iniValue struct { + value string + file string + line int + column int +} + +// ParseINI parses a string containing INI configuration data. +func ParseINI(filename, input string) (MutableConfig, error) { + ini := make(iniConfig) + configErr := ConfigError { + File: filename, + } + section := "" + for index, line := range strings.Split(input, "\n") { + configErr.Line = index + 1 + configErr.Key = "" + line = strings.TrimSpace(line) + if line == "" { continue } + if strings.HasPrefix(line, "#") { continue } + if strings.HasPrefix(line, "[") { + // section heading + if !strings.HasSuffix(line, "]") { + configErr.Err = ErrSectionHeadingMalformed + return nil, configErr + } + section = strings.TrimSpace(strings.TrimSuffix(strings.TrimPrefix(line, "["), "]")) + if section == "" { + configErr.Err = ErrSectionHeadingMalformed + return nil, configErr + } + continue + } + + // split key/value + key, value, ok := strings.Cut(line, "=") + if !ok { + configErr.Err = ErrPairMalformed + return nil, configErr + } + + // key + key = strings.TrimSpace(key) + if key == "" { + configErr.Err = ErrKeyEmpty + return nil, configErr + } + configErr.Key = key + if section != "" { + key = fmt.Sprintf("%s.%s", section, key) + } + + // value + value = strings.TrimSpace(value) + if strings.HasPrefix(value, "\"") || strings.HasPrefix(value, "'") { + unquoted, err := strconv.Unquote(value) + if err != nil { + configErr.Column = strings.Index(line, "=") + 2 + configErr.Err = err + return nil, configErr + } + value = unquoted + } + ini.Add(key, value) + } + return ini, nil +} + +// DecodeINI decodes INI data from an io.Reader. The entire reader is consumed. +func DecodeINI(filename string, input io.Reader) (MutableConfig, error) { + buffer, err := io.ReadAll(input) + if err != nil { return nil, err } + return ParseINI(filename, string(buffer)) +} + +func (ini iniConfig) Add(key, value string) { + key = canonicalINIKey(key) + ini[key] = append(ini[key], iniValue { + value: value, + }) +} + +func (ini iniConfig) Del(key string) { + key = canonicalINIKey(key) + delete(ini, key) +} + +func (ini iniConfig) Get(key string) string { + key = canonicalINIKey(key) + slice, ok := ini[key] + if !ok { return "" } + if len(slice) == 0 { return "" } + return slice[0].value +} + +func (ini iniConfig) GetAll(key string) iter.Seq2[int, string] { + return func(yield func(int, string) bool) { + for index, value := range ini[key] { + if !yield(index, value.value) { return } + } + } +} + +func (ini iniConfig) Set(key, value string) { + key = canonicalINIKey(key) + valueInfo := iniValue { } + if prevValues := ini[key]; len(prevValues) > 0 { + valueInfo = prevValues[0] + } + valueInfo.value = value + ini[key] = []iniValue { valueInfo } +} + +func (ini iniConfig) NewConfigError(key string, index int, wrapped error) ConfigError { + if values, ok := ini[key]; ok { + if index > 0 && index < len(values) { + value := values[index] + return ConfigError { + File: value.file, + Key: key, + Line: value.line, + Column: value.column, + Err: wrapped, + } + } + } + return ConfigError { + Key: key, + Err: wrapped, + } +} + +func canonicalINIKey(key string) string { + return strings.ToLower(key) +} + +func mergeINI(inis ...iniConfig) iniConfig { + ini := make(iniConfig) + for index := len(inis) - 1; index >= 0; index -- { + for key, values := range inis[index] { + if _, exists := ini[key]; exists { continue } + ini[key] = slices.Clone(values) + } + } + return ini +} + +func configFiles(program string) ([]string, error) { + userConfig, err := os.UserConfigDir() + if err != nil { return nil, err } + return []string { + filepath.Join("/etc", program), + filepath.Join(userConfig, program), + }, nil +} diff --git a/ini_test.go b/ini_test.go new file mode 100644 index 0000000..070b33b --- /dev/null +++ b/ini_test.go @@ -0,0 +1,93 @@ +package camfish + +import "reflect" +import "testing" + +func TestParseINI_LF(test *testing.T) { + ini, err := ParseINI("input", +`thing= " Quoted string!!!!! " +Other-Thing = askdjlksajd + +number = 3849 +# comment + #also a comment +[section0] +value=1 + +[section1.a] +#dkjsaf +value=7 + +`) + if err != nil { + test.Fatal(err) + } + + test.Log("INI:") + test.Log(ini) + if ini.Get("thing") != "\tQuoted string!!!!! " { + test.Fatal("value is not correct") + } + if ini.Get("other-thing") != "askdjlksajd" { + test.Fatal("value is not correct") + } + if ini.Get("number") != "3849" { + test.Fatal("value is not correct") + } + if ini.Get("section0.value") != "1" { + test.Fatal("value is not correct") + } + if ini.Get("section1.a.value") != "7" { + test.Fatal("value is not correct") + } +} + +func TestParseINI_CRLF(test *testing.T) { + ini, err := ParseINI("input", "thing= \"\tQuoted string!!!!! \"\r\nOther-Thing = askdjlksajd\r\n\r\nnumber = 3849\r\n# comment\r\n #also a comment\r\n[section0]\r\nvalue=1\r\n\r\n[section1.a]\r\n#dkjsaf\r\nvalue=7\r\n") + if err != nil { + test.Fatal(err) + } + + test.Log("INI:") + test.Log(ini) + if ini.Get("thing") != "\tQuoted string!!!!! " { + test.Fatal("value is not correct") + } + if ini.Get("other-thing") != "askdjlksajd" { + test.Fatal("value is not correct") + } + if ini.Get("number") != "3849" { + test.Fatal("value is not correct") + } + if ini.Get("section0.value") != "1" { + test.Fatal("value is not correct") + } + if ini.Get("section1.a.value") != "7" { + test.Fatal("value is not correct") + } +} + +func TestMergeINI(test *testing.T) { + iv := func(value string) iniValue { + return iniValue { value: value } + } + + ini := mergeINI(iniConfig { + "foo": []iniValue { iv("bar") }, + "baz": []iniValue { iv("something") }, + }, + iniConfig { + "baz": []iniValue { iv("value 1"), iv("value 2") }, + }, + iniConfig { + "thing": []iniValue { iv("????") }, + }); + test.Log(ini) + if !reflect.DeepEqual(ini, iniConfig { + "foo": []iniValue { iv("bar") }, + "baz": []iniValue { iv("value 1"), iv("value 2") }, + "thing": []iniValue { iv("????") }, + }) { + test.Fatal("not equal") + } +} diff --git a/phases.go b/phases.go new file mode 100644 index 0000000..f8c798e --- /dev/null +++ b/phases.go @@ -0,0 +1,266 @@ +package camfish + +import "os" +import "fmt" +import "log" +import "io/fs" +import "errors" +import "context" +import "strings" +import "path/filepath" +import "git.tebibyte.media/sashakoshka/go-cli" +import "git.tebibyte.media/sashakoshka/go-service/daemon" +import "git.tebibyte.media/sashakoshka/go-service/rotate" + +func (this *environment) phase10FlagParsing() bool { + // create flag set and specify built-in flags + set := flagSet { + name: this.name, + description: this.description, + } + flagHelp := set.Flag('h', "help", "Display usage information and exit", nil) + flagPidFile := set.Flag('p', "pid-file", "Write the PID to the specified file", cli.ValString) + flagUser := set.Flag('u', "user", "The user:group to run as", cli.ValString) + flagLogDirectory := set.Flag('l', "log-directory", "Write logs to the specified directory", cli.ValString) + flagConfigFile := set.Flag('c', "config-file", "Use this configuration file", cli.ValString) + flagVerbose := set.Flag('v', "verbose", "Enable verbose output/logging", nil) + + // ask actors to add flags + actors, done := this.actors.RBorrow() + defer done() + for _, actor := range sortActors(actors, actors.flagAdder.all()) { + actor.AddFlags(&set) + } + + // parse flags + err := set.parse(os.Args) + if err != nil { + fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err) + set.usage() + return false + } + + // handle built-in flags + if _, ok := flagHelp.First(); ok { + set.usage() + return false + } + if pidFile, ok := flagPidFile.First(); ok { + this.flags.pidFile = pidFile + } + if user, ok := flagUser.First(); ok { + this.flags.user = user + } + if logDirectory, ok := flagLogDirectory.First(); ok { + this.flags.logDirectory = logDirectory + } + if configFile, ok := flagConfigFile.First(); ok { + this.flags.configFile = configFile + } + if _, ok := flagVerbose.First(); ok { + this.flags.verbose = true + } + return true +} + +func (this *environment) phase13PidFileCreation() bool { + if this.flags.pidFile != "" { + err := daemon.PidFile(this.flags.pidFile).Start() + if err != nil { + fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err) + return false + } + } + return true +} + +func (this *environment) phase17PrivilegeDropping() bool { + if this.flags.user != "" { + if this.Verb() { + fmt.Fprintf(os.Stderr, + "%s: dropping privilege to %s\n", + os.Args[0], this.flags.user) + } + user, group, _ := strings.Cut(this.flags.user, ":") + err := dropPrivelege(user, group) + fmt.Fprintf(os.Stderr, "%s: could not drop privilege %v\n", os.Args[0], err) + } + return true +} + +func (this *environment) phase20LogSwitching() bool { + if this.flags.logDirectory != "" { + directory := this.flags.logDirectory + if this.Verb() { + fmt.Fprintf(os.Stderr, + "%s: logging to %s\n", + os.Args[0], directory) + } + directory, err := filepath.Abs(directory) + if err != nil { + fmt.Fprintf(os.Stderr, "%s: could not rotate logs: %v\n", os.Args[0], err) + return false + } + logger, err := rotate.New(directory) + if err != nil { + fmt.Fprintf(os.Stderr, "%s: could not rotate logs: %v\n", os.Args[0], err) + return false + } + defer logger.Close() + log.SetOutput(logger) + } + log.Printf("====== [%s] START =======", this.name) + log.Printf("(i) (20) CAMFISH environment %s", version) + logActors(All()) + return true +} + +func (this *environment) phase30ConfigurationParsing() bool { + if this.Verb() { log.Println("... (30) parsing configuration") } + // blank config if nothing happens + this.conf = make(iniConfig) + // get list of config files + paths, err := configFiles(this.name) + if err != nil { + log.Println("!!! (30) could not determine location of file(s):", err) + return true + } + if this.flags.configFile != "" { + paths = append(paths, this.flags.configFile) + } + // parse every config and merge them all + configs := make([]iniConfig, 0, len(paths)) + for _, path := range paths { + file, err := os.Open(path) + if err != nil { + if !errors.Is(err, fs.ErrNotExist) { + log.Println("!!! (30) file present but inaccessible:", err) + } else if path == this.flags.configFile { + log.Println("!!! (30)", err) + } + continue + } + defer file.Close() + config, err := DecodeINI(path, file) + if err != nil { + log.Println("!!! (30) could not parse:", err) + continue + } + configs = append(configs, config.(iniConfig)) + } + this.conf = mergeINI(configs...) + if this.Verb() { log.Println(".// (30) parsed configuration") } + return true +} + +func (this *environment) phase40ConfigurationProcessing() bool { + if this.Verb() { log.Println("... (40) processing configuration") } + actors, done := this.actors.RBorrow() + defer done() + for _, actor := range sortActors(actors, actors.configProcessor.all()) { + err := actor.ProcessConfig(this.conf) + if err != nil { + log.Println("XXX (50) could not process configuration:", err) + return false + } + } + if this.Verb() { log.Println(".// (40) processed configuration") } + return true +} + +func (this *environment) phase50ConfigurationApplication() bool { + if this.Verb() { log.Println("... (50) applying configuration") } + err := this.applyConfig() + if err != nil { + log.Println("XXX (50) could not apply configuration:", err) + return false + } + actors, done := this.actors.RBorrow() + defer done() + for _, actor := range sortActors(actors, actors.configurable.all()) { + err := actor.Configure(this.conf) + if err != nil { + log.Printf ( + "XXX (50) could not apply configuration to %s: %v", + actor.(Actor).Type(), err) + return false + } + } + if this.Verb() { log.Println(".// (50) applied configuration") } + return true +} + +func (this *environment) phase60Initialization() bool { + if this.Verb() { log.Println("... (60) initializing") } + var initializable []Initializable + func() { + actors, done := this.actors.RBorrow() + defer done() + initializable = actors.initializable.all() + }() + if err := this.initializeActors(this.ctx, initializable...); err != nil { + log.Println(".// (60) failed to initialize:", err) + return false + } + if this.Verb() { log.Println(".// (60) initialized") } + return true +} + +func (this *environment) phase70Running() bool { + defer this.Done(nil) + if this.Verb() { log.Println("... (70) starting up") } + this.running.Store(true) + defer this.running.Store(false) + func() { + actors, done := this.actors.RBorrow() + defer done() + for _, actor := range actors.runnable.all() { + this.start(actor) + } + + }() + log.Println(".// (70) startup sequence complete") + // await context cancellation or waitgroup completion + wgChannel := make(chan struct { }, 1) + go func() { + this.group.Wait() + wgChannel <- struct { } { } + }() + select { + case <- this.ctx.Done(): + if this.Verb() { log.Println("(i) (70) canceled") } + case <- wgChannel: + if this.Verb() { log.Println("(i) (70) all actors have finished") } + } + return true +} + +func (this *environment) phase70_5Trimming() bool { + if this.Verb() { log.Println("... (70.5) trimming") } + var trimmable []Trimmable + func() { + actors, done := this.actors.RBorrow() + defer done() + trimmable = actors.trimmable.all() + }() + if err := this.trimActors(this.ctx, trimmable...); err != nil { + log.Println(".// (70.5) failed to trim:", err) + return false + } + if this.Verb() { log.Println(".// (70.5) trimmed") } + return true +} + +func (this *environment) phase80Shutdown() bool { + cause := context.Cause(this.ctx) + if cause != nil { + log.Println("XXX (80) shutting down because:", cause) + } + log.Println("... (80) waiting for actors to shut down") + defer func() { + log.Println(".// (80) shutdown succeeded, goodbye") + log.Printf("====== [%s] END =======", this.name) + }() + this.group.Wait() + return cause == nil +} diff --git a/run.go b/run.go new file mode 100644 index 0000000..a050587 --- /dev/null +++ b/run.go @@ -0,0 +1,124 @@ +package camfish + +import "iter" +import "context" + +var env environment + +// Run runs the daemon given the slice of actors, and shuts down the program +// when all running actors have stopped. Error and log messages will be printed. +// The correct way to use this function is to have it be the only thing in main: +// +// func main () { +// camfish.Run("name", "what it does", new(SomeActor), new(AnotherActor)) +// } +// +// Run operates in several phases. In each phase that involves actors, the +// actors are operated on in the order they are specified in the variadic actors +// argument. This as well as the order of the phases is considered part of the +// API and are stable for versions above v1.0.0, but some exact details such as +// timing are not and may change in the future. The phases are as follows: +// +// 10. Flag parsing: Actors which implement [FlagAdder] are given an object +// which will allow them to add command line flags. Actors are given the +// object one after the other in the order that they are specified in the +// vararg list for Run. The flags are then parsed, giving values to the +// actors that requested them. +// +// 20. Log switching: The environment begins redirecting all logging output to +// a file if specified in the flags. After Run exits, logging will be +// redirected back to [os.Stderr]. +// +// 30. Configuration parsing: The configuration file is parsed. If a specific +// file was not specified by a flag, it is loaded from +// /etc//.conf +// +// 40. Configuration processing: Actors which implement [ConfigProcessor] are +// given a MutableConfig to read and modify. Actors are given the config +// one after the other in the order that they are specified in the vararg +// list for Run. Actors can use flag values they have received to override +// config values, apply macros, etc. +// +// 50. Configuration application: Actors which implement [Configurable] are +// given a [Config] to read. The order is not guaranteed. +// +// 60. Initialization: Actors which implement [Initializable] are initialized +// in parallel. During this time, actors may do things like establish +// network connections, start up servers, initialize data structures, etc. +// Actors may establish references to each-other during this time, but +// they must not interact yet. The amount of time actors have to do this +// is configurable, but by default it is 8 minutes. The vast majority of +// actors should initialize in under 100 milliseconds. +// +// 70. Running: Actors which implement [Runnable] are run, each in their own +// goroutine. The environment is able to restart actors which have failed, +// which entails resetting the actor if it implements [Resettable], and +// running the actor again within the same goroutine. If an actor does not +// run for a meaningful amount of time after resetting/initialization +// before failing, it is considered erratic and further attempts to restart +// it will be spaced by a limited, constantly increasing time interval. The +// timing is configurable, but by default the threshold for a meaningful +// amount of runtime is 16 seconds, the initial delay interval is 8 +// seconds, the interval increase per attempt is 8 seconds, and the maximum +// interval is one hour. Additionally, programs which implement [Trimmable] +// will be trimmed regularly whenever they are running. The trimming +// interval is also configurable, but by default it is once every minute. +// When an actor which implements [Resettable] is reset, it is given a +// configurable timeout, which is 8 minutes by default. +// +// 80. Shutdown: This can be triggered by all actors being removed from the +// environment, a catastrophic error, [Done] being called, or the program +// recieving SIGINT. If necessary, the environment shuts down all running +// actors and waits for them to stop. If they do not all stop in time, an +// error message is printed and the program will exit with a non-zero code. +// Otherwise, it will exit with a code of 0. The amount of time actors +// have to shut down is configurable, but by default it is 8 minutes. +func Run(name, description string, actors ...Actor) { + env.Run(name, description, actors...) +} + +// Done sends a shutdown signal to the environment with the given "cause" error. +// This will be logged as the reason for the shutdown. +func Done(cause error) { + env.Done(cause) +} + +// Add adds an actor to the environment, starting it if necessary. If the +// environment is not running, it does nothing. Note that this function will +// block the current goroutine while the actors are initializing. +func Add(ctx context.Context, actors ...Actor) error { + return env.Add(ctx, actors...) +} + +// Del removes an actor from the environment, stopping it if necessary. If the +// environment is not running, it does nothing. Note that this function will +// block the current goroutine while the actors are shutting down. +func Del(ctx context.Context, actors ...Actor) error { + return env.Del(ctx , actors...) +} + +// Find finds an actor in the environment with the given type name. If no actor +// is found or the environment is not running, it returns nil. +func Find(typ string) Actor { + return env.Find(typ) +} + +// FindAll returns an iterator over all actors in the environment with the given +// type name. If the environment is not running, it returns an empty iterator. +func FindAll(typ string) iter.Seq[Actor] { + return env.FindAll(typ) +} + +// All returns an iterator over all actors in the environment. If the +// environment is not running, it returns an empty iterator. +func All() iter.Seq[Actor] { + return env.All() +} + +// Verb returns true if verbose output is permitted. Actors should log less +// information when this is false. +func Verb() bool { + return env.Verb() +} + +// tell me how tf its snowiung outside if its 33° f diff --git a/util.go b/util.go new file mode 100644 index 0000000..1545db8 --- /dev/null +++ b/util.go @@ -0,0 +1,104 @@ +package camfish + +import "fmt" +import "log" +import "time" +import "iter" +import "errors" +import "strconv" +import "syscall" +import "os/user" +import "strings" +import "context" +import "sync/atomic" +import "unicode/utf8" + +func defaul[T comparable](value, def T) T { + var zero T + if value == zero { return def } + return value +} + +func panicWrap(ctx context.Context, f func (context.Context) error) (err error) { + defer func () { + if pan := recover(); pan != nil { + if panErr, ok := pan.(error); ok { + err = panErr + } else { + err = errors.New(fmt.Sprint(pan)) + } + } + } () + + err = f(ctx) + return +} + +type atomicDuration atomic.Int64 +type atomicInt64Ptr = *atomic.Int64 + +func (duration* atomicDuration) Load() time.Duration { + return time.Duration((*atomic.Int64)(duration).Load()) +} + +func (duration *atomicDuration) Store(newDuration time.Duration) { + (*atomic.Int64)(duration).Store(int64(newDuration)) +} + +func logActors (actors iter.Seq[Actor]) { + output := "actors: " + x := utf8.RuneCountInString(output) + first := true + line := func () { + if output == "" { return } + if first { + first = false + log.Println("(i)", output) + } else { + log.Println(" ", output) + } + output = "" + x = 0 + } + types := make(map[string] int) + for actor := range actors { + types[actor.Type()] += 1 + } + for typ, count := range types { + if count > 1 { + typ += fmt.Sprintf("(%d)", count) + } + typ += ", " + x += 2 + typeLen := utf8.RuneCountInString(typ) + if x + typeLen >= 60 { + line() + } + output += typ + x += typeLen + } + if output != "" { + output = strings.TrimSuffix(output, ", ") + } + line() +} + +func dropPrivelege (usr, group string) error { + if group != "" { + groupInfo, err := user.LookupGroup(group) + if err != nil { return err } + gid, err := strconv.Atoi(groupInfo.Gid) + if err != nil { return err } + err = syscall.Setgid(gid) + if err != nil { return err } + } + if usr != "" { + usrInfo, err := user.Lookup(usr) + if err != nil { return err } + uid, err := strconv.Atoi(usrInfo.Uid) + if err != nil { return err } + err = syscall.Setuid(uid) + if err != nil { return err } + } + return nil +} diff --git a/util_test.go b/util_test.go new file mode 100644 index 0000000..8062ce8 --- /dev/null +++ b/util_test.go @@ -0,0 +1,35 @@ +package camfish + +import "errors" +import "testing" +import "context" + +func TestDefaul(test *testing.T) { + value := defaul("", "askjd") + test.Log(value) + if value != "askjd" { test.Fatal("not equal") } + value1 := defaul("zzzz", "askjd") + test.Log(value1) + if value1 != "zzzz" { test.Fatal("not equal") } + value2 := defaul(0, 3) + test.Log(value2) + if value2 != 3 { test.Fatal("not equal") } +} + +func TestPanicWrap(test *testing.T) { + err := panicWrap(context.Background(), func (ctx context.Context) error { + return errors.New("test case 0") + }) + test.Log(err) + if err.Error() != "test case 0" { test.Fatal("not equal") } + err = panicWrap(context.Background(), func (ctx context.Context) error { + panic(errors.New("test case 1")) + }) + test.Log(err) + if err.Error() != "test case 1" { test.Fatal("not equal") } + err = panicWrap(context.Background(), func (ctx context.Context) error { + return nil + }) + test.Log(err) + if err != nil { test.Fatal("not equal") } +} diff --git a/version.go b/version.go new file mode 100644 index 0000000..435f79e --- /dev/null +++ b/version.go @@ -0,0 +1,3 @@ +package camfish + +const version = "v0.0.0"