From f40eca261b10526cef065a74b6095a147801eed2 Mon Sep 17 00:00:00 2001 From: "sashakoshka@tebibyte.media" Date: Thu, 31 Oct 2024 17:46:28 -0400 Subject: [PATCH] Add sync package containing Gate --- sync/gate.go | 67 ++++++++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 67 insertions(+) create mode 100644 sync/gate.go diff --git a/sync/gate.go b/sync/gate.go new file mode 100644 index 0000000..f470da8 --- /dev/null +++ b/sync/gate.go @@ -0,0 +1,67 @@ +package usync + +import "sync" + +// Gate wraps a channel and allows the receiver to abruptly stop receiving +// messages without causing the sender to lock up. +type Gate[T any] struct { + channel chan T + lock sync.RWMutex + open bool +} + +// NewGate creates a new gate with no buffer. +func NewGate[T any] () Gate[T] { + return Gate[T] { + channel: make(chan T), + open: true, + } +} + +// NewBufferedGate creates a new gate with a buffer. +func NewBufferedGate[T any] (buffer int) Gate[T] { + return Gate[T] { + channel: make(chan T, buffer), + open: true, + } +} + +// Send sends and item to the channel, returning whether the item was sent. +func (this *Gate[T]) Send (item T) bool { + if !this.Open() { return false } + this.channel <- item + return true +} + +// Receive returns a receive-only channel that can be used to receive items. +func (this *Gate[T]) Receive () <- chan T { + return this.channel +} + +// Close closes the gate, drains all remaining messages, and closes the channel. +func (this *Gate[T]) Close () error { + this.lock.Lock() + this.open = false + this.lock.Unlock() + for len(this.channel) > 0 { <- this.channel } + close(this.channel) + return nil +} + +// Open returns whether the gate is open. +func (this *Gate[T]) Open () bool { + this.lock.RLock() + defer this.lock.RUnlock() + return this.open +} + +// Len returns the amount of items in the channel. +func (this *Gate[T]) Len () int { + return len(this.channel) +} + +// Cap returns the amount of items the channel can hold. +func (this *Gate[T]) Cap () int { + return cap(this.channel) +} +