// Example pipeline demonstrates a three-stage pipeline with an actor for each // stage. package main import "log" import "time" import "context" import "math/rand" import "git.tebibyte.media/sashakoshka/camfish" func main() { camfish.Run("pipeline", "Example pipeline demonstrates a three-stage pipeline with " + "an actor for each stage", new(generator), new(reverser), new(printer)) } // generator produces strings randomly. type generator struct { } var _ camfish.Runnable = new(generator) func (this *generator) Type() string { return "generator" } func (this *generator) Run(ctx context.Context) error { reverser := camfish.Find("reverser").(*reverser) timer := time.NewTimer(0) for { select { case <- timer.C: timer.Reset(time.Duration(float64(time.Second) * (rand.Float64() + 0.2))) reverser.Send([]string { "red", "yellow", "green", "blue", }[rand.Int() % 4]) case <- ctx.Done(): return ctx.Err() } } } // reverser reverses strings. type reverser struct { input chan string } var _ camfish.Runnable = new(reverser) var _ camfish.Initializable = new(reverser) func (this *reverser) Type() string { return "reverser" } func (this *reverser) Init(ctx context.Context) error { this.input = make(chan string) return ctx.Err() } func (this *reverser) Run(ctx context.Context) error { printer := camfish.Find("printer").(*printer) for { select { case str := <- this.input: runes := []rune(str) for i, j := 0, len(runes) - 1; i < j; i, j = i + 1, j - 1 { runes[i], runes[j] = runes[j], runes[i] } printer.Print(string(runes)) case <- ctx.Done(): return ctx.Err() } } } func (this *reverser) Send(str string) { this.input <- str } // printer prints strings. type printer struct { input chan string } var _ camfish.Runnable = new(printer) var _ camfish.Initializable = new(printer) func (this *printer) Type() string { return "printer" } func (this *printer) Init(ctx context.Context) error { this.input = make(chan string) return ctx.Err() } func (this *printer) Run(ctx context.Context) error { for { select { case str := <- this.input: log.Println("(i) [printer]", str) case <- ctx.Done(): return ctx.Err() } } } func (this *printer) Print(str string) { this.input <- str }