Implement SendWriter for METADAPT-B
This commit is contained in:
parent
9d2bbec7f9
commit
7a0bf64c17
40
metadaptb.go
40
metadaptb.go
@ -2,6 +2,7 @@ package hopp
|
|||||||
|
|
||||||
import "io"
|
import "io"
|
||||||
import "net"
|
import "net"
|
||||||
|
import "bytes"
|
||||||
import "errors"
|
import "errors"
|
||||||
import "context"
|
import "context"
|
||||||
import "git.tebibyte.media/sashakoshka/hopp/tape"
|
import "git.tebibyte.media/sashakoshka/hopp/tape"
|
||||||
@ -58,9 +59,10 @@ func (this *b) newTrans(underlying Stream) *transB {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type transB struct {
|
type transB struct {
|
||||||
sizeLimit int64
|
sizeLimit int64
|
||||||
underlying Stream
|
underlying Stream
|
||||||
currentData io.Reader
|
currentData io.Reader
|
||||||
|
currentWriter *writerB
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *transB) Close() error {
|
func (this *transB) Close() error {
|
||||||
@ -75,6 +77,24 @@ func (this *transB) Send(method uint16, data []byte) error {
|
|||||||
return encodeMessageB(this.underlying, this.sizeLimit, method, data)
|
return encodeMessageB(this.underlying, this.sizeLimit, method, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *transB) SendWriter(method uint16) (io.WriteCloser, error) {
|
||||||
|
if this.currentWriter != nil {
|
||||||
|
this.currentWriter.Close()
|
||||||
|
}
|
||||||
|
// TODO: come up with a fix that allows us to pipe data through the
|
||||||
|
// writer. as of now, it just reads whatever is written into a buffer
|
||||||
|
// and sends the message on close. we should probably introduce chunked
|
||||||
|
// encoding to METADAPT-B to fix this. the implementation would be
|
||||||
|
// simpler than on METADAPT-A, but most of the code could just be
|
||||||
|
// copied over.
|
||||||
|
writer := &writerB {
|
||||||
|
parent: this,
|
||||||
|
method: method,
|
||||||
|
}
|
||||||
|
this.currentWriter = writer
|
||||||
|
return writer, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (this *transB) Receive() (uint16, []byte, error) {
|
func (this *transB) Receive() (uint16, []byte, error) {
|
||||||
// get a reader for the next message
|
// get a reader for the next message
|
||||||
method, size, data, err := this.receiveReader()
|
method, size, data, err := this.receiveReader()
|
||||||
@ -105,6 +125,20 @@ func (this *transB) receiveReader() (uint16, int64, io.Reader, error) {
|
|||||||
return method, size, data, nil
|
return method, size, data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
type writerB struct {
|
||||||
|
parent *transB
|
||||||
|
buffer bytes.Buffer
|
||||||
|
method uint16
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *writerB) Write(data []byte) (int, error) {
|
||||||
|
return this.buffer.Write(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *writerB) Close() error {
|
||||||
|
return this.parent.Send(this.method, this.buffer.Bytes())
|
||||||
|
}
|
||||||
|
|
||||||
// MultiConn represens a multiplexed stream-oriented transport for use in
|
// MultiConn represens a multiplexed stream-oriented transport for use in
|
||||||
// [AdaptB].
|
// [AdaptB].
|
||||||
type MultiConn interface {
|
type MultiConn interface {
|
||||||
|
Loading…
Reference in New Issue
Block a user