package service import "io" import "sync" import "hnakra/protocol" const HTTP_BODY_CHANNEL_BUFFER_SIZE = 32 type requestManager struct { requests map[protocol.ID] chan []byte lock sync.RWMutex } func (manager *requestManager) init () { manager.requests = make(map[protocol.ID] chan []byte) } func (manager *requestManager) add (id protocol.ID) { manager.lock.Lock() manager.requests[id] = make(chan []byte) manager.lock.Unlock() } func (manager *requestManager) feed (id protocol.ID, buffer []byte) { manager.lock.RLock() if manager.requests[id] != nil { manager.requests[id] <- buffer } manager.lock.RUnlock() } func (manager *requestManager) end (id protocol.ID) { manager.lock.RLock() if manager.requests[id] != nil { close(manager.requests[id]) manager.requests[id] = nil } manager.lock.RUnlock() } func (manager *requestManager) remove (id protocol.ID) { manager.lock.Lock() delete(manager.requests, id) manager.lock.Unlock() } func (manager *requestManager) readerFor (id protocol.ID) io.Reader { manager.lock.RLock() channel := manager.requests[id] manager.lock.RUnlock() return byteChannelReader { channel: channel } } type byteChannelReader struct { channel <- chan []byte leftover []byte } func (reader byteChannelReader) Read (buffer []byte) (amount int, err error) { // first, try to fill the buffer with leftover data amount = copy(buffer, reader.leftover) if amount > 0 { reader.leftover = reader.leftover[amount:] return } // if there is no leftover data, read from the channel incoming, open := <- reader.channel if !open { return 0, io.EOF } amount = copy(buffer, incoming) // save any data we could not fit into the buffer reader.leftover = incoming[amount:] return }