Compare commits

...

4 Commits

3 changed files with 118 additions and 8 deletions

View File

@ -44,10 +44,12 @@ type Trans interface {
// Send sends a message. This method is not safe for concurrent use. // Send sends a message. This method is not safe for concurrent use.
Send(method uint16, data []byte) error Send(method uint16, data []byte) error
// SendWriter sends data written to an [io.Writer]. Any writer // SendWriter sends data written to an [io.Writer]. The writer must be
// previously opened through this function will be discarded. This // closed after use. Closing the writer flushes any data that hasn't
// method is not safe for concurrent use, and neither is its result. // been written yet. Any writer previously opened through this function
SendWriter(method uint16) (io.Writer, error) // will be discarded. This method is not safe for concurrent use, and
// neither is its result.
SendWriter(method uint16) (io.WriteCloser, error)
// Receive receives a message. This method is not safe for concurrent // Receive receives a message. This method is not safe for concurrent
// use. // use.
Receive() (method uint16, data []byte, err error) Receive() (method uint16, data []byte, err error)

View File

@ -9,7 +9,7 @@ import "git.tebibyte.media/sashakoshka/go-util/sync"
const closeMethod = 0xFFFF const closeMethod = 0xFFFF
const int64Max = int64((^uint64(0)) >> 1) const int64Max = int64((^uint64(0)) >> 1)
const defaultChunkSize = 0x1000
// Party represents a side of a connection. // Party represents a side of a connection.
type Party bool; const ( type Party bool; const (
@ -171,6 +171,8 @@ type transA struct {
id int64 id int64
incoming usync.Gate[incomingMessage] incoming usync.Gate[incomingMessage]
currentReader io.Reader currentReader io.Reader
currentWriter io.Closer
writeBuffer []byte
} }
func (this *transA) Close() error { func (this *transA) Close() error {
@ -192,6 +194,27 @@ func (this *transA) Send(method uint16, data []byte) error {
return this.parent.sendMessageSafe(this.id, method, data) return this.parent.sendMessageSafe(this.id, method, data)
} }
func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) {
// close previous writer if necessary
if this.currentWriter != nil {
this.currentWriter.Close()
this.currentWriter = nil
}
// create new writer
writer := &writerA {
parent: this,
// there is only ever one writer at a time, so they can all
// share a buffer
buffer: this.writeBuffer[:0],
method: method,
chunkSize: defaultChunkSize,
open: true,
}
this.currentWriter = writer
return writer, nil
}
func (this *transA) Receive() (method uint16, data []byte, err error) { func (this *transA) Receive() (method uint16, data []byte, err error) {
method, reader, err := this.ReceiveReader() method, reader, err := this.ReceiveReader()
if err != nil { return 0, nil, err } if err != nil { return 0, nil, err }
@ -263,6 +286,57 @@ func (this *readerA) Read(buffer []byte) (int, error) {
return copied, nil return copied, nil
} }
type writerA struct {
parent *transA
buffer []byte
method uint16
chunkSize int64
open bool
}
func (this *writerA) Write(data []byte) (n int, err error) {
if !this.open { return 0, io.EOF }
toSend := data
for len(toSend) > 0 {
nn, err := this.writeOne(toSend)
n += nn
toSend = toSend[nn:]
if err != nil { return n, err }
}
return n, nil
}
func (this *writerA) Close() error {
this.open = false
return nil
}
func (this *writerA) writeOne(data []byte) (n int, err error) {
data = data[:min(len(data), int(this.chunkSize))]
// if there is more room, append to the buffer and exit
if int64(len(this.buffer) + len(data)) <= this.chunkSize {
this.buffer = append(this.buffer, data...)
n = len(data)
// if have a full chunk, flush
if int64(len(this.buffer)) == this.chunkSize {
err = this.flush()
if err != nil { return n, err }
}
return n, nil
}
// if not, flush and store as much as we can in the buffer
err = this.flush()
if err != nil { return n, err }
this.buffer = append(this.buffer, data...)
return n, nil
}
func (this *writerA) flush() error {
return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer)
}
type incomingMessage struct { type incomingMessage struct {
method uint16 method uint16
chunked bool chunked bool

View File

@ -2,6 +2,7 @@ package hopp
import "io" import "io"
import "net" import "net"
import "bytes"
import "errors" import "errors"
import "context" import "context"
import "git.tebibyte.media/sashakoshka/hopp/tape" import "git.tebibyte.media/sashakoshka/hopp/tape"
@ -58,9 +59,10 @@ func (this *b) newTrans(underlying Stream) *transB {
} }
type transB struct { type transB struct {
sizeLimit int64 sizeLimit int64
underlying Stream underlying Stream
currentData io.Reader currentData io.Reader
currentWriter *writerB
} }
func (this *transB) Close() error { func (this *transB) Close() error {
@ -75,6 +77,24 @@ func (this *transB) Send(method uint16, data []byte) error {
return encodeMessageB(this.underlying, this.sizeLimit, method, data) return encodeMessageB(this.underlying, this.sizeLimit, method, data)
} }
func (this *transB) SendWriter(method uint16) (io.WriteCloser, error) {
if this.currentWriter != nil {
this.currentWriter.Close()
}
// TODO: come up with a fix that allows us to pipe data through the
// writer. as of now, it just reads whatever is written into a buffer
// and sends the message on close. we should probably introduce chunked
// encoding to METADAPT-B to fix this. the implementation would be
// simpler than on METADAPT-A, but most of the code could just be
// copied over.
writer := &writerB {
parent: this,
method: method,
}
this.currentWriter = writer
return writer, nil
}
func (this *transB) Receive() (uint16, []byte, error) { func (this *transB) Receive() (uint16, []byte, error) {
// get a reader for the next message // get a reader for the next message
method, size, data, err := this.receiveReader() method, size, data, err := this.receiveReader()
@ -105,6 +125,20 @@ func (this *transB) receiveReader() (uint16, int64, io.Reader, error) {
return method, size, data, nil return method, size, data, nil
} }
type writerB struct {
parent *transB
buffer bytes.Buffer
method uint16
}
func (this *writerB) Write(data []byte) (int, error) {
return this.buffer.Write(data)
}
func (this *writerB) Close() error {
return this.parent.Send(this.method, this.buffer.Bytes())
}
// MultiConn represens a multiplexed stream-oriented transport for use in // MultiConn represens a multiplexed stream-oriented transport for use in
// [AdaptB]. // [AdaptB].
type MultiConn interface { type MultiConn interface {