sync: Add a Reset method to Gate which re-opens it.
This commit is contained in:
		
							parent
							
								
									f40eca261b
								
							
						
					
					
						commit
						709f41a974
					
				
							
								
								
									
										28
									
								
								sync/gate.go
									
									
									
									
									
								
							
							
						
						
									
										28
									
								
								sync/gate.go
									
									
									
									
									
								
							| @ -5,9 +5,10 @@ import "sync" | ||||
| // Gate wraps a channel and allows the receiver to abruptly stop receiving | ||||
| // messages without causing the sender to lock up. | ||||
| type Gate[T any] struct { | ||||
| 	channel chan T | ||||
| 	lock sync.RWMutex | ||||
| 	open bool | ||||
| 	channel    chan T | ||||
| 	lock       sync.RWMutex | ||||
| 	open       bool | ||||
| 	bufferSize int | ||||
| } | ||||
| 
 | ||||
| // NewGate creates a new gate with no buffer. | ||||
| @ -21,20 +22,25 @@ func NewGate[T any] () Gate[T] { | ||||
| // NewBufferedGate creates a new gate with a buffer. | ||||
| func NewBufferedGate[T any] (buffer int) Gate[T] { | ||||
| 	return Gate[T] { | ||||
| 		channel: make(chan T, buffer), | ||||
| 		open:    true, | ||||
| 		channel:    make(chan T, buffer), | ||||
| 		open:       true, | ||||
| 		bufferSize: buffer, | ||||
| 	} | ||||
| } | ||||
| 
 | ||||
| // Send sends and item to the channel, returning whether the item was sent. | ||||
| func (this *Gate[T]) Send (item T) bool { | ||||
| 	if !this.Open() { return false } | ||||
| 	this.lock.RLock() | ||||
| 	defer this.lock.RUnlock() | ||||
| 	if !this.open { return false } | ||||
| 	this.channel <- item | ||||
| 	return true | ||||
| } | ||||
| 
 | ||||
| // Receive returns a receive-only channel that can be used to receive items. | ||||
| func (this *Gate[T]) Receive () <- chan T { | ||||
| 	this.lock.RLock() | ||||
| 	defer this.lock.RUnlock() | ||||
| 	return this.channel | ||||
| } | ||||
| 
 | ||||
| @ -48,6 +54,16 @@ func (this *Gate[T]) Close () error { | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Reset re-opens the gate if it is closed, and creates a new channel. | ||||
| func (this *Gate[T]) Reset () error { | ||||
| 	this.lock.Lock() | ||||
| 	defer this.lock.Unlock() | ||||
| 	this.open = true | ||||
| 	if this.channel != nil { close(this.channel) } | ||||
| 	this.channel = make(chan T, this.bufferSize) | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| // Open returns whether the gate is open. | ||||
| func (this *Gate[T]) Open () bool { | ||||
| 	this.lock.RLock() | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user