76 lines
1.7 KiB
Go
76 lines
1.7 KiB
Go
package service
|
|
|
|
import "io"
|
|
import "sync"
|
|
import "hnakra/protocol"
|
|
|
|
// HTTP_BODY_CHANNEL_BUFFER_SIZE has the value 32.
// NOTE(review): the name suggests it is the capacity of the per-request body
// channels, but add in this file creates its channels unbuffered and nothing
// else visible here references this constant — confirm the intended use.
const HTTP_BODY_CHANNEL_BUFFER_SIZE = 32
|
|
|
|
type requestManager struct {
|
|
requests map[protocol.ID] chan []byte
|
|
lock sync.RWMutex
|
|
}
|
|
|
|
func (manager *requestManager) init () {
|
|
manager.requests = make(map[protocol.ID] chan []byte)
|
|
}
|
|
|
|
func (manager *requestManager) add (id protocol.ID) {
|
|
manager.lock.Lock()
|
|
manager.requests[id] = make(chan []byte)
|
|
manager.lock.Unlock()
|
|
}
|
|
|
|
func (manager *requestManager) feed (id protocol.ID, buffer []byte) {
|
|
manager.lock.RLock()
|
|
if manager.requests[id] != nil {
|
|
manager.requests[id] <- buffer
|
|
}
|
|
manager.lock.RUnlock()
|
|
}
|
|
|
|
func (manager *requestManager) end (id protocol.ID) {
|
|
manager.lock.RLock()
|
|
if manager.requests[id] != nil {
|
|
close(manager.requests[id])
|
|
manager.requests[id] = nil
|
|
}
|
|
manager.lock.RUnlock()
|
|
}
|
|
|
|
func (manager *requestManager) remove (id protocol.ID) {
|
|
manager.lock.Lock()
|
|
delete(manager.requests, id)
|
|
manager.lock.Unlock()
|
|
}
|
|
|
|
func (manager *requestManager) readerFor (id protocol.ID) io.Reader {
|
|
manager.lock.RLock()
|
|
channel := manager.requests[id]
|
|
manager.lock.RUnlock()
|
|
return byteChannelReader { channel: channel }
|
|
}
|
|
|
|
type byteChannelReader struct {
|
|
channel <- chan []byte
|
|
leftover []byte
|
|
}
|
|
|
|
func (reader byteChannelReader) Read (buffer []byte) (amount int, err error) {
|
|
// first, try to fill the buffer with leftover data
|
|
amount = copy(buffer, reader.leftover)
|
|
if amount > 0 {
|
|
reader.leftover = reader.leftover[amount:]
|
|
return
|
|
}
|
|
|
|
// if there is no leftover data, read from the channel
|
|
incoming, open := <- reader.channel
|
|
if !open { return 0, io.EOF }
|
|
amount = copy(buffer, incoming)
|
|
|
|
// save any data we could not fit into the buffer
|
|
reader.leftover = incoming[amount:]
|
|
return
|
|
}
|