Compare commits
No commits in common. "7a0bf64c17a4dd86e8ad518addda487805b83fe7" and "8a3df95491f47b1f85d5e38b5f78f31e24ccec65" have entirely different histories.
7a0bf64c17
...
8a3df95491
@ -44,12 +44,10 @@ type Trans interface {
|
|||||||
|
|
||||||
// Send sends a message. This method is not safe for concurrent use.
|
// Send sends a message. This method is not safe for concurrent use.
|
||||||
Send(method uint16, data []byte) error
|
Send(method uint16, data []byte) error
|
||||||
// SendWriter sends data written to an [io.Writer]. The writer must be
|
// SendWriter sends data written to an [io.Writer]. Any writer
|
||||||
// closed after use. Closing the writer flushes any data that hasn't
|
// previously opened through this function will be discarded. This
|
||||||
// been written yet. Any writer previously opened through this function
|
// method is not safe for concurrent use, and neither is its result.
|
||||||
// will be discarded. This method is not safe for concurrent use, and
|
SendWriter(method uint16) (io.Writer, error)
|
||||||
// neither is its result.
|
|
||||||
SendWriter(method uint16) (io.WriteCloser, error)
|
|
||||||
// Receive receives a message. This method is not safe for concurrent
|
// Receive receives a message. This method is not safe for concurrent
|
||||||
// use.
|
// use.
|
||||||
Receive() (method uint16, data []byte, err error)
|
Receive() (method uint16, data []byte, err error)
|
||||||
|
76
metadapta.go
76
metadapta.go
@ -9,7 +9,7 @@ import "git.tebibyte.media/sashakoshka/go-util/sync"
|
|||||||
|
|
||||||
const closeMethod = 0xFFFF
|
const closeMethod = 0xFFFF
|
||||||
const int64Max = int64((^uint64(0)) >> 1)
|
const int64Max = int64((^uint64(0)) >> 1)
|
||||||
const defaultChunkSize = 0x1000
|
|
||||||
|
|
||||||
// Party represents a side of a connection.
|
// Party represents a side of a connection.
|
||||||
type Party bool; const (
|
type Party bool; const (
|
||||||
@ -171,8 +171,6 @@ type transA struct {
|
|||||||
id int64
|
id int64
|
||||||
incoming usync.Gate[incomingMessage]
|
incoming usync.Gate[incomingMessage]
|
||||||
currentReader io.Reader
|
currentReader io.Reader
|
||||||
currentWriter io.Closer
|
|
||||||
writeBuffer []byte
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *transA) Close() error {
|
func (this *transA) Close() error {
|
||||||
@ -194,27 +192,6 @@ func (this *transA) Send(method uint16, data []byte) error {
|
|||||||
return this.parent.sendMessageSafe(this.id, method, data)
|
return this.parent.sendMessageSafe(this.id, method, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) {
|
|
||||||
// close previous writer if necessary
|
|
||||||
if this.currentWriter != nil {
|
|
||||||
this.currentWriter.Close()
|
|
||||||
this.currentWriter = nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// create new writer
|
|
||||||
writer := &writerA {
|
|
||||||
parent: this,
|
|
||||||
// there is only ever one writer at a time, so they can all
|
|
||||||
// share a buffer
|
|
||||||
buffer: this.writeBuffer[:0],
|
|
||||||
method: method,
|
|
||||||
chunkSize: defaultChunkSize,
|
|
||||||
open: true,
|
|
||||||
}
|
|
||||||
this.currentWriter = writer
|
|
||||||
return writer, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *transA) Receive() (method uint16, data []byte, err error) {
|
func (this *transA) Receive() (method uint16, data []byte, err error) {
|
||||||
method, reader, err := this.ReceiveReader()
|
method, reader, err := this.ReceiveReader()
|
||||||
if err != nil { return 0, nil, err }
|
if err != nil { return 0, nil, err }
|
||||||
@ -286,57 +263,6 @@ func (this *readerA) Read(buffer []byte) (int, error) {
|
|||||||
return copied, nil
|
return copied, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type writerA struct {
|
|
||||||
parent *transA
|
|
||||||
buffer []byte
|
|
||||||
method uint16
|
|
||||||
chunkSize int64
|
|
||||||
open bool
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *writerA) Write(data []byte) (n int, err error) {
|
|
||||||
if !this.open { return 0, io.EOF }
|
|
||||||
toSend := data
|
|
||||||
for len(toSend) > 0 {
|
|
||||||
nn, err := this.writeOne(toSend)
|
|
||||||
n += nn
|
|
||||||
toSend = toSend[nn:]
|
|
||||||
if err != nil { return n, err }
|
|
||||||
}
|
|
||||||
return n, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *writerA) Close() error {
|
|
||||||
this.open = false
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *writerA) writeOne(data []byte) (n int, err error) {
|
|
||||||
data = data[:min(len(data), int(this.chunkSize))]
|
|
||||||
|
|
||||||
// if there is more room, append to the buffer and exit
|
|
||||||
if int64(len(this.buffer) + len(data)) <= this.chunkSize {
|
|
||||||
this.buffer = append(this.buffer, data...)
|
|
||||||
n = len(data)
|
|
||||||
// if have a full chunk, flush
|
|
||||||
if int64(len(this.buffer)) == this.chunkSize {
|
|
||||||
err = this.flush()
|
|
||||||
if err != nil { return n, err }
|
|
||||||
}
|
|
||||||
return n, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// if not, flush and store as much as we can in the buffer
|
|
||||||
err = this.flush()
|
|
||||||
if err != nil { return n, err }
|
|
||||||
this.buffer = append(this.buffer, data...)
|
|
||||||
return n, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *writerA) flush() error {
|
|
||||||
return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer)
|
|
||||||
}
|
|
||||||
|
|
||||||
type incomingMessage struct {
|
type incomingMessage struct {
|
||||||
method uint16
|
method uint16
|
||||||
chunked bool
|
chunked bool
|
||||||
|
40
metadaptb.go
40
metadaptb.go
@ -2,7 +2,6 @@ package hopp
|
|||||||
|
|
||||||
import "io"
|
import "io"
|
||||||
import "net"
|
import "net"
|
||||||
import "bytes"
|
|
||||||
import "errors"
|
import "errors"
|
||||||
import "context"
|
import "context"
|
||||||
import "git.tebibyte.media/sashakoshka/hopp/tape"
|
import "git.tebibyte.media/sashakoshka/hopp/tape"
|
||||||
@ -59,10 +58,9 @@ func (this *b) newTrans(underlying Stream) *transB {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type transB struct {
|
type transB struct {
|
||||||
sizeLimit int64
|
sizeLimit int64
|
||||||
underlying Stream
|
underlying Stream
|
||||||
currentData io.Reader
|
currentData io.Reader
|
||||||
currentWriter *writerB
|
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *transB) Close() error {
|
func (this *transB) Close() error {
|
||||||
@ -77,24 +75,6 @@ func (this *transB) Send(method uint16, data []byte) error {
|
|||||||
return encodeMessageB(this.underlying, this.sizeLimit, method, data)
|
return encodeMessageB(this.underlying, this.sizeLimit, method, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *transB) SendWriter(method uint16) (io.WriteCloser, error) {
|
|
||||||
if this.currentWriter != nil {
|
|
||||||
this.currentWriter.Close()
|
|
||||||
}
|
|
||||||
// TODO: come up with a fix that allows us to pipe data through the
|
|
||||||
// writer. as of now, it just reads whatever is written into a buffer
|
|
||||||
// and sends the message on close. we should probably introduce chunked
|
|
||||||
// encoding to METADAPT-B to fix this. the implementation would be
|
|
||||||
// simpler than on METADAPT-A, but most of the code could just be
|
|
||||||
// copied over.
|
|
||||||
writer := &writerB {
|
|
||||||
parent: this,
|
|
||||||
method: method,
|
|
||||||
}
|
|
||||||
this.currentWriter = writer
|
|
||||||
return writer, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *transB) Receive() (uint16, []byte, error) {
|
func (this *transB) Receive() (uint16, []byte, error) {
|
||||||
// get a reader for the next message
|
// get a reader for the next message
|
||||||
method, size, data, err := this.receiveReader()
|
method, size, data, err := this.receiveReader()
|
||||||
@ -125,20 +105,6 @@ func (this *transB) receiveReader() (uint16, int64, io.Reader, error) {
|
|||||||
return method, size, data, nil
|
return method, size, data, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
type writerB struct {
|
|
||||||
parent *transB
|
|
||||||
buffer bytes.Buffer
|
|
||||||
method uint16
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *writerB) Write(data []byte) (int, error) {
|
|
||||||
return this.buffer.Write(data)
|
|
||||||
}
|
|
||||||
|
|
||||||
func (this *writerB) Close() error {
|
|
||||||
return this.parent.Send(this.method, this.buffer.Bytes())
|
|
||||||
}
|
|
||||||
|
|
||||||
// MultiConn represens a multiplexed stream-oriented transport for use in
|
// MultiConn represens a multiplexed stream-oriented transport for use in
|
||||||
// [AdaptB].
|
// [AdaptB].
|
||||||
type MultiConn interface {
|
type MultiConn interface {
|
||||||
|
Loading…
Reference in New Issue
Block a user