From 709f41a974b64296b34ccaa1d8a803edd326cf88 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Tue, 5 Nov 2024 17:12:50 -0500 Subject: [PATCH] sync: Add a Reset method to Gate which re-opens it. --- sync/gate.go | 28 ++++++++++++++++++++++------ 1 file changed, 22 insertions(+), 6 deletions(-) diff --git a/sync/gate.go b/sync/gate.go index f470da8..f55a18d 100644 --- a/sync/gate.go +++ b/sync/gate.go @@ -5,9 +5,10 @@ import "sync" // Gate wraps a channel and allows the receiver to abruptly stop receiving // messages without causing the sender to lock up. type Gate[T any] struct { - channel chan T - lock sync.RWMutex - open bool + channel chan T + lock sync.RWMutex + open bool + bufferSize int } // NewGate creates a new gate with no buffer. @@ -21,20 +22,25 @@ func NewGate[T any] () Gate[T] { // NewBufferedGate creates a new gate with a buffer. func NewBufferedGate[T any] (buffer int) Gate[T] { return Gate[T] { - channel: make(chan T, buffer), - open: true, + channel: make(chan T, buffer), + open: true, + bufferSize: buffer, } } // Send sends and item to the channel, returning whether the item was sent. func (this *Gate[T]) Send (item T) bool { - if !this.Open() { return false } + this.lock.RLock() + defer this.lock.RUnlock() + if !this.open { return false } this.channel <- item return true } // Receive returns a receive-only channel that can be used to receive items. func (this *Gate[T]) Receive () <- chan T { + this.lock.RLock() + defer this.lock.RUnlock() return this.channel } @@ -48,6 +54,16 @@ func (this *Gate[T]) Close () error { return nil } +// Reset re-opens the gate if it is closed, and creates a new channel. +func (this *Gate[T]) Reset () error { + this.lock.Lock() + defer this.lock.Unlock() + this.open = true + if this.channel != nil { close(this.channel) } + this.channel = make(chan T, this.bufferSize) + return nil +} + // Open returns whether the gate is open. func (this *Gate[T]) Open () bool { this.lock.RLock()