package usync import "sync/atomic" // Error defines errors that this package can produce type Error string; const ( ErrAlreadyClosed Error = "AlreadyClosed" ) // Error fullfills the error interface. func (err Error) Error () string { return string(err) } // if you know of a better way, i'd like to hear of it. otherwise, STFU!!!! // Gate wraps a channel and allows the receiver to abruptly stop receiving // messages without causing the sender to lock up. type Gate[T any] struct { channel atomic.Value bufferSize int } // NewGate creates a new gate with no buffer. func NewGate[T any] () Gate[T] { return NewBufferedGate[T](0) } // NewBufferedGate creates a new gate with a buffer. func NewBufferedGate[T any] (buffer int) Gate[T] { gate := Gate[T] { bufferSize: buffer, } gate.channel.Store(make(chan T, buffer)) return gate } // Send sends and item to the channel, returning whether the item was sent. func (this *Gate[T]) Send (item T) (ok bool) { channel := this.load() if channel == nil { return false } // if the channel send panics, "return true" never gets run so it just // returns false after recovering defer func() { recover() }() channel <- item return true } // Receive returns a receive-only channel that can be used to receive items. func (this *Gate[T]) Receive () <- chan T { return this.load() } // Close closes the gate, unblocking any send or receive operations. func (this *Gate[T]) Close () error { channel := this.channel.Swap((chan T)(nil)).(chan T) if channel == nil { return ErrAlreadyClosed } close(channel) return nil } // Reset re-opens the gate if it is closed, and creates a new channel. func (this *Gate[T]) Reset () error { this.channel.Store(make(chan T, this.bufferSize)) return nil } // Open returns whether the gate is open. func (this *Gate[T]) Open () bool { return this.load() != nil } // Len returns the amount of items in the channel. func (this *Gate[T]) Len () int { return len(this.load()) } // Cap returns the amount of items the channel can hold. func (this *Gate[T]) Cap () int { return cap(this.load()) } func (this *Gate[T]) load() chan T { return this.channel.Load().(chan T) }