Compare commits

...

33 Commits

Author SHA1 Message Date
2fdf7d490d Remove unneeded code 2025-05-14 13:52:03 -04:00
d60beccbcd Finally fix A... this took too long 2025-05-14 13:44:06 -04:00
23c37c3d1f Fix transaction ID counting 2025-04-25 19:57:33 -04:00
a83aedc128 Break METADAPT-A client/server environment from TestConnA 2025-04-25 18:38:01 -04:00
c0bfcc02f7 Send a close message when METADAPT-A transactions close 2025-04-25 18:19:43 -04:00
7a0bf64c17 Implement SendWriter for METADAPT-B 2025-04-25 18:15:38 -04:00
9d2bbec7f9 Update METADAPT-A implementation 2025-04-25 18:14:47 -04:00
dd89245c34 Change the result of Trans.SendWriter to a WriteCloser 2025-04-25 18:06:00 -04:00
41f5cfefab Implement SendWriter for METADAPT-A 2025-04-25 17:53:12 -04:00
8a3df95491 Clarify concurrency in Trans methods 2025-04-25 16:06:17 -04:00
c51a81bc13 Add a SendWriter method to Trans 2025-04-25 16:02:23 -04:00
47645a8fce Pass TestDecodeMessageBErr 2025-04-25 15:26:12 -04:00
87c4ac8efb More robust integer comparison 2025-04-25 15:21:52 -04:00
f6fe9c307d This should have been in the last commit 2025-04-25 15:17:32 -04:00
9bf0c596ba Make TestEncodeMessageAErr pass 2025-04-25 15:12:01 -04:00
86cf3ee89d Make the TestConnA pass 2025-04-25 15:08:31 -04:00
8fe3ba8d4f Close METADAPT-A transaction channel 2025-04-25 15:07:47 -04:00
cbaff8b593 Allow readerA.pull to return an actual result 2025-04-25 14:15:53 -04:00
46c6361602 Encode METADAPT-A MMBs properly lmao 2025-04-22 20:10:57 -04:00
fac0c4e31d Actually use defaultSizeLimit 2025-04-22 20:03:40 -04:00
945d81c505 METADAPT-B tests run 2025-04-21 20:51:02 -04:00
f34620c434 METADAPT-A tests run 2025-04-21 20:50:33 -04:00
7a766b74d8 Name return values of decodeMessageA 2025-04-21 20:49:58 -04:00
6de3cbbc48 Fix method signature of SetSizeLimit 2025-04-21 19:10:45 -04:00
e4f13a4142 WIP METADAPT-A changes 2025-04-06 17:01:00 -04:00
db10355c84 Change the size limit type to an int64 2025-04-06 14:19:39 -04:00
f4f8039fa0 Support getting a reader for a message in METADAPT-B 2025-04-06 14:17:39 -04:00
fe8f2fc3ea Do not require METADAPT to return a message length when getting a reader 2025-04-06 11:25:28 -04:00
b07cdf088a design: State support for TCP/TLS 2025-04-06 11:25:12 -04:00
4eae69dc94 Add ReceiveReader to Transaction interface 2025-04-05 21:08:41 -04:00
5c28510342 Add new METADAPT protocol specifications from #2 2025-04-05 21:04:45 -04:00
1ac0ed51c7 METADAPT-B supports setting a message length limit 2025-04-04 16:07:20 -04:00
174634a330 METADAPT-B can now send very large messages 2025-04-04 15:42:15 -04:00
6 changed files with 620 additions and 155 deletions

View File

@ -1,8 +1,11 @@
package hopp
import "io"
import "net"
// import "time"
const defaultSizeLimit int64 = 1024 * 1024 // 1 megabyte
// Conn is a HOPP connection.
type Conn interface {
// Close closes the connection. Any blocked operations on the connection
@ -19,22 +22,39 @@ type Conn interface {
// AcceptTrans accepts a transaction from the other party. This must
// be called in a loop to avoid the connection locking up.
AcceptTrans() (Trans, error)
// SetSizeLimit sets a limit (in bytes) for how large messages can be.
// By default, this limit is 1 megabyte.
SetSizeLimit(limit int64)
}
// Trans is a HOPP transaction.
// Trans is a HOPP transaction. Methods of this interface are not safe for
// concurrent use with the exception of the Close and ID methods. The
// recommended use case is one goroutine per transaction.
type Trans interface {
// Close closes the transaction. Any blocked operations will be
// unblocked and return errors.
// unblocked and return errors. This method is safe for concurrent use.
Close() error
// ID returns the transaction ID. This must not change, and it must be
// unique within the connection.
// unique within the connection. This method is safe for concurrent use.
ID() int64
// TODO: add methods for setting send and receive deadlines
// Send sends a message.
// Send sends a message. This method is not safe for concurrent use.
Send(method uint16, data []byte) error
// Receive receives a message.
// SendWriter sends data written to an [io.Writer]. The writer must be
// closed after use. Closing the writer flushes any data that hasn't
// been written yet. Any writer previously opened through this function
// will be discarded. This method is not safe for concurrent use, and
// neither is its result.
SendWriter(method uint16) (io.WriteCloser, error)
// Receive receives a message. This method is not safe for concurrent
// use.
Receive() (method uint16, data []byte, err error)
// ReceiveReader receives a message as an [io.Reader]. Any reader
// previously opened through this function will be discarded. This
// method is not safe for concurrent use, and neither is its result.
ReceiveReader() (method uint16, data io.Reader, err error)
}

View File

@ -123,14 +123,16 @@ be of the same size.
## Transports
A transport is a protocol that HOPP connections can run on top of. HOPP
currently supports the QUIC transport protocol for communicating between
machines, and UNIX domain sockets for quicker communication among applications
on the same machine. Both protocols are supported through METADAPT.
machines, TCP/TLS for legacy systems that do not support QUIC, and UNIX domain
sockets for faster communication among applications on the same machine. Both
protocols are supported through METADAPT.
## Message and Transaction Demarcation Protocol (METADAPT)
The Message and Transaction Demarcation Protocol is used to break one or more
reliable data streams into transactions, which are broken down further into
messages. A message, as well as its associated metadata (length, transaction,
method, etc.) together is referred to as METADAPT Message Block (MMB).
messages. The representation of a message (or a part thereof) on the protocol,
including its associated metadata (length, transaction, method, etc.) is
referred to as METADAPT Message Block (MMB).
For transports that offer multiple multiplexed data streams that can be created
and destroyed on-demand (such as QUIC) each stream is used as a transaction. If
@ -145,8 +147,12 @@ METADAPT-A requires a transport which offers a single full-duplex data stream
that persists for the duration of the connection. All transactions are
multiplexed onto this single stream. Each MMB contains a 12-octet long header,
with the transaction ID, then the method, and then the payload size (in octets).
The transaction ID is encoded as an I64, and the method and payload size are
both encoded as U16s. The remainder of the message is the payload. Since each
The transaction ID is encoded as an I64, the method is encoded as a U16 and the
and payload size is encoded as a U64. Only the 63 least significant bits of the
payload size describe the actual size, the most significant bit controlling
chunking. See the section on chunking for more information.
The remainder of the message is the payload. Since each
MMB is self-describing, they are sent sequentially with no gaps in-between them.
Transactions "open" when the first message with a given transaction ID is sent.
@ -162,13 +168,25 @@ used up, the connection must fail. Don't worry about this though, because the
sun will have expanded to swallow earth by then. Your connection will not last
that long.
#### Message Chunking
The most significant bit of the payload size field of an MMB is called the Chunk
Control Bit (CCB). If the CCB of a given MMB is zero, the represented message is
interpreted as being self-contained and the data is processed immediately. If
the CCB is one, the message is interpreted as being chunked, with the data of
the current MMB being the first chunk. The data of further MMBs sent along the
transaction will be appended to the message until an MMB is read with a zero
CCB, in which case the MMB will be the last chunk and any more MMBs will be
interpreted as normal.
### METADAPT-B
METADAPT-B requires a transport which offers multiple multiplexed full-duplex
data streams per connection that can be created and destroyed on-demand. Each
data stream is used as an individual transaction. Each MMB contains a 4-octet
long header with the method and then the payload size (in octets) both encoded
as U16s. The remainder of the message is the payload. Since each MMB is
self-describing, they are sent sequentially with no gaps in-between them.
long header with the method and then the payload size (in octets) encoded as a
U16 and U64 respectively. The remainder of the message is the payload. Since
each MMB is self-describing, they are sent sequentially with no gaps in-between
them.
The ID of any transaction will reflect the ID of its corresponding stream. The
lifetime of the transaction is tied to the lifetime of the stream, that is to

View File

@ -4,11 +4,16 @@ import "io"
import "fmt"
import "net"
import "sync"
import "sync/atomic"
import "git.tebibyte.media/sashakoshka/hopp/tape"
import "git.tebibyte.media/sashakoshka/go-util/sync"
// TODO investigate why 30 never reaches the server, causing it to wait for ever
// and never close the connection, causing the client to also wait forever
const closeMethod = 0xFFFF
const int64Max = int64((^uint64(0)) >> 1)
const defaultChunkSize = 0x1000
// Party represents a side of a connection.
type Party bool; const (
@ -16,7 +21,16 @@ type Party bool; const (
ClientSide Party = true
)
func (party Party) String() string {
if party == ServerSide {
return "server"
} else {
return "client"
}
}
type a struct {
sizeLimit int64
underlying net.Conn
party Party
transID int64
@ -32,6 +46,7 @@ type a struct {
// oriented transport such as TCP or UNIX domain stream sockets.
func AdaptA(underlying net.Conn, party Party) Conn {
conn := &a {
sizeLimit: defaultSizeLimit,
underlying: underlying,
party: party,
transMap: make(map[int64] *transA),
@ -49,7 +64,7 @@ func AdaptA(underlying net.Conn, party Party) Conn {
func (this *a) Close() error {
close(this.done)
return this.underlying.Close()
return nil
}
func (this *a) LocalAddr() net.Addr {
@ -63,30 +78,41 @@ func (this *a) RemoteAddr() net.Addr {
func (this *a) OpenTrans() (Trans, error) {
this.transLock.Lock()
defer this.transLock.Unlock()
if this.transID == int64Max {
return nil, fmt.Errorf("could not open transaction: %w", ErrIntegerOverflow)
}
id := this.transID
this.transID ++
trans := &transA {
parent: this,
id: id,
incoming: usync.NewGate[incomingMessage](),
}
this.transMap[id] = trans
if this.transID == int64Max {
return nil, fmt.Errorf("could not open transaction: %w", ErrIntegerOverflow)
if this.party == ClientSide {
this.transID ++
} else {
this.transID --
}
this.transID ++
return trans, nil
}
func (this *a) AcceptTrans() (Trans, error) {
eof := fmt.Errorf("could not accept transaction: %w", io.EOF)
select {
case trans := <- this.transChan:
if trans == nil {
return nil, eof
}
return trans, nil
case <- this.done:
return nil, fmt.Errorf("could not accept transaction: %w", io.EOF)
return nil, eof
}
}
func (this *a) SetSizeLimit(limit int64) {
this.sizeLimit = limit
}
func (this *a) unlistTransactionSafe(id int64) {
this.transLock.Lock()
defer this.transLock.Unlock()
@ -96,27 +122,32 @@ func (this *a) unlistTransactionSafe(id int64) {
func (this *a) sendMessageSafe(trans int64, method uint16, data []byte) error {
this.sendLock.Lock()
defer this.sendLock.Unlock()
return encodeMessageA(this.underlying, trans, method, data)
return encodeMessageA(this.underlying, this.sizeLimit, trans, method, data)
}
func (this *a) receive() {
defer func() {
this.underlying.Close()
close(this.transChan)
this.transLock.Lock()
defer this.transLock.Unlock()
for _, trans := range this.transMap {
trans.closeDontUnlist()
}
clear(this.transMap)
this.underlying.Close()
}()
// receive MMBs in a loop and forward them to transactions until shit
// starts closing
for {
transID, method, payload, err := decodeMessageA(this.underlying)
transID, method, chunked, payload, err := decodeMessageA(this.underlying, this.sizeLimit)
if err != nil {
this.err = fmt.Errorf("could not receive message: %w", err)
return
}
err = this.receiveMultiplex(transID, method, payload)
err = this.multiplexMMB(transID, method, chunked, payload)
if err != nil {
this.err = fmt.Errorf("could not receive message: %w", err)
return
@ -124,7 +155,7 @@ func (this *a) receive() {
}
}
func (this *a) receiveMultiplex(transID int64, method uint16, payload []byte) error {
func (this *a) multiplexMMB(transID int64, method uint16, chunked bool, payload []byte) error {
if transID == 0 { return ErrMessageMalformed }
trans, err := func() (*transA, error) {
@ -133,6 +164,12 @@ func (this *a) receiveMultiplex(transID int64, method uint16, payload []byte) er
trans, ok := this.transMap[transID]
if !ok {
// check if this is a superfluous close message and just
// do nothing if so
if method == closeMethod {
return nil, nil
}
// it is forbidden for the other party to initiate a transaction
// with an ID from this party
if this.party == partyFromTransID(transID) {
@ -150,28 +187,49 @@ func (this *a) receiveMultiplex(transID int64, method uint16, payload []byte) er
}()
if err != nil { return err }
trans.incoming.Send(incomingMessage {
method: method,
payload: payload,
})
if trans == nil {
return nil
}
if method == closeMethod {
return trans.Close()
} else {
trans.incoming.Send(incomingMessage {
method: method,
chunked: chunked,
payload: payload,
})
}
return nil
}
// most methods in transA don't need to be goroutine safe except those marked
// as such
type transA struct {
parent *a
id int64
incoming usync.Gate[incomingMessage]
parent *a
id int64
incoming usync.Gate[incomingMessage]
currentReader io.Reader
currentWriter io.Closer
writeBuffer []byte
closed atomic.Bool
}
func (this *transA) Close() error {
// MUST be goroutine safe
err := this.closeDontUnlist()
this.parent.unlistTransactionSafe(this.ID())
return err
}
func (this *transA) closeDontUnlist() error {
this.Send(closeMethod, nil)
return this.incoming.Close()
func (this *transA) closeDontUnlist() (err error) {
// MUST be goroutine safe
this.incoming.Close()
if !this.closed.Load() {
err = this.Send(closeMethod, nil)
}
this.closed.Store(true)
return err
}
func (this *transA) ID() int64 {
@ -182,58 +240,213 @@ func (this *transA) Send(method uint16, data []byte) error {
return this.parent.sendMessageSafe(this.id, method, data)
}
func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) {
// close previous writer if necessary
if this.currentWriter != nil {
this.currentWriter.Close()
this.currentWriter = nil
}
// create new writer
writer := &writerA {
parent: this,
// there is only ever one writer at a time, so they can all
// share a buffer
buffer: this.writeBuffer[:0],
method: method,
chunkSize: defaultChunkSize,
open: true,
}
this.currentWriter = writer
return writer, nil
}
func (this *transA) Receive() (method uint16, data []byte, err error) {
receive := this.incoming.Receive()
method, reader, err := this.ReceiveReader()
if err != nil { return 0, nil, err }
data, err = io.ReadAll(reader)
if err != nil { return 0, nil, err }
return method, data, nil
}
func (this *transA) ReceiveReader() (uint16, io.Reader, error) {
// if the transaction has been closed, return an io.EOF
if this.closed.Load() {
return 0, nil, io.EOF
}
// drain previous reader if necessary
if this.currentReader != nil {
io.Copy(io.Discard, this.currentReader)
}
// create new reader
reader := &readerA {
parent: this,
}
method, err := reader.pull()
if err != nil { return 0, nil, err}
this.currentReader = reader
return method, reader, nil
}
type readerA struct {
parent *transA
leftover []byte
eof bool
}
// pull pulls the next MMB in this message from the transaction.
func (this *readerA) pull() (uint16, error) {
// if the previous message ended the chain, return an io.EOF
if this.eof {
return 0, io.EOF
}
// get an MMB from the transaction we are a part of
receive := this.parent.incoming.Receive()
if receive != nil {
if message, ok := <- receive; ok {
if message.method != closeMethod {
return message.method, message.payload, nil
this.leftover = append(this.leftover, message.payload...)
if !message.chunked {
this.eof = true
}
return message.method, nil
}
}
}
// close and return error on failure
this.Close()
if this.parent.err == nil {
return 0, nil, fmt.Errorf("could not receive message: %w", io.EOF)
this.eof = true
this.parent.Close()
if this.parent.parent.err == nil {
return 0, fmt.Errorf("could not receive message: %w", io.EOF)
} else {
return 0, nil, this.parent.err
return 0, this.parent.parent.err
}
}
func (this *readerA) Read(buffer []byte) (int, error) {
if len(this.leftover) == 0 {
if this.eof { return 0, io.EOF }
this.pull()
}
copied := copy(buffer, this.leftover)
this.leftover = this.leftover[copied:]
return copied, nil
}
type writerA struct {
parent *transA
buffer []byte
method uint16
chunkSize int64
open bool
}
func (this *writerA) Write(data []byte) (n int, err error) {
if !this.open { return 0, io.EOF }
toSend := data
for len(toSend) > 0 {
nn, err := this.writeOne(toSend)
n += nn
toSend = toSend[nn:]
if err != nil { return n, err }
}
return n, nil
}
func (this *writerA) Close() error {
this.open = false
return nil
}
func (this *writerA) writeOne(data []byte) (n int, err error) {
data = data[:min(len(data), int(this.chunkSize))]
// if there is more room, append to the buffer and exit
if int64(len(this.buffer) + len(data)) <= this.chunkSize {
this.buffer = append(this.buffer, data...)
n = len(data)
// if have a full chunk, flush
if int64(len(this.buffer)) == this.chunkSize {
err = this.flush()
if err != nil { return n, err }
}
return n, nil
}
// if not, flush and store as much as we can in the buffer
err = this.flush()
if err != nil { return n, err }
this.buffer = append(this.buffer, data...)
return n, nil
}
func (this *writerA) flush() error {
return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer)
}
type incomingMessage struct {
method uint16
chunked bool
payload []byte
}
func encodeMessageA(writer io.Writer, trans int64, method uint16, data []byte) error {
buffer := make([]byte, 12 + len(data))
func encodeMessageA(
writer io.Writer,
sizeLimit int64,
trans int64,
method uint16,
data []byte,
) error {
if int64(len(data)) > sizeLimit {
return ErrPayloadTooLarge
}
buffer := make([]byte, 18 + len(data))
tape.EncodeI64(buffer[:8], trans)
tape.EncodeI16(buffer[8:10], method)
length, ok := tape.U16CastSafe(len(data))
if !ok { return ErrPayloadTooLarge }
tape.EncodeI16(buffer[10:12], length)
copy(buffer[12:], data)
tape.EncodeI64(buffer[10:18], uint64(len(data)))
copy(buffer[18:], data)
_, err := writer.Write(buffer)
return err
}
func decodeMessageA(reader io.Reader) (int64, uint16, []byte, error) {
headerBuffer := [12]byte { }
_, err := io.ReadFull(reader, headerBuffer[:])
if err != nil { return 0, 0, nil, err }
transID, err := tape.DecodeI64[int64](headerBuffer[:8])
if err != nil { return 0, 0, nil, err }
method, err := tape.DecodeI16[uint16](headerBuffer[8:10])
if err != nil { return 0, 0, nil, err }
length, err := tape.DecodeI16[uint16](headerBuffer[10:12])
if err != nil { return 0, 0, nil, err }
payloadBuffer := make([]byte, int(length))
func decodeMessageA(
reader io.Reader,
sizeLimit int64,
) (
transID int64,
method uint16,
chunked bool,
payloadBuffer []byte,
err error,
) {
headerBuffer := [18]byte { }
_, err = io.ReadFull(reader, headerBuffer[:])
if err != nil { return 0, 0, false, nil, err }
transID, err = tape.DecodeI64[int64](headerBuffer[:8])
if err != nil { return 0, 0, false, nil, err }
method, err = tape.DecodeI16[uint16](headerBuffer[8:10])
if err != nil { return 0, 0, false, nil, err }
size, err := tape.DecodeI64[uint64](headerBuffer[10:18])
if err != nil { return 0, 0, false, nil, err }
chunked, size = splitCCBSize(size)
if size > uint64(sizeLimit) {
return 0, 0, false, nil, ErrPayloadTooLarge
}
payloadBuffer = make([]byte, int(size))
_, err = io.ReadFull(reader, payloadBuffer)
if err != nil { return 0, 0, nil, err }
return transID, method, payloadBuffer, nil
if err != nil { return 0, 0, false, nil, err }
return transID, method, chunked, payloadBuffer, nil
}
func partyFromTransID(id int64) Party {
return id > 0
}
func splitCCBSize(size uint64) (bool, uint64) {
return size >> 63 > 1, size & 0x7FFFFFFFFFFFFFFF
}

View File

@ -24,71 +24,132 @@ func TestConnA(test *testing.T) {
"world",
"When the impostor is sus!",
}
network := "tcp"
addr := "localhost:7959"
// server
listener, err := net.Listen(network, addr)
if err != nil { test.Fatal(err) }
defer listener.Close()
go func() {
test.Log("SERVER listening")
conn, err := listener.Accept()
if err != nil { test.Error("SERVER", err); return }
defer conn.Close()
a := AdaptA(conn, ServerSide)
clientFunc := func(a Conn) {
test.Log("CLIENT accepting transaction")
trans, err := a.AcceptTrans()
if err != nil { test.Fatal("CLIENT", err) }
test.Log("CLIENT accepted transaction")
test.Cleanup(func() { trans.Close() })
for method, payload := range payloads {
test.Log("CLIENT waiting...")
gotMethod, gotPayloadBytes, err := trans.Receive()
if err != nil { test.Fatal("CLIENT", err) }
gotPayload := string(gotPayloadBytes)
test.Log("CLIENT m:", gotMethod, "p:", gotPayload)
if int(gotMethod) != method {
test.Errorf("CLIENT method not equal")
}
if gotPayload != payload {
test.Errorf("CLIENT payload not equal")
}
}
test.Log("CLIENT waiting for transaction close...")
gotMethod, gotPayload, err := trans.Receive()
if !errors.Is(err, io.EOF) {
test.Error("CLIENT wrong error:", err)
test.Error("CLIENT method:", gotMethod)
test.Error("CLIENT payload:", gotPayload)
test.Fatal("CLIENT ok byeeeeeeeeeeeee")
}
}
serverFunc := func(a Conn) {
trans, err := a.OpenTrans()
if err != nil { test.Error("SERVER", err); return }
defer trans.Close()
test.Cleanup(func() { trans.Close() })
for method, payload := range payloads {
test.Log("SERVER", method, payload)
test.Log("SERVER m:", method, "p:", payload)
err := trans.Send(uint16(method), []byte(payload))
if err != nil { test.Error("SERVER", err); return }
}
}()
test.Log("SERVER closing connection")
}
// client
test.Log("CLIENT dialing")
conn, err := net.Dial(network, addr)
if err != nil { test.Fatal("CLIENT", err) }
test.Log("CLIENT dialed")
a := AdaptA(conn, ClientSide)
defer a.Close()
test.Log("CLIENT accepting transaction")
trans, err := a.AcceptTrans()
if err != nil { test.Fatal("CLIENT", err) }
test.Log("CLIENT accepted transaction")
defer trans.Close()
for method, payload := range payloads {
test.Log("CLIENT waiting...")
gotMethod, gotPayloadBytes, err := trans.Receive()
if err != nil { test.Fatal("CLIENT", err) }
gotPayload := string(gotPayloadBytes)
test.Log("CLIENT", gotMethod, gotPayload)
if int(gotMethod) != method {
test.Errorf("CLIENT method not equal")
}
if gotPayload != payload {
test.Errorf("CLIENT payload not equal")
}
clientServerEnvironment(test, clientFunc, serverFunc)
}
func TestTransOpenCloseA(test *testing.T) {
// currently:
//
// | data sent | data recvd | close sent | close recvd
// 10 | X | X | X | server hangs
// 20 | X | X | X | client hangs
// 30 | X | | X |
//
// when a close message is recvd, it tries to push to the trans and
// hangs on trans.incoming.Send, which hangs on sending the value to the
// underlying channel. why is this?
//
// check if we are really getting values from the channel when pulling
// from the trans channel when we are expecting a close.
clientFunc := func(conn Conn) {
// 10
trans, err := conn.OpenTrans()
if err != nil { test.Error("CLIENT", err); return }
test.Log("CLIENT sending 10")
trans.Send(10, []byte("hi"))
trans.Close()
// 20
test.Log("CLIENT awaiting 20")
trans, err = conn.AcceptTrans()
if err != nil { test.Error("CLIENT", err); return }
test.Cleanup(func() { trans.Close() })
gotMethod, gotPayload, err := trans.Receive()
if err != nil { test.Error("CLIENT", err); return }
test.Logf("CLIENT m: %d p: %s", gotMethod, gotPayload)
if gotMethod != 20 { test.Error("CLIENT wrong method")}
// 30
trans, err = conn.OpenTrans()
if err != nil { test.Error("CLIENT", err); return }
test.Log("CLIENT sending 30")
trans.Send(30, []byte("good"))
trans.Close()
}
_, _, err = trans.Receive()
if !errors.Is(err, io.EOF) {
test.Fatal("CLIENT wrong error:", err)
serverFunc := func(conn Conn) {
// 10
test.Log("SERVER awaiting 10")
trans, err := conn.AcceptTrans()
if err != nil { test.Error("SERVER", err); return }
test.Cleanup(func() { trans.Close() })
gotMethod, gotPayload, err := trans.Receive()
if err != nil { test.Error("SERVER", err); return }
test.Logf("SERVER m: %d p: %s", gotMethod, gotPayload)
if gotMethod != 10 { test.Error("SERVER wrong method")}
// 20
trans, err = conn.OpenTrans()
if err != nil { test.Error("SERVER", err); return }
test.Log("SERVER sending 20")
trans.Send(20, []byte("hi how r u"))
trans.Close()
// 30
test.Log("SERVER awaiting 30")
trans, err = conn.AcceptTrans()
if err != nil { test.Error("SERVER", err); return }
test.Cleanup(func() { trans.Close() })
gotMethod, gotPayload, err = trans.Receive()
if err != nil { test.Error("SERVER", err); return }
test.Logf("SERVER m: %d p: %s", gotMethod, gotPayload)
if gotMethod != 30 { test.Error("SERVER wrong method")}
}
test.Log("CLIENT done")
// TODO test error from trans/connection closed by other side
clientServerEnvironment(test, clientFunc, serverFunc)
}
func TestEncodeMessageA(test *testing.T) {
buffer := new(bytes.Buffer)
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
err := encodeMessageA(buffer, 0x5800FEABC3104F04, 0x6B12, payload)
err := encodeMessageA(buffer, defaultSizeLimit, 0x5800FEABC3104F04, 0x6B12, payload)
correct := []byte {
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
0x6B, 0x12,
0x00, 0x06,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
}
if err != nil {
@ -102,19 +163,19 @@ func TestEncodeMessageA(test *testing.T) {
func TestEncodeMessageAErr(test *testing.T) {
buffer := new(bytes.Buffer)
payload := make([]byte, 0x10000)
err := encodeMessageA(buffer, 0x5800FEABC3104F04, 0x6B12, payload)
err := encodeMessageA(buffer, 0x20, 0x5800FEABC3104F04, 0x6B12, payload)
if !errors.Is(err, ErrPayloadTooLarge) {
test.Fatalf("wrong error: %v", err)
}
}
func TestDecodeMessageA(test *testing.T) {
transID, method, payload, err := decodeMessageA(bytes.NewReader([]byte {
transID, method, _, payload, err := decodeMessageA(bytes.NewReader([]byte {
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
0x6B, 0x12,
0x00, 0x06,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
}))
}), defaultSizeLimit)
if err != nil {
test.Fatal(err)
}
@ -131,13 +192,76 @@ func TestDecodeMessageA(test *testing.T) {
}
func TestDecodeMessageAErr(test *testing.T) {
_, _, _, err := decodeMessageA(bytes.NewReader([]byte {
_, _, _, _, err := decodeMessageA(bytes.NewReader([]byte {
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
0x6B, 0x12,
0x01, 0x06,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
}))
}), defaultSizeLimit)
if !errors.Is(err, io.ErrUnexpectedEOF) {
test.Fatalf("wrong error: %v", err)
}
}
func TestEncodeDecodeMessageA(test *testing.T) {
correctTransID := int64(2)
correctMethod := uint16(30)
correctPayload := []byte("good")
buffer := bytes.Buffer { }
err := encodeMessageA(&buffer, defaultSizeLimit, correctTransID, correctMethod, correctPayload)
if err != nil { test.Fatal(err) }
transID, method, chunked, payload, err := decodeMessageA(&buffer, defaultSizeLimit)
if got, correct := transID, int64(2); got != correct {
test.Fatalf("not equal: %v %v", got, correct)
}
if got, correct := method, uint16(30); got != correct {
test.Fatalf("not equal: %v %v", got, correct)
}
if chunked {
test.Fatalf("message should not be chunked")
}
if got, correct := payload, correctPayload; !slices.Equal(got, correct) {
test.Fatalf("not equal: %v %v", got, correct)
}
}
func clientServerEnvironment(test *testing.T, clientFunc func(conn Conn), serverFunc func(conn Conn)) {
network := "tcp"
addr := "localhost:7959"
// server
listener, err := net.Listen(network, addr)
if err != nil { test.Fatal(err) }
test.Cleanup(func() { listener.Close() })
go func() {
test.Log("SERVER listening")
conn, err := listener.Accept()
if err != nil { test.Error("SERVER", err); return }
defer conn.Close()
test.Cleanup(func() { conn.Close() })
a := AdaptA(conn, ServerSide)
test.Cleanup(func() { a.Close() })
serverFunc(a)
test.Log("SERVER closing")
}()
// client
test.Log("CLIENT dialing")
conn, err := net.Dial(network, addr)
if err != nil { test.Fatal("CLIENT", err) }
test.Log("CLIENT dialed")
a := AdaptA(conn, ClientSide)
test.Cleanup(func() { a.Close() })
clientFunc(a)
test.Log("CLIENT waiting for connection close...")
trans, err := a.AcceptTrans()
if !errors.Is(err, io.EOF) {
test.Error("CLIENT wrong error:", err)
test.Fatal("CLIENT trans:", trans)
}
test.Log("CLIENT DONE")
conn.Close()
}

View File

@ -2,19 +2,23 @@ package hopp
import "io"
import "net"
import "bytes"
import "errors"
import "context"
import "git.tebibyte.media/sashakoshka/hopp/tape"
// B implements METADAPT-B over a multiplexed stream-oriented transport such as
// QUIC.
type b struct {
sizeLimit int64
underlying MultiConn
}
// AdaptB returns a connection implementing METADAPT-B over a singular stream-
// oriented transport such as TCP or UNIX domain stream sockets.
// AdaptB returns a connection implementing METADAPT-B over a multiplexed
// stream-oriented transport such as QUIC.
func AdaptB(underlying MultiConn) Conn {
return &b {
sizeLimit: defaultSizeLimit,
underlying: underlying,
}
}
@ -34,33 +38,105 @@ func (this *b) RemoteAddr() net.Addr {
func (this *b) OpenTrans() (Trans, error) {
stream, err := this.underlying.OpenStream()
if err != nil { return nil, err }
return transB { underlying: stream }, nil
return this.newTrans(stream), nil
}
func (this *b) AcceptTrans() (Trans, error) {
stream, err := this.underlying.AcceptStream(context.Background())
if err != nil { return nil, err }
return transB { underlying: stream }, nil
return this.newTrans(stream), nil
}
func (this *b) SetSizeLimit(limit int64) {
this.sizeLimit = limit
}
func (this *b) newTrans(underlying Stream) *transB {
return &transB {
sizeLimit: this.sizeLimit,
underlying: underlying,
}
}
type transB struct {
underlying Stream
sizeLimit int64
underlying Stream
currentData io.Reader
currentWriter *writerB
}
func (trans transB) Close() error {
return trans.underlying.Close()
func (this *transB) Close() error {
return this.underlying.Close()
}
func (trans transB) ID() int64 {
return trans.underlying.ID()
func (this *transB) ID() int64 {
return this.underlying.ID()
}
func (trans transB) Send(method uint16, data []byte) error {
return encodeMessageB(trans.underlying, method, data)
func (this *transB) Send(method uint16, data []byte) error {
return encodeMessageB(this.underlying, this.sizeLimit, method, data)
}
func (trans transB) Receive() (uint16, []byte, error) {
return decodeMessageB(trans.underlying)
func (this *transB) SendWriter(method uint16) (io.WriteCloser, error) {
if this.currentWriter != nil {
this.currentWriter.Close()
}
// TODO: come up with a fix that allows us to pipe data through the
// writer. as of now, it just reads whatever is written into a buffer
// and sends the message on close. we should probably introduce chunked
// encoding to METADAPT-B to fix this. the implementation would be
// simpler than on METADAPT-A, but most of the code could just be
// copied over.
writer := &writerB {
parent: this,
method: method,
}
this.currentWriter = writer
return writer, nil
}
func (this *transB) Receive() (uint16, []byte, error) {
// get a reader for the next message
method, size, data, err := this.receiveReader()
if err != nil { return 0, nil, err }
// read the entire thing
payloadBuffer := make([]byte, int(size))
_, err = io.ReadFull(data, payloadBuffer)
if err != nil { return 0, nil, err }
// we have used up the reader by now so we can forget it exists
this.currentData = nil
return method, payloadBuffer, nil
}
func (this *transB) ReceiveReader() (uint16, io.Reader, error) {
method, _, data, err := this.receiveReader()
return method, data, err
}
func (this *transB) receiveReader() (uint16, int64, io.Reader, error) {
// decode the message
method, size, data, err := decodeMessageB(this.underlying, this.sizeLimit)
if err != nil { return 0, 0, nil, err }
// discard current reader if there is one
if this.currentData == nil {
io.Copy(io.Discard, this.currentData)
}
this.currentData = data
return method, size, data, nil
}
type writerB struct {
parent *transB
buffer bytes.Buffer
method uint16
}
func (this *writerB) Write(data []byte) (int, error) {
return this.buffer.Write(data)
}
func (this *writerB) Close() error {
return this.parent.Send(this.method, this.buffer.Bytes())
}
// MultiConn represens a multiplexed stream-oriented transport for use in
@ -84,27 +160,42 @@ type Stream interface {
ID() int64
}
func encodeMessageB(writer io.Writer, method uint16, data []byte) error {
buffer := make([]byte, 4 + len(data))
func encodeMessageB(writer io.Writer, sizeLimit int64, method uint16, data []byte) error {
if int64(len(data)) > sizeLimit {
return ErrPayloadTooLarge
}
buffer := make([]byte, 10 + len(data))
tape.EncodeI16(buffer[:2], method)
length, ok := tape.U16CastSafe(len(data))
if !ok { return ErrPayloadTooLarge }
tape.EncodeI16(buffer[2:4], length)
copy(buffer[4:], data)
tape.EncodeI64(buffer[2:10], uint64(len(data)))
copy(buffer[10:], data)
_, err := writer.Write(buffer)
return err
}
func decodeMessageB(reader io.Reader) (uint16, []byte, error) {
headerBuffer := [4]byte { }
_, err := io.ReadFull(reader, headerBuffer[:])
if err != nil { return 0, nil, err }
method, err := tape.DecodeI16[uint16](headerBuffer[:2])
if err != nil { return 0, nil, err }
length, err := tape.DecodeI16[uint16](headerBuffer[2:4])
if err != nil { return 0, nil, err }
payloadBuffer := make([]byte, int(length))
_, err = io.ReadFull(reader, payloadBuffer)
if err != nil { return 0, nil, err }
return method, payloadBuffer, nil
func decodeMessageB(
reader io.Reader,
sizeLimit int64,
) (
method uint16,
size int64,
data io.Reader,
err error,
) {
headerBuffer := [10]byte { }
_, err = io.ReadFull(reader, headerBuffer[:])
if err != nil {
if errors.Is(err, io.EOF) { return 0, 0, nil, io.ErrUnexpectedEOF }
return 0, 0, nil, err
}
method, err = tape.DecodeI16[uint16](headerBuffer[:2])
if err != nil { return 0, 0, nil, err }
length, err := tape.DecodeI64[uint64](headerBuffer[2:10])
if err != nil { return 0, 0, nil, err }
if length > uint64(sizeLimit) {
return 0, 0, nil, ErrPayloadTooLarge
}
return method, int64(length), &io.LimitedReader {
R: reader,
N: int64(length),
}, nil
}

View File

@ -9,9 +9,9 @@ import "testing"
func TestEncodeMessageB(test *testing.T) {
buffer := new(bytes.Buffer)
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
err := encodeMessageB(buffer, 0x6B12, payload)
err := encodeMessageB(buffer, defaultSizeLimit, 0x6B12, payload)
correct := []byte {
0x6B, 0x12,
0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
}
@ -26,24 +26,25 @@ func TestEncodeMessageB(test *testing.T) {
func TestEncodeMessageBErr(test *testing.T) {
buffer := new(bytes.Buffer)
payload := make([]byte, 0x10000)
err := encodeMessageB(buffer, 0x6B12, payload)
err := encodeMessageB(buffer, 255, 0x6B12, payload)
if !errors.Is(err, ErrPayloadTooLarge) {
test.Fatalf("wrong error: %v", err)
}
}
func TestDecodeMessageB(test *testing.T) {
method, payload, err := decodeMessageB(bytes.NewReader([]byte {
0x6B, 0x12,
method, _, data, err := decodeMessageB(bytes.NewReader([]byte {
0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
}))
}), defaultSizeLimit)
if err != nil {
test.Fatal(err)
}
if got, correct := method, uint16(0x6B12); got != correct {
test.Fatalf("not equal: %v %v", got, correct)
}
payload, _ := io.ReadAll(data)
correctPayload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
if got, correct := payload, correctPayload; !slices.Equal(got, correct) {
test.Fatalf("not equal: %v %v", got, correct)
@ -51,11 +52,9 @@ func TestDecodeMessageB(test *testing.T) {
}
func TestDecodeMessageBErr(test *testing.T) {
_, _, err := decodeMessageB(bytes.NewReader([]byte {
0x6B, 0x12,
0x01, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
}))
_, _, _, err := decodeMessageB(bytes.NewReader([]byte {
0x6B, 0x12, 0x00, 0x00, 0x00, 0x00,
}), defaultSizeLimit)
if !errors.Is(err, io.ErrUnexpectedEOF) {
test.Fatalf("wrong error: %v", err)
}