diff --git a/metadapta.go b/metadapta.go index 503e8af..03bef8e 100644 --- a/metadapta.go +++ b/metadapta.go @@ -9,7 +9,7 @@ import "git.tebibyte.media/sashakoshka/go-util/sync" const closeMethod = 0xFFFF const int64Max = int64((^uint64(0)) >> 1) - +const defaultChunkSize = 0x1000 // Party represents a side of a connection. type Party bool; const ( @@ -171,6 +171,8 @@ type transA struct { id int64 incoming usync.Gate[incomingMessage] currentReader io.Reader + currentWriter io.Closer + writeBuffer []byte } func (this *transA) Close() error { @@ -192,6 +194,27 @@ func (this *transA) Send(method uint16, data []byte) error { return this.parent.sendMessageSafe(this.id, method, data) } +func (this *transA) SendWriter(method uint16) (io.Writer, error) { + // close previous writer if necessary + if this.currentWriter != nil { + this.currentWriter.Close() + this.currentWriter = nil + } + + // create new writer + writer := &writerA { + parent: this, + // there is only ever one writer at a time, so they can all + // share a buffer + buffer: this.writeBuffer[:0], + method: method, + chunkSize: defaultChunkSize, + open: true, + } + this.currentWriter = writer + return writer, nil +} + func (this *transA) Receive() (method uint16, data []byte, err error) { method, reader, err := this.ReceiveReader() if err != nil { return 0, nil, err } @@ -263,6 +286,57 @@ func (this *readerA) Read(buffer []byte) (int, error) { return copied, nil } +type writerA struct { + parent *transA + buffer []byte + method uint16 + chunkSize int64 + open bool +} + +func (this *writerA) Write(data []byte) (n int, err error) { + if !this.open { return 0, io.EOF } + toSend := data + for len(toSend) > 0 { + nn, err := this.writeOne(toSend) + n += nn + toSend = toSend[nn:] + if err != nil { return n, err } + } + return n, nil +} + +func (this *writerA) Close() error { + this.open = false + return nil +} + +func (this *writerA) writeOne(data []byte) (n int, err error) { + data = data[:min(len(data), int(this.chunkSize))] + + // if there is more room, append to the buffer and exit + if int64(len(this.buffer) + len(data)) <= this.chunkSize { + this.buffer = append(this.buffer, data...) + n = len(data) + // if have a full chunk, flush + if int64(len(this.buffer)) == this.chunkSize { + err = this.flush() + if err != nil { return n, err } + } + return n, nil + } + + // if not, flush and store as much as we can in the buffer + err = this.flush() + if err != nil { return n, err } + this.buffer = append(this.buffer, data...) + return n, nil +} + +func (this *writerA) flush() error { + return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer) +} + type incomingMessage struct { method uint16 chunked bool