From fd85de4a44501cb29242653b7d60a3fc310f6ddc Mon Sep 17 00:00:00 2001 From: "sashakoshka@tebibyte.media" Date: Sun, 26 Jan 2025 00:09:02 -0500 Subject: [PATCH] sync: Found a way to make Gate work --- sync/gate.go | 61 ++++++++++++++++++++++------------------------------ 1 file changed, 26 insertions(+), 35 deletions(-) diff --git a/sync/gate.go b/sync/gate.go index cc63d4b..1ad6fd3 100644 --- a/sync/gate.go +++ b/sync/gate.go @@ -1,6 +1,6 @@ package usync -import "sync" +import "sync/atomic" // Error defines errors that this package can produce type Error string; const ( @@ -12,83 +12,74 @@ func (err Error) Error () string { return string(err) } +// if you know of a better way, i'd like to hear of it. otherwise, STFU!!!! + // Gate wraps a channel and allows the receiver to abruptly stop receiving // messages without causing the sender to lock up. type Gate[T any] struct { - channel chan T - lock sync.RWMutex - open bool + channel atomic.Value bufferSize int } // NewGate creates a new gate with no buffer. func NewGate[T any] () Gate[T] { - return Gate[T] { - channel: make(chan T), - open: true, - } + return NewBufferedGate[T](0) } // NewBufferedGate creates a new gate with a buffer. func NewBufferedGate[T any] (buffer int) Gate[T] { - return Gate[T] { - channel: make(chan T, buffer), - open: true, + gate := Gate[T] { bufferSize: buffer, } + gate.channel.Store(make(chan T, buffer)) + return gate } // Send sends and item to the channel, returning whether the item was sent. -func (this *Gate[T]) Send (item T) bool { - this.lock.RLock() - defer this.lock.RUnlock() - if !this.open { return false } - this.channel <- item +func (this *Gate[T]) Send (item T) (ok bool) { + channel := this.load() + if channel == nil { return false } + // if the channel send panics, "return true" never gets run so it just + // returns false after recovering + defer func() { recover() }() + channel <- item return true } // Receive returns a receive-only channel that can be used to receive items. func (this *Gate[T]) Receive () <- chan T { - this.lock.RLock() - defer this.lock.RUnlock() - return this.channel + return this.load() } // Close closes the gate, drains all remaining messages, and closes the channel. func (this *Gate[T]) Close () error { - this.lock.Lock() - if !this.open { return ErrAlreadyClosed } - this.open = false - this.lock.Unlock() - for len(this.channel) > 0 { <- this.channel } - close(this.channel) + channel := this.channel.Swap((chan T)(nil)).(chan T) + if channel == nil { return ErrAlreadyClosed } + close(channel) return nil } // Reset re-opens the gate if it is closed, and creates a new channel. func (this *Gate[T]) Reset () error { - this.lock.Lock() - defer this.lock.Unlock() - this.open = true - if this.channel != nil { close(this.channel) } - this.channel = make(chan T, this.bufferSize) + this.channel.Store(make(chan T, this.bufferSize)) return nil } // Open returns whether the gate is open. func (this *Gate[T]) Open () bool { - this.lock.RLock() - defer this.lock.RUnlock() - return this.open + return this.load() != nil } // Len returns the amount of items in the channel. func (this *Gate[T]) Len () int { - return len(this.channel) + return len(this.load()) } // Cap returns the amount of items the channel can hold. func (this *Gate[T]) Cap () int { - return cap(this.channel) + return cap(this.load()) } +func (this *Gate[T]) load() chan T { + return this.channel.Load().(chan T) +}