diff --git a/metadaptb.go b/metadaptb.go index 867dedb..c91db72 100644 --- a/metadaptb.go +++ b/metadaptb.go @@ -2,6 +2,7 @@ package hopp import "io" import "net" +import "bytes" import "errors" import "context" import "git.tebibyte.media/sashakoshka/hopp/tape" @@ -58,9 +59,10 @@ func (this *b) newTrans(underlying Stream) *transB { } type transB struct { - sizeLimit int64 - underlying Stream - currentData io.Reader + sizeLimit int64 + underlying Stream + currentData io.Reader + currentWriter *writerB } func (this *transB) Close() error { @@ -75,6 +77,24 @@ func (this *transB) Send(method uint16, data []byte) error { return encodeMessageB(this.underlying, this.sizeLimit, method, data) } +func (this *transB) SendWriter(method uint16) (io.WriteCloser, error) { + if this.currentWriter != nil { + this.currentWriter.Close() + } + // TODO: come up with a fix that allows us to pipe data through the + // writer. as of now, it just reads whatever is written into a buffer + // and sends the message on close. we should probably introduce chunked + // encoding to METADAPT-B to fix this. the implementation would be + // simpler than on METADAPT-A, but most of the code could just be + // copied over. + writer := &writerB { + parent: this, + method: method, + } + this.currentWriter = writer + return writer, nil +} + func (this *transB) Receive() (uint16, []byte, error) { // get a reader for the next message method, size, data, err := this.receiveReader() @@ -105,6 +125,20 @@ func (this *transB) receiveReader() (uint16, int64, io.Reader, error) { return method, size, data, nil } +type writerB struct { + parent *transB + buffer bytes.Buffer + method uint16 +} + +func (this *writerB) Write(data []byte) (int, error) { + return this.buffer.Write(data) +} + +func (this *writerB) Close() error { + return this.parent.Send(this.method, this.buffer.Bytes()) +} + // MultiConn represens a multiplexed stream-oriented transport for use in // [AdaptB]. type MultiConn interface {