Compare commits

...

6 Commits

5 changed files with 75 additions and 30 deletions

View File

@ -1,8 +1,11 @@
package hopp package hopp
import "io"
import "net" import "net"
// import "time" // import "time"
const defaultSizeLimit = 1024 * 1024 // 1 megabyte
// Conn is a HOPP connection. // Conn is a HOPP connection.
type Conn interface { type Conn interface {
// Close closes the connection. Any blocked operations on the connection // Close closes the connection. Any blocked operations on the connection
@ -19,6 +22,10 @@ type Conn interface {
// AcceptTrans accepts a transaction from the other party. This must // AcceptTrans accepts a transaction from the other party. This must
// be called in a loop to avoid the connection locking up. // be called in a loop to avoid the connection locking up.
AcceptTrans() (Trans, error) AcceptTrans() (Trans, error)
// SetSizeLimit sets a limit (in bytes) for how large messages can be.
// By default, this limit is 1 megabyte.
SetSizeLimit(limit int)
} }
// Trans is a HOPP transaction. // Trans is a HOPP transaction.
@ -37,4 +44,7 @@ type Trans interface {
Send(method uint16, data []byte) error Send(method uint16, data []byte) error
// Receive receives a message. // Receive receives a message.
Receive() (method uint16, data []byte, err error) Receive() (method uint16, data []byte, err error)
// ReceiveReader receives a message as an [io.Reader]. Any reader
// previously opened through this function will be discarded.
ReceiveReader() (method uint16, data io.Reader, err error)
} }

View File

@ -123,14 +123,16 @@ be of the same size.
## Transports ## Transports
A transport is a protocol that HOPP connections can run on top of. HOPP A transport is a protocol that HOPP connections can run on top of. HOPP
currently supports the QUIC transport protocol for communicating between currently supports the QUIC transport protocol for communicating between
machines, and UNIX domain sockets for quicker communication among applications machines, TCP/TLS for legacy systems that do not support QUIC, and UNIX domain
on the same machine. Both protocols are supported through METADAPT. sockets for faster communication among applications on the same machine. Both
protocols are supported through METADAPT.
## Message and Transaction Demarcation Protocol (METADAPT) ## Message and Transaction Demarcation Protocol (METADAPT)
The Message and Transaction Demarcation Protocol is used to break one or more The Message and Transaction Demarcation Protocol is used to break one or more
reliable data streams into transactions, which are broken down further into reliable data streams into transactions, which are broken down further into
messages. A message, as well as its associated metadata (length, transaction, messages. The representation of a message (or a part thereof) on the protocol,
method, etc.) together is referred to as METADAPT Message Block (MMB). including its associated metadata (length, transaction, method, etc.) is
referred to as METADAPT Message Block (MMB).
For transports that offer multiple multiplexed data streams that can be created For transports that offer multiple multiplexed data streams that can be created
and destroyed on-demand (such as QUIC) each stream is used as a transaction. If and destroyed on-demand (such as QUIC) each stream is used as a transaction. If
@ -145,8 +147,12 @@ METADAPT-A requires a transport which offers a single full-duplex data stream
that persists for the duration of the connection. All transactions are that persists for the duration of the connection. All transactions are
multiplexed onto this single stream. Each MMB contains a 12-octet long header, multiplexed onto this single stream. Each MMB contains a 12-octet long header,
with the transaction ID, then the method, and then the payload size (in octets). with the transaction ID, then the method, and then the payload size (in octets).
The transaction ID is encoded as an I64, and the method and payload size are The transaction ID is encoded as an I64, the method is encoded as a U16 and the
both encoded as U16s. The remainder of the message is the payload. Since each and payload size is encoded as a U64. Only the 63 least significant bits of the
payload size describe the actual size, the most significant bit controlling
chunking. See the section on chunking for more information.
The remainder of the message is the payload. Since each
MMB is self-describing, they are sent sequentially with no gaps in-between them. MMB is self-describing, they are sent sequentially with no gaps in-between them.
Transactions "open" when the first message with a given transaction ID is sent. Transactions "open" when the first message with a given transaction ID is sent.
@ -162,13 +168,25 @@ used up, the connection must fail. Don't worry about this though, because the
sun will have expanded to swallow earth by then. Your connection will not last sun will have expanded to swallow earth by then. Your connection will not last
that long. that long.
#### Message Chunking
The most significant bit of the payload size field of an MMB is called the Chunk
Control Bit (CCB). If the CCB of a given MMB is zero, the represented message is
interpreted as being self-contained and the data is processed immediately. If
the CCB is one, the message is interpreted as being chunked, with the data of
the current MMB being the first chunk. The data of further MMBs sent along the
transaction will be appended to the message until an MMB is read with a zero
CCB, in which case the MMB will be the last chunk and any more MMBs will be
interpreted as normal.
### METADAPT-B ### METADAPT-B
METADAPT-B requires a transport which offers multiple multiplexed full-duplex METADAPT-B requires a transport which offers multiple multiplexed full-duplex
data streams per connection that can be created and destroyed on-demand. Each data streams per connection that can be created and destroyed on-demand. Each
data stream is used as an individual transaction. Each MMB contains a 4-octet data stream is used as an individual transaction. Each MMB contains a 4-octet
long header with the method and then the payload size (in octets) both encoded long header with the method and then the payload size (in octets) encoded as a
as U16s. The remainder of the message is the payload. Since each MMB is U16 and U64 respectively. The remainder of the message is the payload. Since
self-describing, they are sent sequentially with no gaps in-between them. each MMB is self-describing, they are sent sequentially with no gaps in-between
them.
The ID of any transaction will reflect the ID of its corresponding stream. The The ID of any transaction will reflect the ID of its corresponding stream. The
lifetime of the transaction is tied to the lifetime of the stream, that is to lifetime of the transaction is tied to the lifetime of the stream, that is to

View File

@ -78,7 +78,6 @@ func TestConnA(test *testing.T) {
test.Fatal("CLIENT wrong error:", err) test.Fatal("CLIENT wrong error:", err)
} }
test.Log("CLIENT done") test.Log("CLIENT done")
// TODO test error from trans/connection closed by other side
} }
func TestEncodeMessageA(test *testing.T) { func TestEncodeMessageA(test *testing.T) {

View File

@ -8,6 +8,7 @@ import "git.tebibyte.media/sashakoshka/hopp/tape"
// B implements METADAPT-B over a multiplexed stream-oriented transport such as // B implements METADAPT-B over a multiplexed stream-oriented transport such as
// QUIC. // QUIC.
type b struct { type b struct {
sizeLimit int
underlying MultiConn underlying MultiConn
} }
@ -15,6 +16,7 @@ type b struct {
// oriented transport such as TCP or UNIX domain stream sockets. // oriented transport such as TCP or UNIX domain stream sockets.
func AdaptB(underlying MultiConn) Conn { func AdaptB(underlying MultiConn) Conn {
return &b { return &b {
sizeLimit: defaultSizeLimit,
underlying: underlying, underlying: underlying,
} }
} }
@ -34,16 +36,28 @@ func (this *b) RemoteAddr() net.Addr {
func (this *b) OpenTrans() (Trans, error) { func (this *b) OpenTrans() (Trans, error) {
stream, err := this.underlying.OpenStream() stream, err := this.underlying.OpenStream()
if err != nil { return nil, err } if err != nil { return nil, err }
return transB { underlying: stream }, nil return this.newTrans(stream), nil
} }
func (this *b) AcceptTrans() (Trans, error) { func (this *b) AcceptTrans() (Trans, error) {
stream, err := this.underlying.AcceptStream(context.Background()) stream, err := this.underlying.AcceptStream(context.Background())
if err != nil { return nil, err } if err != nil { return nil, err }
return transB { underlying: stream }, nil return this.newTrans(stream), nil
}
func (this *b) SetSizeLimit(limit int) {
this.sizeLimit = limit
}
func (this *b) newTrans(underlying Stream) transB {
return transB {
sizeLimit: this.sizeLimit,
underlying: underlying,
}
} }
type transB struct { type transB struct {
sizeLimit int
underlying Stream underlying Stream
} }
@ -56,11 +70,11 @@ func (trans transB) ID() int64 {
} }
func (trans transB) Send(method uint16, data []byte) error { func (trans transB) Send(method uint16, data []byte) error {
return encodeMessageB(trans.underlying, method, data) return encodeMessageB(trans.underlying, trans.sizeLimit, method, data)
} }
func (trans transB) Receive() (uint16, []byte, error) { func (trans transB) Receive() (uint16, []byte, error) {
return decodeMessageB(trans.underlying) return decodeMessageB(trans.underlying, trans.sizeLimit)
} }
// MultiConn represens a multiplexed stream-oriented transport for use in // MultiConn represens a multiplexed stream-oriented transport for use in
@ -84,25 +98,29 @@ type Stream interface {
ID() int64 ID() int64
} }
func encodeMessageB(writer io.Writer, method uint16, data []byte) error { func encodeMessageB(writer io.Writer, sizeLimit int, method uint16, data []byte) error {
buffer := make([]byte, 4 + len(data)) if len(data) > sizeLimit {
return ErrPayloadTooLarge
}
buffer := make([]byte, 10 + len(data))
tape.EncodeI16(buffer[:2], method) tape.EncodeI16(buffer[:2], method)
length, ok := tape.U16CastSafe(len(data)) tape.EncodeI64(buffer[2:10], uint64(len(data)))
if !ok { return ErrPayloadTooLarge } copy(buffer[10:], data)
tape.EncodeI16(buffer[2:4], length)
copy(buffer[4:], data)
_, err := writer.Write(buffer) _, err := writer.Write(buffer)
return err return err
} }
func decodeMessageB(reader io.Reader) (uint16, []byte, error) { func decodeMessageB(reader io.Reader, sizeLimit int) (uint16, []byte, error) {
headerBuffer := [4]byte { } headerBuffer := [10]byte { }
_, err := io.ReadFull(reader, headerBuffer[:]) _, err := io.ReadFull(reader, headerBuffer[:])
if err != nil { return 0, nil, err } if err != nil { return 0, nil, err }
method, err := tape.DecodeI16[uint16](headerBuffer[:2]) method, err := tape.DecodeI16[uint16](headerBuffer[:2])
if err != nil { return 0, nil, err } if err != nil { return 0, nil, err }
length, err := tape.DecodeI16[uint16](headerBuffer[2:4]) length, err := tape.DecodeI64[uint64](headerBuffer[2:10])
if err != nil { return 0, nil, err } if err != nil { return 0, nil, err }
if length > uint64(sizeLimit) {
return 0, nil, ErrPayloadTooLarge
}
payloadBuffer := make([]byte, int(length)) payloadBuffer := make([]byte, int(length))
_, err = io.ReadFull(reader, payloadBuffer) _, err = io.ReadFull(reader, payloadBuffer)
if err != nil { return 0, nil, err } if err != nil { return 0, nil, err }

View File

@ -9,9 +9,9 @@ import "testing"
func TestEncodeMessageB(test *testing.T) { func TestEncodeMessageB(test *testing.T) {
buffer := new(bytes.Buffer) buffer := new(bytes.Buffer)
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 } payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
err := encodeMessageB(buffer, 0x6B12, payload) err := encodeMessageB(buffer, defaultSizeLimit, 0x6B12, payload)
correct := []byte { correct := []byte {
0x6B, 0x12, 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x06, 0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
} }
@ -26,7 +26,7 @@ func TestEncodeMessageB(test *testing.T) {
func TestEncodeMessageBErr(test *testing.T) { func TestEncodeMessageBErr(test *testing.T) {
buffer := new(bytes.Buffer) buffer := new(bytes.Buffer)
payload := make([]byte, 0x10000) payload := make([]byte, 0x10000)
err := encodeMessageB(buffer, 0x6B12, payload) err := encodeMessageB(buffer, 255, 0x6B12, payload)
if !errors.Is(err, ErrPayloadTooLarge) { if !errors.Is(err, ErrPayloadTooLarge) {
test.Fatalf("wrong error: %v", err) test.Fatalf("wrong error: %v", err)
} }
@ -34,10 +34,10 @@ func TestEncodeMessageBErr(test *testing.T) {
func TestDecodeMessageB(test *testing.T) { func TestDecodeMessageB(test *testing.T) {
method, payload, err := decodeMessageB(bytes.NewReader([]byte { method, payload, err := decodeMessageB(bytes.NewReader([]byte {
0x6B, 0x12, 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x06, 0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
})) }), defaultSizeLimit)
if err != nil { if err != nil {
test.Fatal(err) test.Fatal(err)
} }
@ -52,10 +52,10 @@ func TestDecodeMessageB(test *testing.T) {
func TestDecodeMessageBErr(test *testing.T) { func TestDecodeMessageBErr(test *testing.T) {
_, _, err := decodeMessageB(bytes.NewReader([]byte { _, _, err := decodeMessageB(bytes.NewReader([]byte {
0x6B, 0x12, 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x01, 0x06, 0x01, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
})) }), defaultSizeLimit)
if !errors.Is(err, io.ErrUnexpectedEOF) { if !errors.Is(err, io.ErrUnexpectedEOF) {
test.Fatalf("wrong error: %v", err) test.Fatalf("wrong error: %v", err)
} }