Compare commits

...

6 Commits

5 changed files with 75 additions and 30 deletions

View File

@ -1,8 +1,11 @@
package hopp
import "io"
import "net"
// import "time"
const defaultSizeLimit = 1024 * 1024 // 1 megabyte
// Conn is a HOPP connection.
type Conn interface {
// Close closes the connection. Any blocked operations on the connection
@ -19,6 +22,10 @@ type Conn interface {
// AcceptTrans accepts a transaction from the other party. This must
// be called in a loop to avoid the connection locking up.
AcceptTrans() (Trans, error)
// SetSizeLimit sets a limit (in bytes) for how large messages can be.
// By default, this limit is 1 megabyte.
SetSizeLimit(limit int)
}
// Trans is a HOPP transaction.
@ -37,4 +44,7 @@ type Trans interface {
Send(method uint16, data []byte) error
// Receive receives a message.
Receive() (method uint16, data []byte, err error)
// ReceiveReader receives a message as an [io.Reader]. Any reader
// previously opened through this function will be discarded.
ReceiveReader() (method uint16, data io.Reader, err error)
}

View File

@ -123,14 +123,16 @@ be of the same size.
## Transports
A transport is a protocol that HOPP connections can run on top of. HOPP
currently supports the QUIC transport protocol for communicating between
machines, and UNIX domain sockets for quicker communication among applications
on the same machine. Both protocols are supported through METADAPT.
machines, TCP/TLS for legacy systems that do not support QUIC, and UNIX domain
sockets for faster communication among applications on the same machine. Both
protocols are supported through METADAPT.
## Message and Transaction Demarcation Protocol (METADAPT)
The Message and Transaction Demarcation Protocol is used to break one or more
reliable data streams into transactions, which are broken down further into
messages. A message, as well as its associated metadata (length, transaction,
method, etc.) together is referred to as METADAPT Message Block (MMB).
messages. The representation of a message (or a part thereof) on the protocol,
including its associated metadata (length, transaction, method, etc.) is
referred to as METADAPT Message Block (MMB).
For transports that offer multiple multiplexed data streams that can be created
and destroyed on-demand (such as QUIC) each stream is used as a transaction. If
@ -145,8 +147,12 @@ METADAPT-A requires a transport which offers a single full-duplex data stream
that persists for the duration of the connection. All transactions are
multiplexed onto this single stream. Each MMB contains a 12-octet long header,
with the transaction ID, then the method, and then the payload size (in octets).
The transaction ID is encoded as an I64, and the method and payload size are
both encoded as U16s. The remainder of the message is the payload. Since each
The transaction ID is encoded as an I64, the method is encoded as a U16 and the
and payload size is encoded as a U64. Only the 63 least significant bits of the
payload size describe the actual size, the most significant bit controlling
chunking. See the section on chunking for more information.
The remainder of the message is the payload. Since each
MMB is self-describing, they are sent sequentially with no gaps in-between them.
Transactions "open" when the first message with a given transaction ID is sent.
@ -162,13 +168,25 @@ used up, the connection must fail. Don't worry about this though, because the
sun will have expanded to swallow earth by then. Your connection will not last
that long.
#### Message Chunking
The most significant bit of the payload size field of an MMB is called the Chunk
Control Bit (CCB). If the CCB of a given MMB is zero, the represented message is
interpreted as being self-contained and the data is processed immediately. If
the CCB is one, the message is interpreted as being chunked, with the data of
the current MMB being the first chunk. The data of further MMBs sent along the
transaction will be appended to the message until an MMB is read with a zero
CCB, in which case the MMB will be the last chunk and any more MMBs will be
interpreted as normal.
### METADAPT-B
METADAPT-B requires a transport which offers multiple multiplexed full-duplex
data streams per connection that can be created and destroyed on-demand. Each
data stream is used as an individual transaction. Each MMB contains a 4-octet
long header with the method and then the payload size (in octets) both encoded
as U16s. The remainder of the message is the payload. Since each MMB is
self-describing, they are sent sequentially with no gaps in-between them.
long header with the method and then the payload size (in octets) encoded as a
U16 and U64 respectively. The remainder of the message is the payload. Since
each MMB is self-describing, they are sent sequentially with no gaps in-between
them.
The ID of any transaction will reflect the ID of its corresponding stream. The
lifetime of the transaction is tied to the lifetime of the stream, that is to

View File

@ -78,7 +78,6 @@ func TestConnA(test *testing.T) {
test.Fatal("CLIENT wrong error:", err)
}
test.Log("CLIENT done")
// TODO test error from trans/connection closed by other side
}
func TestEncodeMessageA(test *testing.T) {

View File

@ -8,6 +8,7 @@ import "git.tebibyte.media/sashakoshka/hopp/tape"
// B implements METADAPT-B over a multiplexed stream-oriented transport such as
// QUIC.
type b struct {
sizeLimit int
underlying MultiConn
}
@ -15,6 +16,7 @@ type b struct {
// oriented transport such as TCP or UNIX domain stream sockets.
func AdaptB(underlying MultiConn) Conn {
return &b {
sizeLimit: defaultSizeLimit,
underlying: underlying,
}
}
@ -34,16 +36,28 @@ func (this *b) RemoteAddr() net.Addr {
func (this *b) OpenTrans() (Trans, error) {
stream, err := this.underlying.OpenStream()
if err != nil { return nil, err }
return transB { underlying: stream }, nil
return this.newTrans(stream), nil
}
func (this *b) AcceptTrans() (Trans, error) {
stream, err := this.underlying.AcceptStream(context.Background())
if err != nil { return nil, err }
return transB { underlying: stream }, nil
return this.newTrans(stream), nil
}
func (this *b) SetSizeLimit(limit int) {
this.sizeLimit = limit
}
func (this *b) newTrans(underlying Stream) transB {
return transB {
sizeLimit: this.sizeLimit,
underlying: underlying,
}
}
type transB struct {
sizeLimit int
underlying Stream
}
@ -56,11 +70,11 @@ func (trans transB) ID() int64 {
}
func (trans transB) Send(method uint16, data []byte) error {
return encodeMessageB(trans.underlying, method, data)
return encodeMessageB(trans.underlying, trans.sizeLimit, method, data)
}
func (trans transB) Receive() (uint16, []byte, error) {
return decodeMessageB(trans.underlying)
return decodeMessageB(trans.underlying, trans.sizeLimit)
}
// MultiConn represens a multiplexed stream-oriented transport for use in
@ -84,25 +98,29 @@ type Stream interface {
ID() int64
}
func encodeMessageB(writer io.Writer, method uint16, data []byte) error {
buffer := make([]byte, 4 + len(data))
func encodeMessageB(writer io.Writer, sizeLimit int, method uint16, data []byte) error {
if len(data) > sizeLimit {
return ErrPayloadTooLarge
}
buffer := make([]byte, 10 + len(data))
tape.EncodeI16(buffer[:2], method)
length, ok := tape.U16CastSafe(len(data))
if !ok { return ErrPayloadTooLarge }
tape.EncodeI16(buffer[2:4], length)
copy(buffer[4:], data)
tape.EncodeI64(buffer[2:10], uint64(len(data)))
copy(buffer[10:], data)
_, err := writer.Write(buffer)
return err
}
func decodeMessageB(reader io.Reader) (uint16, []byte, error) {
headerBuffer := [4]byte { }
func decodeMessageB(reader io.Reader, sizeLimit int) (uint16, []byte, error) {
headerBuffer := [10]byte { }
_, err := io.ReadFull(reader, headerBuffer[:])
if err != nil { return 0, nil, err }
method, err := tape.DecodeI16[uint16](headerBuffer[:2])
if err != nil { return 0, nil, err }
length, err := tape.DecodeI16[uint16](headerBuffer[2:4])
length, err := tape.DecodeI64[uint64](headerBuffer[2:10])
if err != nil { return 0, nil, err }
if length > uint64(sizeLimit) {
return 0, nil, ErrPayloadTooLarge
}
payloadBuffer := make([]byte, int(length))
_, err = io.ReadFull(reader, payloadBuffer)
if err != nil { return 0, nil, err }

View File

@ -9,9 +9,9 @@ import "testing"
func TestEncodeMessageB(test *testing.T) {
buffer := new(bytes.Buffer)
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
err := encodeMessageB(buffer, 0x6B12, payload)
err := encodeMessageB(buffer, defaultSizeLimit, 0x6B12, payload)
correct := []byte {
0x6B, 0x12,
0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
}
@ -26,7 +26,7 @@ func TestEncodeMessageB(test *testing.T) {
func TestEncodeMessageBErr(test *testing.T) {
buffer := new(bytes.Buffer)
payload := make([]byte, 0x10000)
err := encodeMessageB(buffer, 0x6B12, payload)
err := encodeMessageB(buffer, 255, 0x6B12, payload)
if !errors.Is(err, ErrPayloadTooLarge) {
test.Fatalf("wrong error: %v", err)
}
@ -34,10 +34,10 @@ func TestEncodeMessageBErr(test *testing.T) {
func TestDecodeMessageB(test *testing.T) {
method, payload, err := decodeMessageB(bytes.NewReader([]byte {
0x6B, 0x12,
0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
}))
}), defaultSizeLimit)
if err != nil {
test.Fatal(err)
}
@ -52,10 +52,10 @@ func TestDecodeMessageB(test *testing.T) {
func TestDecodeMessageBErr(test *testing.T) {
_, _, err := decodeMessageB(bytes.NewReader([]byte {
0x6B, 0x12,
0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x01, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
}))
}), defaultSizeLimit)
if !errors.Is(err, io.ErrUnexpectedEOF) {
test.Fatalf("wrong error: %v", err)
}