From 174634a330b9e337ad1b028497cc13fa3edb1d27 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 4 Apr 2025 15:42:15 -0400 Subject: [PATCH 01/38] METADAPT-B can now send very large messages --- metadapta_test.go | 1 - metadaptb.go | 12 +++++------- metadaptb_test.go | 15 +++------------ 3 files changed, 8 insertions(+), 20 deletions(-) diff --git a/metadapta_test.go b/metadapta_test.go index 1b2bc44..0ecebe2 100644 --- a/metadapta_test.go +++ b/metadapta_test.go @@ -78,7 +78,6 @@ func TestConnA(test *testing.T) { test.Fatal("CLIENT wrong error:", err) } test.Log("CLIENT done") - // TODO test error from trans/connection closed by other side } func TestEncodeMessageA(test *testing.T) { diff --git a/metadaptb.go b/metadaptb.go index f8bed04..52e98fc 100644 --- a/metadaptb.go +++ b/metadaptb.go @@ -85,23 +85,21 @@ type Stream interface { } func encodeMessageB(writer io.Writer, method uint16, data []byte) error { - buffer := make([]byte, 4 + len(data)) + buffer := make([]byte, 10 + len(data)) tape.EncodeI16(buffer[:2], method) - length, ok := tape.U16CastSafe(len(data)) - if !ok { return ErrPayloadTooLarge } - tape.EncodeI16(buffer[2:4], length) - copy(buffer[4:], data) + tape.EncodeI64(buffer[2:10], uint64(len(data))) + copy(buffer[10:], data) _, err := writer.Write(buffer) return err } func decodeMessageB(reader io.Reader) (uint16, []byte, error) { - headerBuffer := [4]byte { } + headerBuffer := [10]byte { } _, err := io.ReadFull(reader, headerBuffer[:]) if err != nil { return 0, nil, err } method, err := tape.DecodeI16[uint16](headerBuffer[:2]) if err != nil { return 0, nil, err } - length, err := tape.DecodeI16[uint16](headerBuffer[2:4]) + length, err := tape.DecodeI64[uint64](headerBuffer[2:10]) if err != nil { return 0, nil, err } payloadBuffer := make([]byte, int(length)) _, err = io.ReadFull(reader, payloadBuffer) diff --git a/metadaptb_test.go b/metadaptb_test.go index 416f5fd..dc0e8d4 100644 --- a/metadaptb_test.go +++ b/metadaptb_test.go @@ -11,7 +11,7 @@ func TestEncodeMessageB(test *testing.T) { payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 } err := encodeMessageB(buffer, 0x6B12, payload) correct := []byte { - 0x6B, 0x12, + 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, } @@ -23,18 +23,9 @@ func TestEncodeMessageB(test *testing.T) { } } -func TestEncodeMessageBErr(test *testing.T) { - buffer := new(bytes.Buffer) - payload := make([]byte, 0x10000) - err := encodeMessageB(buffer, 0x6B12, payload) - if !errors.Is(err, ErrPayloadTooLarge) { - test.Fatalf("wrong error: %v", err) - } -} - func TestDecodeMessageB(test *testing.T) { method, payload, err := decodeMessageB(bytes.NewReader([]byte { - 0x6B, 0x12, + 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, })) @@ -52,7 +43,7 @@ func TestDecodeMessageB(test *testing.T) { func TestDecodeMessageBErr(test *testing.T) { _, _, err := decodeMessageB(bytes.NewReader([]byte { - 0x6B, 0x12, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x6B, 0x12, 0x01, 0x06, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, })) -- 2.46.1 From 1ac0ed51c7769e166ab7c70794ccf6ff2273002a Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 4 Apr 2025 16:07:20 -0400 Subject: [PATCH 02/38] METADAPT-B supports setting a message length limit --- connection.go | 6 ++++++ metadaptb.go | 32 ++++++++++++++++++++++++++------ metadaptb_test.go | 17 +++++++++++++---- 3 files changed, 45 insertions(+), 10 deletions(-) diff --git a/connection.go b/connection.go index adefbdf..11f6fe9 100644 --- a/connection.go +++ b/connection.go @@ -3,6 +3,8 @@ package hopp import "net" // import "time" +const defaultSizeLimit = 1024 * 1024 // 1 megabyte + // Conn is a HOPP connection. type Conn interface { // Close closes the connection. Any blocked operations on the connection @@ -19,6 +21,10 @@ type Conn interface { // AcceptTrans accepts a transaction from the other party. This must // be called in a loop to avoid the connection locking up. AcceptTrans() (Trans, error) + + // SetSizeLimit sets a limit (in bytes) for how large messages can be. + // By default, this limit is 1 megabyte. + SetSizeLimit(limit int) } // Trans is a HOPP transaction. diff --git a/metadaptb.go b/metadaptb.go index 52e98fc..1c6eb9f 100644 --- a/metadaptb.go +++ b/metadaptb.go @@ -8,6 +8,7 @@ import "git.tebibyte.media/sashakoshka/hopp/tape" // B implements METADAPT-B over a multiplexed stream-oriented transport such as // QUIC. type b struct { + sizeLimit int underlying MultiConn } @@ -15,6 +16,7 @@ type b struct { // oriented transport such as TCP or UNIX domain stream sockets. func AdaptB(underlying MultiConn) Conn { return &b { + sizeLimit: defaultSizeLimit, underlying: underlying, } } @@ -34,16 +36,28 @@ func (this *b) RemoteAddr() net.Addr { func (this *b) OpenTrans() (Trans, error) { stream, err := this.underlying.OpenStream() if err != nil { return nil, err } - return transB { underlying: stream }, nil + return this.newTrans(stream), nil } func (this *b) AcceptTrans() (Trans, error) { stream, err := this.underlying.AcceptStream(context.Background()) if err != nil { return nil, err } - return transB { underlying: stream }, nil + return this.newTrans(stream), nil +} + +func (this *b) SetSizeLimit(limit int) { + this.sizeLimit = limit +} + +func (this *b) newTrans(underlying Stream) transB { + return transB { + sizeLimit: this.sizeLimit, + underlying: underlying, + } } type transB struct { + sizeLimit int underlying Stream } @@ -56,11 +70,11 @@ func (trans transB) ID() int64 { } func (trans transB) Send(method uint16, data []byte) error { - return encodeMessageB(trans.underlying, method, data) + return encodeMessageB(trans.underlying, trans.sizeLimit, method, data) } func (trans transB) Receive() (uint16, []byte, error) { - return decodeMessageB(trans.underlying) + return decodeMessageB(trans.underlying, trans.sizeLimit) } // MultiConn represens a multiplexed stream-oriented transport for use in @@ -84,7 +98,10 @@ type Stream interface { ID() int64 } -func encodeMessageB(writer io.Writer, method uint16, data []byte) error { +func encodeMessageB(writer io.Writer, sizeLimit int, method uint16, data []byte) error { + if len(data) > sizeLimit { + return ErrPayloadTooLarge + } buffer := make([]byte, 10 + len(data)) tape.EncodeI16(buffer[:2], method) tape.EncodeI64(buffer[2:10], uint64(len(data))) @@ -93,7 +110,7 @@ func encodeMessageB(writer io.Writer, method uint16, data []byte) error { return err } -func decodeMessageB(reader io.Reader) (uint16, []byte, error) { +func decodeMessageB(reader io.Reader, sizeLimit int) (uint16, []byte, error) { headerBuffer := [10]byte { } _, err := io.ReadFull(reader, headerBuffer[:]) if err != nil { return 0, nil, err } @@ -101,6 +118,9 @@ func decodeMessageB(reader io.Reader) (uint16, []byte, error) { if err != nil { return 0, nil, err } length, err := tape.DecodeI64[uint64](headerBuffer[2:10]) if err != nil { return 0, nil, err } + if length > uint64(sizeLimit) { + return 0, nil, ErrPayloadTooLarge + } payloadBuffer := make([]byte, int(length)) _, err = io.ReadFull(reader, payloadBuffer) if err != nil { return 0, nil, err } diff --git a/metadaptb_test.go b/metadaptb_test.go index dc0e8d4..add8043 100644 --- a/metadaptb_test.go +++ b/metadaptb_test.go @@ -9,7 +9,7 @@ import "testing" func TestEncodeMessageB(test *testing.T) { buffer := new(bytes.Buffer) payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 } - err := encodeMessageB(buffer, 0x6B12, payload) + err := encodeMessageB(buffer, defaultSizeLimit, 0x6B12, payload) correct := []byte { 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, @@ -23,12 +23,21 @@ func TestEncodeMessageB(test *testing.T) { } } +func TestEncodeMessageBErr(test *testing.T) { + buffer := new(bytes.Buffer) + payload := make([]byte, 0x10000) + err := encodeMessageB(buffer, 255, 0x6B12, payload) + if !errors.Is(err, ErrPayloadTooLarge) { + test.Fatalf("wrong error: %v", err) + } +} + func TestDecodeMessageB(test *testing.T) { method, payload, err := decodeMessageB(bytes.NewReader([]byte { 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, - })) + }), defaultSizeLimit) if err != nil { test.Fatal(err) } @@ -43,10 +52,10 @@ func TestDecodeMessageB(test *testing.T) { func TestDecodeMessageBErr(test *testing.T) { _, _, err := decodeMessageB(bytes.NewReader([]byte { - 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x6B, 0x12, + 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x06, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, - })) + }), defaultSizeLimit) if !errors.Is(err, io.ErrUnexpectedEOF) { test.Fatalf("wrong error: %v", err) } -- 2.46.1 From 5c28510342558c68952dba25eea5a48e1df4a559 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Sat, 5 Apr 2025 21:04:45 -0400 Subject: [PATCH 03/38] Add new METADAPT protocol specifications from #2 --- design/protocol.md | 31 ++++++++++++++++++++++++------- 1 file changed, 24 insertions(+), 7 deletions(-) diff --git a/design/protocol.md b/design/protocol.md index ca37998..48e4c9b 100644 --- a/design/protocol.md +++ b/design/protocol.md @@ -129,8 +129,9 @@ on the same machine. Both protocols are supported through METADAPT. ## Message and Transaction Demarcation Protocol (METADAPT) The Message and Transaction Demarcation Protocol is used to break one or more reliable data streams into transactions, which are broken down further into -messages. A message, as well as its associated metadata (length, transaction, -method, etc.) together is referred to as METADAPT Message Block (MMB). +messages. The representation of a message (or a part thereof) on the protocol, +including its associated metadata (length, transaction, method, etc.) is +referred to as METADAPT Message Block (MMB). For transports that offer multiple multiplexed data streams that can be created and destroyed on-demand (such as QUIC) each stream is used as a transaction. If @@ -145,8 +146,12 @@ METADAPT-A requires a transport which offers a single full-duplex data stream that persists for the duration of the connection. All transactions are multiplexed onto this single stream. Each MMB contains a 12-octet long header, with the transaction ID, then the method, and then the payload size (in octets). -The transaction ID is encoded as an I64, and the method and payload size are -both encoded as U16s. The remainder of the message is the payload. Since each +The transaction ID is encoded as an I64, the method is encoded as a U16 and the +and payload size is encoded as a U64. Only the 63 least significant bits of the +payload size describe the actual size, the most significant bit controlling +chunking. See the section on chunking for more information. + +The remainder of the message is the payload. Since each MMB is self-describing, they are sent sequentially with no gaps in-between them. Transactions "open" when the first message with a given transaction ID is sent. @@ -162,13 +167,25 @@ used up, the connection must fail. Don't worry about this though, because the sun will have expanded to swallow earth by then. Your connection will not last that long. +#### Message Chunking + +The most significant bit of the payload size field of an MMB is called the Chunk +Control Bit (CCB). If the CCB of a given MMB is zero, the represented message is +interpreted as being self-contained and the data is processed immediately. If +the CCB is one, the message is interpreted as being chunked, with the data of +the current MMB being the first chunk. The data of further MMBs sent along the +transaction will be appended to the message until an MMB is read with a zero +CCB, in which case the MMB will be the last chunk and any more MMBs will be +interpreted as normal. + ### METADAPT-B METADAPT-B requires a transport which offers multiple multiplexed full-duplex data streams per connection that can be created and destroyed on-demand. Each data stream is used as an individual transaction. Each MMB contains a 4-octet -long header with the method and then the payload size (in octets) both encoded -as U16s. The remainder of the message is the payload. Since each MMB is -self-describing, they are sent sequentially with no gaps in-between them. +long header with the method and then the payload size (in octets) encoded as a +U16 and U64 respectively. The remainder of the message is the payload. Since +each MMB is self-describing, they are sent sequentially with no gaps in-between +them. The ID of any transaction will reflect the ID of its corresponding stream. The lifetime of the transaction is tied to the lifetime of the stream, that is to -- 2.46.1 From 4eae69dc94fe38c975733c8007834aaa2daf3664 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Sat, 5 Apr 2025 21:08:41 -0400 Subject: [PATCH 04/38] Add ReceiveReader to Transaction interface --- connection.go | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/connection.go b/connection.go index 11f6fe9..005d734 100644 --- a/connection.go +++ b/connection.go @@ -1,5 +1,6 @@ package hopp +import "io" import "net" // import "time" @@ -43,4 +44,7 @@ type Trans interface { Send(method uint16, data []byte) error // Receive receives a message. Receive() (method uint16, data []byte, err error) + // ReceiveReader receives a message as an [io.Reader]. Any reader + // previously opened through this function will be discarded. + ReceiveReader() (method uint16, size int64, data io.Reader, err error) } -- 2.46.1 From b07cdf088a6538625fc13828ca0f65b54595d1c7 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Sun, 6 Apr 2025 11:25:12 -0400 Subject: [PATCH 05/38] design: State support for TCP/TLS --- design/protocol.md | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/design/protocol.md b/design/protocol.md index 48e4c9b..550cc86 100644 --- a/design/protocol.md +++ b/design/protocol.md @@ -123,8 +123,9 @@ be of the same size. ## Transports A transport is a protocol that HOPP connections can run on top of. HOPP currently supports the QUIC transport protocol for communicating between -machines, and UNIX domain sockets for quicker communication among applications -on the same machine. Both protocols are supported through METADAPT. +machines, TCP/TLS for legacy systems that do not support QUIC, and UNIX domain +sockets for faster communication among applications on the same machine. Both +protocols are supported through METADAPT. ## Message and Transaction Demarcation Protocol (METADAPT) The Message and Transaction Demarcation Protocol is used to break one or more -- 2.46.1 From fe8f2fc3ea18c8c29037b8205a185a841d050b69 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Sun, 6 Apr 2025 11:25:28 -0400 Subject: [PATCH 06/38] Do not require METADAPT to return a message length when getting a reader --- connection.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/connection.go b/connection.go index 005d734..56154a7 100644 --- a/connection.go +++ b/connection.go @@ -46,5 +46,5 @@ type Trans interface { Receive() (method uint16, data []byte, err error) // ReceiveReader receives a message as an [io.Reader]. Any reader // previously opened through this function will be discarded. - ReceiveReader() (method uint16, size int64, data io.Reader, err error) + ReceiveReader() (method uint16, data io.Reader, err error) } -- 2.46.1 From f4f8039fa025c43255e32655fdc344c9b7ce090d Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Sun, 6 Apr 2025 14:17:39 -0400 Subject: [PATCH 07/38] Support getting a reader for a message in METADAPT-B --- metadaptb.go | 83 +++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 60 insertions(+), 23 deletions(-) diff --git a/metadaptb.go b/metadaptb.go index 1c6eb9f..f78bb26 100644 --- a/metadaptb.go +++ b/metadaptb.go @@ -5,6 +5,8 @@ import "net" import "context" import "git.tebibyte.media/sashakoshka/hopp/tape" +// TODO: change size limit to be int64 + // B implements METADAPT-B over a multiplexed stream-oriented transport such as // QUIC. type b struct { @@ -12,8 +14,8 @@ type b struct { underlying MultiConn } -// AdaptB returns a connection implementing METADAPT-B over a singular stream- -// oriented transport such as TCP or UNIX domain stream sockets. +// AdaptB returns a connection implementing METADAPT-B over a multiplexed +// stream-oriented transport such as QUIC. func AdaptB(underlying MultiConn) Conn { return &b { sizeLimit: defaultSizeLimit, @@ -49,8 +51,8 @@ func (this *b) SetSizeLimit(limit int) { this.sizeLimit = limit } -func (this *b) newTrans(underlying Stream) transB { - return transB { +func (this *b) newTrans(underlying Stream) *transB { + return &transB { sizeLimit: this.sizeLimit, underlying: underlying, } @@ -59,22 +61,49 @@ func (this *b) newTrans(underlying Stream) transB { type transB struct { sizeLimit int underlying Stream + currentData io.Reader } -func (trans transB) Close() error { - return trans.underlying.Close() +func (this *transB) Close() error { + return this.underlying.Close() } -func (trans transB) ID() int64 { - return trans.underlying.ID() +func (this *transB) ID() int64 { + return this.underlying.ID() } -func (trans transB) Send(method uint16, data []byte) error { - return encodeMessageB(trans.underlying, trans.sizeLimit, method, data) +func (this *transB) Send(method uint16, data []byte) error { + return encodeMessageB(this.underlying, this.sizeLimit, method, data) } -func (trans transB) Receive() (uint16, []byte, error) { - return decodeMessageB(trans.underlying, trans.sizeLimit) +func (this *transB) Receive() (uint16, []byte, error) { + // get a reader for the next message + method, size, data, err := this.receiveReader() + if err != nil { return 0, nil, err } + // read the entire thing + payloadBuffer := make([]byte, int(size)) + _, err = io.ReadFull(data, payloadBuffer) + if err != nil { return 0, nil, err } + // we have used up the reader by now so we can forget it exists + this.currentData = nil + return method, payloadBuffer, nil +} + +func (this *transB) ReceiveReader() (uint16, io.Reader, error) { + method, _, data, err := this.receiveReader() + return method, data, err +} + +func (this *transB) receiveReader() (uint16, int64, io.Reader, error) { + // decode the message + method, size, data, err := decodeMessageB(this.underlying, this.sizeLimit) + if err != nil { return 0, 0, nil, err } + // discard current reader if there is one + if this.currentData == nil { + io.Copy(io.Discard, this.currentData) + } + this.currentData = data + return method, size, data, nil } // MultiConn represens a multiplexed stream-oriented transport for use in @@ -110,19 +139,27 @@ func encodeMessageB(writer io.Writer, sizeLimit int, method uint16, data []byte) return err } -func decodeMessageB(reader io.Reader, sizeLimit int) (uint16, []byte, error) { +func decodeMessageB( + reader io.Reader, + sizeLimit int, +) ( + method uint16, + size int64, + data io.Reader, + err error, +) { headerBuffer := [10]byte { } - _, err := io.ReadFull(reader, headerBuffer[:]) - if err != nil { return 0, nil, err } - method, err := tape.DecodeI16[uint16](headerBuffer[:2]) - if err != nil { return 0, nil, err } + _, err = io.ReadFull(reader, headerBuffer[:]) + if err != nil { return 0, 0, nil, err } + method, err = tape.DecodeI16[uint16](headerBuffer[:2]) + if err != nil { return 0, 0, nil, err } length, err := tape.DecodeI64[uint64](headerBuffer[2:10]) - if err != nil { return 0, nil, err } + if err != nil { return 0, 0, nil, err } if length > uint64(sizeLimit) { - return 0, nil, ErrPayloadTooLarge + return 0, 0, nil, ErrPayloadTooLarge } - payloadBuffer := make([]byte, int(length)) - _, err = io.ReadFull(reader, payloadBuffer) - if err != nil { return 0, nil, err } - return method, payloadBuffer, nil + return method, int64(length), &io.LimitedReader { + R: reader, + N: int64(length), + }, nil } -- 2.46.1 From db10355c84ed3cefdd8427b445b4e6df9263a3d2 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Sun, 6 Apr 2025 14:19:39 -0400 Subject: [PATCH 08/38] Change the size limit type to an int64 --- connection.go | 4 ++-- metadaptb.go | 14 ++++++-------- 2 files changed, 8 insertions(+), 10 deletions(-) diff --git a/connection.go b/connection.go index 56154a7..a2f2080 100644 --- a/connection.go +++ b/connection.go @@ -4,7 +4,7 @@ import "io" import "net" // import "time" -const defaultSizeLimit = 1024 * 1024 // 1 megabyte +const defaultSizeLimit int64 = 1024 * 1024 // 1 megabyte // Conn is a HOPP connection. type Conn interface { @@ -25,7 +25,7 @@ type Conn interface { // SetSizeLimit sets a limit (in bytes) for how large messages can be. // By default, this limit is 1 megabyte. - SetSizeLimit(limit int) + SetSizeLimit(limit int64) } // Trans is a HOPP transaction. diff --git a/metadaptb.go b/metadaptb.go index f78bb26..18776da 100644 --- a/metadaptb.go +++ b/metadaptb.go @@ -5,12 +5,10 @@ import "net" import "context" import "git.tebibyte.media/sashakoshka/hopp/tape" -// TODO: change size limit to be int64 - // B implements METADAPT-B over a multiplexed stream-oriented transport such as // QUIC. type b struct { - sizeLimit int + sizeLimit int64 underlying MultiConn } @@ -47,7 +45,7 @@ func (this *b) AcceptTrans() (Trans, error) { return this.newTrans(stream), nil } -func (this *b) SetSizeLimit(limit int) { +func (this *b) SetSizeLimit(limit int64) { this.sizeLimit = limit } @@ -59,7 +57,7 @@ func (this *b) newTrans(underlying Stream) *transB { } type transB struct { - sizeLimit int + sizeLimit int64 underlying Stream currentData io.Reader } @@ -127,8 +125,8 @@ type Stream interface { ID() int64 } -func encodeMessageB(writer io.Writer, sizeLimit int, method uint16, data []byte) error { - if len(data) > sizeLimit { +func encodeMessageB(writer io.Writer, sizeLimit int64, method uint16, data []byte) error { + if len(data) > int(sizeLimit) { return ErrPayloadTooLarge } buffer := make([]byte, 10 + len(data)) @@ -141,7 +139,7 @@ func encodeMessageB(writer io.Writer, sizeLimit int, method uint16, data []byte) func decodeMessageB( reader io.Reader, - sizeLimit int, + sizeLimit int64, ) ( method uint16, size int64, -- 2.46.1 From e4f13a41429a695d4c2fd9b50d29c2cdbb4f80b9 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Sun, 6 Apr 2025 17:01:00 -0400 Subject: [PATCH 09/38] WIP METADAPT-A changes --- metadapta.go | 108 ++++++++++++++++++++++++++++++++++++++++----------- 1 file changed, 86 insertions(+), 22 deletions(-) diff --git a/metadapta.go b/metadapta.go index 1879b15..c23d579 100644 --- a/metadapta.go +++ b/metadapta.go @@ -17,6 +17,7 @@ type Party bool; const ( ) type a struct { + sizeLimit int underlying net.Conn party Party transID int64 @@ -87,6 +88,10 @@ func (this *a) AcceptTrans() (Trans, error) { } } +func (this *a) SetSizeLimit(limit int) { + this.sizeLimit = limit +} + func (this *a) unlistTransactionSafe(id int64) { this.transLock.Lock() defer this.transLock.Unlock() @@ -110,13 +115,13 @@ func (this *a) receive() { clear(this.transMap) }() for { - transID, method, payload, err := decodeMessageA(this.underlying) + transID, method, chunked, payload, err := decodeMessageA(this.underlying) if err != nil { this.err = fmt.Errorf("could not receive message: %w", err) return } - err = this.receiveMultiplex(transID, method, payload) + err = this.receiveMultiplex(transID, method, chunked, payload) if err != nil { this.err = fmt.Errorf("could not receive message: %w", err) return @@ -124,7 +129,7 @@ func (this *a) receive() { } } -func (this *a) receiveMultiplex(transID int64, method uint16, payload []byte) error { +func (this *a) receiveMultiplex(transID int64, method uint16, chunked bool, payload []byte) error { if transID == 0 { return ErrMessageMalformed } trans, err := func() (*transA, error) { @@ -152,15 +157,17 @@ func (this *a) receiveMultiplex(transID int64, method uint16, payload []byte) er trans.incoming.Send(incomingMessage { method: method, + chunked: chunked, payload: payload, }) return nil } type transA struct { - parent *a - id int64 - incoming usync.Gate[incomingMessage] + parent *a + id int64 + incoming usync.Gate[incomingMessage] + currentReader io.Reader } func (this *transA) Close() error { @@ -183,26 +190,78 @@ func (this *transA) Send(method uint16, data []byte) error { } func (this *transA) Receive() (method uint16, data []byte, err error) { - receive := this.incoming.Receive() + method, reader, err := this.ReceiveReader() + if err != nil { return 0, nil, err } + data, err = io.ReadAll(reader) + if err != nil { return 0, nil, err } + return method, data, nil +} + +func (this *transA) ReceiveReader() (uint16, io.Reader, error) { + // drain previous reader if necessary + if this.currentReader != nil { + io.Copy(io.Discard, this.currentReader) + } + + // create new reader + reader := &readerA { + parent: this, + } + method, err := reader.pull() + if err != nil { return 0, nil, err} + this.currentReader = reader + return method, reader, nil +} + +type readerA struct { + parent *transA + leftover []byte + eof bool +} + +func (this *readerA) pull() (uint16, error) { + // if the previous message ended the chain, return an io.EOF + if this.eof { + return 0, io.EOF + } + + // get a message from the transaction we are a part of + receive := this.parent.incoming.Receive() if receive != nil { if message, ok := <- receive; ok { if message.method != closeMethod { - return message.method, message.payload, nil + this.leftover = append(this.leftover, message.payload...) + if !message.chunked { + this.eof = true + } } } } // close and return error on failure - this.Close() - if this.parent.err == nil { - return 0, nil, fmt.Errorf("could not receive message: %w", io.EOF) + this.eof = true + this.parent.Close() + if this.parent.parent.err == nil { + return 0, fmt.Errorf("could not receive message: %w", io.EOF) } else { - return 0, nil, this.parent.err + return 0, this.parent.parent.err } } +func (this *readerA) Read(buffer []byte) (int, error) { + if len(this.leftover) == 0 { + if this.eof { return 0, io.EOF } + this.pull() + } + + copied := copy(buffer, this.leftover) + this.leftover = this.leftover[copied:] + return copied, nil +} + type incomingMessage struct { method uint16 + chunked bool payload []byte } @@ -218,22 +277,27 @@ func encodeMessageA(writer io.Writer, trans int64, method uint16, data []byte) e return err } -func decodeMessageA(reader io.Reader) (int64, uint16, []byte, error) { - headerBuffer := [12]byte { } +func decodeMessageA(reader io.Reader) (int64, uint16, bool, []byte, error) { + headerBuffer := [18]byte { } _, err := io.ReadFull(reader, headerBuffer[:]) - if err != nil { return 0, 0, nil, err } + if err != nil { return 0, 0, false, nil, err } transID, err := tape.DecodeI64[int64](headerBuffer[:8]) - if err != nil { return 0, 0, nil, err } + if err != nil { return 0, 0, false, nil, err } method, err := tape.DecodeI16[uint16](headerBuffer[8:10]) - if err != nil { return 0, 0, nil, err } - length, err := tape.DecodeI16[uint16](headerBuffer[10:12]) - if err != nil { return 0, 0, nil, err } - payloadBuffer := make([]byte, int(length)) + if err != nil { return 0, 0, false, nil, err } + size, err := tape.DecodeI64[uint64](headerBuffer[10:18]) + if err != nil { return 0, 0, false, nil, err } + chunked, size := splitCCBSize(size) + payloadBuffer := make([]byte, int(size)) _, err = io.ReadFull(reader, payloadBuffer) - if err != nil { return 0, 0, nil, err } - return transID, method, payloadBuffer, nil + if err != nil { return 0, 0, false, nil, err } + return transID, method, chunked, payloadBuffer, nil } func partyFromTransID(id int64) Party { return id > 0 } + +func splitCCBSize(size uint64) (bool, uint64) { + return size >> 63 > 1, size & 0x7FFFFFFFFFFFFFFF +} -- 2.46.1 From 6de3cbbc489bf77b45dfc475cba0689ecf8fbe94 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Mon, 21 Apr 2025 19:10:45 -0400 Subject: [PATCH 10/38] Fix method signature of SetSizeLimit --- metadapta.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadapta.go b/metadapta.go index c23d579..aeea21f 100644 --- a/metadapta.go +++ b/metadapta.go @@ -17,7 +17,7 @@ type Party bool; const ( ) type a struct { - sizeLimit int + sizeLimit int64 underlying net.Conn party Party transID int64 @@ -88,7 +88,7 @@ func (this *a) AcceptTrans() (Trans, error) { } } -func (this *a) SetSizeLimit(limit int) { +func (this *a) SetSizeLimit(limit int64) { this.sizeLimit = limit } -- 2.46.1 From 7a766b74d887a9d3165f153d10cecf20ac1b8d3c Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Mon, 21 Apr 2025 20:49:58 -0400 Subject: [PATCH 11/38] Name return values of decodeMessageA --- metadapta.go | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/metadapta.go b/metadapta.go index aeea21f..ca6a629 100644 --- a/metadapta.go +++ b/metadapta.go @@ -277,18 +277,18 @@ func encodeMessageA(writer io.Writer, trans int64, method uint16, data []byte) e return err } -func decodeMessageA(reader io.Reader) (int64, uint16, bool, []byte, error) { +func decodeMessageA(reader io.Reader) (transID int64, method uint16, chunked bool, payloadBuffer []byte, err error) { headerBuffer := [18]byte { } - _, err := io.ReadFull(reader, headerBuffer[:]) + _, err = io.ReadFull(reader, headerBuffer[:]) if err != nil { return 0, 0, false, nil, err } - transID, err := tape.DecodeI64[int64](headerBuffer[:8]) + transID, err = tape.DecodeI64[int64](headerBuffer[:8]) if err != nil { return 0, 0, false, nil, err } - method, err := tape.DecodeI16[uint16](headerBuffer[8:10]) + method, err = tape.DecodeI16[uint16](headerBuffer[8:10]) if err != nil { return 0, 0, false, nil, err } size, err := tape.DecodeI64[uint64](headerBuffer[10:18]) if err != nil { return 0, 0, false, nil, err } - chunked, size := splitCCBSize(size) - payloadBuffer := make([]byte, int(size)) + chunked, size = splitCCBSize(size) + payloadBuffer = make([]byte, int(size)) _, err = io.ReadFull(reader, payloadBuffer) if err != nil { return 0, 0, false, nil, err } return transID, method, chunked, payloadBuffer, nil -- 2.46.1 From f34620c4345cc3fbcd6355798e61123598ab24f3 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Mon, 21 Apr 2025 20:50:33 -0400 Subject: [PATCH 12/38] METADAPT-A tests run --- metadapta_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadapta_test.go b/metadapta_test.go index 0ecebe2..93ed006 100644 --- a/metadapta_test.go +++ b/metadapta_test.go @@ -108,7 +108,7 @@ func TestEncodeMessageAErr(test *testing.T) { } func TestDecodeMessageA(test *testing.T) { - transID, method, payload, err := decodeMessageA(bytes.NewReader([]byte { + transID, method, _, payload, err := decodeMessageA(bytes.NewReader([]byte { 0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04, 0x6B, 0x12, 0x00, 0x06, @@ -130,7 +130,7 @@ func TestDecodeMessageA(test *testing.T) { } func TestDecodeMessageAErr(test *testing.T) { - _, _, _, err := decodeMessageA(bytes.NewReader([]byte { + _, _, _, _, err := decodeMessageA(bytes.NewReader([]byte { 0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04, 0x6B, 0x12, 0x01, 0x06, -- 2.46.1 From 945d81c5058f0ff2e1c7bb5cc378159685890688 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Mon, 21 Apr 2025 20:51:02 -0400 Subject: [PATCH 13/38] METADAPT-B tests run --- metadaptb_test.go | 5 +++-- 1 file changed, 3 insertions(+), 2 deletions(-) diff --git a/metadaptb_test.go b/metadaptb_test.go index add8043..f40f25a 100644 --- a/metadaptb_test.go +++ b/metadaptb_test.go @@ -33,7 +33,7 @@ func TestEncodeMessageBErr(test *testing.T) { } func TestDecodeMessageB(test *testing.T) { - method, payload, err := decodeMessageB(bytes.NewReader([]byte { + method, _, data, err := decodeMessageB(bytes.NewReader([]byte { 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, @@ -44,6 +44,7 @@ func TestDecodeMessageB(test *testing.T) { if got, correct := method, uint16(0x6B12); got != correct { test.Fatalf("not equal: %v %v", got, correct) } + payload, _ := io.ReadAll(data) correctPayload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 } if got, correct := payload, correctPayload; !slices.Equal(got, correct) { test.Fatalf("not equal: %v %v", got, correct) @@ -51,7 +52,7 @@ func TestDecodeMessageB(test *testing.T) { } func TestDecodeMessageBErr(test *testing.T) { - _, _, err := decodeMessageB(bytes.NewReader([]byte { + _, _, _, err := decodeMessageB(bytes.NewReader([]byte { 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x06, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, -- 2.46.1 From fac0c4e31da8f606e2c9dcdfcf63d29015e7a313 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Tue, 22 Apr 2025 19:49:24 -0400 Subject: [PATCH 14/38] Actually use defaultSizeLimit --- metadapta.go | 18 ++++++++++++++++-- 1 file changed, 16 insertions(+), 2 deletions(-) diff --git a/metadapta.go b/metadapta.go index ca6a629..e9b8e56 100644 --- a/metadapta.go +++ b/metadapta.go @@ -10,6 +10,7 @@ import "git.tebibyte.media/sashakoshka/go-util/sync" const closeMethod = 0xFFFF const int64Max = int64((^uint64(0)) >> 1) + // Party represents a side of a connection. type Party bool; const ( ServerSide Party = false @@ -33,6 +34,7 @@ type a struct { // oriented transport such as TCP or UNIX domain stream sockets. func AdaptA(underlying net.Conn, party Party) Conn { conn := &a { + sizeLimit: defaultSizeLimit, underlying: underlying, party: party, transMap: make(map[int64] *transA), @@ -115,7 +117,7 @@ func (this *a) receive() { clear(this.transMap) }() for { - transID, method, chunked, payload, err := decodeMessageA(this.underlying) + transID, method, chunked, payload, err := decodeMessageA(this.underlying, this.sizeLimit) if err != nil { this.err = fmt.Errorf("could not receive message: %w", err) return @@ -277,7 +279,16 @@ func encodeMessageA(writer io.Writer, trans int64, method uint16, data []byte) e return err } -func decodeMessageA(reader io.Reader) (transID int64, method uint16, chunked bool, payloadBuffer []byte, err error) { +func decodeMessageA( + reader io.Reader, + sizeLimit int64, +) ( + transID int64, + method uint16, + chunked bool, + payloadBuffer []byte, + err error, +) { headerBuffer := [18]byte { } _, err = io.ReadFull(reader, headerBuffer[:]) if err != nil { return 0, 0, false, nil, err } @@ -288,6 +299,9 @@ func decodeMessageA(reader io.Reader) (transID int64, method uint16, chunked boo size, err := tape.DecodeI64[uint64](headerBuffer[10:18]) if err != nil { return 0, 0, false, nil, err } chunked, size = splitCCBSize(size) + if size > uint64(sizeLimit) { + return 0, 0, false, nil, ErrPayloadTooLarge + } payloadBuffer = make([]byte, int(size)) _, err = io.ReadFull(reader, payloadBuffer) if err != nil { return 0, 0, false, nil, err } -- 2.46.1 From 46c6361602de96578c53b98aab8ac7062e831390 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Tue, 22 Apr 2025 20:10:57 -0400 Subject: [PATCH 15/38] Encode METADAPT-A MMBs properly lmao --- metadapta.go | 8 +++----- 1 file changed, 3 insertions(+), 5 deletions(-) diff --git a/metadapta.go b/metadapta.go index e9b8e56..265b027 100644 --- a/metadapta.go +++ b/metadapta.go @@ -268,13 +268,11 @@ type incomingMessage struct { } func encodeMessageA(writer io.Writer, trans int64, method uint16, data []byte) error { - buffer := make([]byte, 12 + len(data)) + buffer := make([]byte, 18 + len(data)) tape.EncodeI64(buffer[:8], trans) tape.EncodeI16(buffer[8:10], method) - length, ok := tape.U16CastSafe(len(data)) - if !ok { return ErrPayloadTooLarge } - tape.EncodeI16(buffer[10:12], length) - copy(buffer[12:], data) + tape.EncodeI64(buffer[10:18], uint64(len(data))) + copy(buffer[18:], data) _, err := writer.Write(buffer) return err } -- 2.46.1 From cbaff8b593fb4c6db83f4c1f9cb086e0a66be574 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 14:15:53 -0400 Subject: [PATCH 16/38] Allow readerA.pull to return an actual result --- metadapta.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metadapta.go b/metadapta.go index 265b027..aa941e3 100644 --- a/metadapta.go +++ b/metadapta.go @@ -236,6 +236,7 @@ func (this *readerA) pull() (uint16, error) { if !message.chunked { this.eof = true } + return message.method, nil } } } -- 2.46.1 From 8fe3ba8d4f05e92d31264314b725a93c7e5019a9 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 15:07:47 -0400 Subject: [PATCH 17/38] Close METADAPT-A transaction channel --- metadapta.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metadapta.go b/metadapta.go index aa941e3..20074b3 100644 --- a/metadapta.go +++ b/metadapta.go @@ -109,6 +109,7 @@ func (this *a) sendMessageSafe(trans int64, method uint16, data []byte) error { func (this *a) receive() { defer func() { this.underlying.Close() + close(this.transChan) this.transLock.Lock() defer this.transLock.Unlock() for _, trans := range this.transMap { -- 2.46.1 From 86cf3ee89dc72af6e9125f88acc9dff1f93faa2a Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 15:08:31 -0400 Subject: [PATCH 18/38] Make the TestConnA pass --- metadapta_test.go | 26 +++++++++++++++----------- 1 file changed, 15 insertions(+), 11 deletions(-) diff --git a/metadapta_test.go b/metadapta_test.go index 93ed006..d3741ca 100644 --- a/metadapta_test.go +++ b/metadapta_test.go @@ -31,21 +31,23 @@ func TestConnA(test *testing.T) { // server listener, err := net.Listen(network, addr) if err != nil { test.Fatal(err) } - defer listener.Close() + test.Cleanup(func() { listener.Close() }) go func() { test.Log("SERVER listening") conn, err := listener.Accept() if err != nil { test.Error("SERVER", err); return } defer conn.Close() + test.Cleanup(func() { conn.Close() }) a := AdaptA(conn, ServerSide) trans, err := a.OpenTrans() if err != nil { test.Error("SERVER", err); return } - defer trans.Close() + test.Cleanup(func() { trans.Close() }) for method, payload := range payloads { - test.Log("SERVER", method, payload) + test.Log("SERVER m:", method, "p:", payload) err := trans.Send(uint16(method), []byte(payload)) if err != nil { test.Error("SERVER", err); return } } + test.Log("SERVER closing connection") }() // client @@ -54,18 +56,18 @@ func TestConnA(test *testing.T) { if err != nil { test.Fatal("CLIENT", err) } test.Log("CLIENT dialed") a := AdaptA(conn, ClientSide) - defer a.Close() + test.Cleanup(func() { a.Close() }) test.Log("CLIENT accepting transaction") trans, err := a.AcceptTrans() if err != nil { test.Fatal("CLIENT", err) } test.Log("CLIENT accepted transaction") - defer trans.Close() + test.Cleanup(func() { trans.Close() }) for method, payload := range payloads { test.Log("CLIENT waiting...") gotMethod, gotPayloadBytes, err := trans.Receive() if err != nil { test.Fatal("CLIENT", err) } gotPayload := string(gotPayloadBytes) - test.Log("CLIENT", gotMethod, gotPayload) + test.Log("CLIENT m:", gotMethod, "p:", gotPayload) if int(gotMethod) != method { test.Errorf("CLIENT method not equal") } @@ -73,11 +75,13 @@ func TestConnA(test *testing.T) { test.Errorf("CLIENT payload not equal") } } + test.Log("CLIENT waiting for connection close...") _, _, err = trans.Receive() if !errors.Is(err, io.EOF) { test.Fatal("CLIENT wrong error:", err) } test.Log("CLIENT done") + conn.Close() } func TestEncodeMessageA(test *testing.T) { @@ -87,7 +91,7 @@ func TestEncodeMessageA(test *testing.T) { correct := []byte { 0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04, 0x6B, 0x12, - 0x00, 0x06, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, } if err != nil { @@ -111,9 +115,9 @@ func TestDecodeMessageA(test *testing.T) { transID, method, _, payload, err := decodeMessageA(bytes.NewReader([]byte { 0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04, 0x6B, 0x12, - 0x00, 0x06, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, - })) + }), defaultSizeLimit) if err != nil { test.Fatal(err) } @@ -133,9 +137,9 @@ func TestDecodeMessageAErr(test *testing.T) { _, _, _, _, err := decodeMessageA(bytes.NewReader([]byte { 0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04, 0x6B, 0x12, - 0x01, 0x06, + 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x06, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, - })) + }), defaultSizeLimit) if !errors.Is(err, io.ErrUnexpectedEOF) { test.Fatalf("wrong error: %v", err) } -- 2.46.1 From 9bf0c596ba79f1790ecbaaee79092212175e2e2d Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 15:12:01 -0400 Subject: [PATCH 19/38] Make TestEncodeMessageAErr pass --- metadapta.go | 13 +++++++++++-- 1 file changed, 11 insertions(+), 2 deletions(-) diff --git a/metadapta.go b/metadapta.go index 20074b3..503e8af 100644 --- a/metadapta.go +++ b/metadapta.go @@ -103,7 +103,7 @@ func (this *a) unlistTransactionSafe(id int64) { func (this *a) sendMessageSafe(trans int64, method uint16, data []byte) error { this.sendLock.Lock() defer this.sendLock.Unlock() - return encodeMessageA(this.underlying, trans, method, data) + return encodeMessageA(this.underlying, this.sizeLimit, trans, method, data) } func (this *a) receive() { @@ -269,7 +269,16 @@ type incomingMessage struct { payload []byte } -func encodeMessageA(writer io.Writer, trans int64, method uint16, data []byte) error { +func encodeMessageA( + writer io.Writer, + sizeLimit int64, + trans int64, + method uint16, + data []byte, +) error { + if int64(len(data)) > sizeLimit { + return ErrPayloadTooLarge + } buffer := make([]byte, 18 + len(data)) tape.EncodeI64(buffer[:8], trans) tape.EncodeI16(buffer[8:10], method) -- 2.46.1 From f6fe9c307d931fba212a068d48ecf7777607fcdc Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 15:17:32 -0400 Subject: [PATCH 20/38] This should have been in the last commit --- metadapta_test.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/metadapta_test.go b/metadapta_test.go index d3741ca..4cb8fdb 100644 --- a/metadapta_test.go +++ b/metadapta_test.go @@ -87,7 +87,7 @@ func TestConnA(test *testing.T) { func TestEncodeMessageA(test *testing.T) { buffer := new(bytes.Buffer) payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 } - err := encodeMessageA(buffer, 0x5800FEABC3104F04, 0x6B12, payload) + err := encodeMessageA(buffer, defaultSizeLimit, 0x5800FEABC3104F04, 0x6B12, payload) correct := []byte { 0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04, 0x6B, 0x12, @@ -105,7 +105,7 @@ func TestEncodeMessageA(test *testing.T) { func TestEncodeMessageAErr(test *testing.T) { buffer := new(bytes.Buffer) payload := make([]byte, 0x10000) - err := encodeMessageA(buffer, 0x5800FEABC3104F04, 0x6B12, payload) + err := encodeMessageA(buffer, 0x20, 0x5800FEABC3104F04, 0x6B12, payload) if !errors.Is(err, ErrPayloadTooLarge) { test.Fatalf("wrong error: %v", err) } -- 2.46.1 From 87c4ac8efba8f5ce40d8f7347ee37b0d370f0629 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 15:21:52 -0400 Subject: [PATCH 21/38] More robust integer comparison --- metadaptb.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadaptb.go b/metadaptb.go index 18776da..fdf2ba2 100644 --- a/metadaptb.go +++ b/metadaptb.go @@ -126,7 +126,7 @@ type Stream interface { } func encodeMessageB(writer io.Writer, sizeLimit int64, method uint16, data []byte) error { - if len(data) > int(sizeLimit) { + if int64(len(data)) > sizeLimit { return ErrPayloadTooLarge } buffer := make([]byte, 10 + len(data)) -- 2.46.1 From 47645a8fce7a65b7c67a349f4640f2b27a94abb0 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 15:26:12 -0400 Subject: [PATCH 22/38] Pass TestDecodeMessageBErr --- metadaptb.go | 6 +++++- metadaptb_test.go | 4 +--- 2 files changed, 6 insertions(+), 4 deletions(-) diff --git a/metadaptb.go b/metadaptb.go index fdf2ba2..867dedb 100644 --- a/metadaptb.go +++ b/metadaptb.go @@ -2,6 +2,7 @@ package hopp import "io" import "net" +import "errors" import "context" import "git.tebibyte.media/sashakoshka/hopp/tape" @@ -148,7 +149,10 @@ func decodeMessageB( ) { headerBuffer := [10]byte { } _, err = io.ReadFull(reader, headerBuffer[:]) - if err != nil { return 0, 0, nil, err } + if err != nil { + if errors.Is(err, io.EOF) { return 0, 0, nil, io.ErrUnexpectedEOF } + return 0, 0, nil, err + } method, err = tape.DecodeI16[uint16](headerBuffer[:2]) if err != nil { return 0, 0, nil, err } length, err := tape.DecodeI64[uint64](headerBuffer[2:10]) diff --git a/metadaptb_test.go b/metadaptb_test.go index f40f25a..cad341f 100644 --- a/metadaptb_test.go +++ b/metadaptb_test.go @@ -53,9 +53,7 @@ func TestDecodeMessageB(test *testing.T) { func TestDecodeMessageBErr(test *testing.T) { _, _, _, err := decodeMessageB(bytes.NewReader([]byte { - 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, - 0x01, 0x06, - 0x00, 0x01, 0x02, 0x03, 0x04, 0x05, + 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, }), defaultSizeLimit) if !errors.Is(err, io.ErrUnexpectedEOF) { test.Fatalf("wrong error: %v", err) -- 2.46.1 From c51a81bc134c88e665c54a1790543d5db00b98dd Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 16:02:09 -0400 Subject: [PATCH 23/38] Add a SendWriter method to Trans --- connection.go | 3 +++ 1 file changed, 3 insertions(+) diff --git a/connection.go b/connection.go index a2f2080..e1b07c4 100644 --- a/connection.go +++ b/connection.go @@ -42,6 +42,9 @@ type Trans interface { // Send sends a message. Send(method uint16, data []byte) error + // SendWriter sends data written to an [io.Writer]. Any writer + // previously opened through this function will be discarded. + SendWriter(method uint16) (io.Writer, error) // Receive receives a message. Receive() (method uint16, data []byte, err error) // ReceiveReader receives a message as an [io.Reader]. Any reader -- 2.46.1 From 8a3df95491f47b1f85d5e38b5f78f31e24ccec65 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 16:06:17 -0400 Subject: [PATCH 24/38] Clarify concurrency in Trans methods --- connection.go | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/connection.go b/connection.go index e1b07c4..05dec8a 100644 --- a/connection.go +++ b/connection.go @@ -28,26 +28,31 @@ type Conn interface { SetSizeLimit(limit int64) } -// Trans is a HOPP transaction. +// Trans is a HOPP transaction. Methods of this interface are not safe for +// concurrent use with the exception of the Close and ID methods. The +// recommended use case is one goroutine per transaction. type Trans interface { // Close closes the transaction. Any blocked operations will be - // unblocked and return errors. + // unblocked and return errors. This method is safe for concurrent use. Close() error // ID returns the transaction ID. This must not change, and it must be - // unique within the connection. + // unique within the connection. This method is safe for concurrent use. ID() int64 // TODO: add methods for setting send and receive deadlines - // Send sends a message. + // Send sends a message. This method is not safe for concurrent use. Send(method uint16, data []byte) error // SendWriter sends data written to an [io.Writer]. Any writer - // previously opened through this function will be discarded. + // previously opened through this function will be discarded. This + // method is not safe for concurrent use, and neither is its result. SendWriter(method uint16) (io.Writer, error) - // Receive receives a message. + // Receive receives a message. This method is not safe for concurrent + // use. Receive() (method uint16, data []byte, err error) // ReceiveReader receives a message as an [io.Reader]. Any reader - // previously opened through this function will be discarded. + // previously opened through this function will be discarded. This + // method is not safe for concurrent use, and neither is its result. ReceiveReader() (method uint16, data io.Reader, err error) } -- 2.46.1 From 41f5cfefabd828496147c10c6224eed0eb406227 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 17:53:12 -0400 Subject: [PATCH 25/38] Implement SendWriter for METADAPT-A --- metadapta.go | 76 +++++++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 75 insertions(+), 1 deletion(-) diff --git a/metadapta.go b/metadapta.go index 503e8af..03bef8e 100644 --- a/metadapta.go +++ b/metadapta.go @@ -9,7 +9,7 @@ import "git.tebibyte.media/sashakoshka/go-util/sync" const closeMethod = 0xFFFF const int64Max = int64((^uint64(0)) >> 1) - +const defaultChunkSize = 0x1000 // Party represents a side of a connection. type Party bool; const ( @@ -171,6 +171,8 @@ type transA struct { id int64 incoming usync.Gate[incomingMessage] currentReader io.Reader + currentWriter io.Closer + writeBuffer []byte } func (this *transA) Close() error { @@ -192,6 +194,27 @@ func (this *transA) Send(method uint16, data []byte) error { return this.parent.sendMessageSafe(this.id, method, data) } +func (this *transA) SendWriter(method uint16) (io.Writer, error) { + // close previous writer if necessary + if this.currentWriter != nil { + this.currentWriter.Close() + this.currentWriter = nil + } + + // create new writer + writer := &writerA { + parent: this, + // there is only ever one writer at a time, so they can all + // share a buffer + buffer: this.writeBuffer[:0], + method: method, + chunkSize: defaultChunkSize, + open: true, + } + this.currentWriter = writer + return writer, nil +} + func (this *transA) Receive() (method uint16, data []byte, err error) { method, reader, err := this.ReceiveReader() if err != nil { return 0, nil, err } @@ -263,6 +286,57 @@ func (this *readerA) Read(buffer []byte) (int, error) { return copied, nil } +type writerA struct { + parent *transA + buffer []byte + method uint16 + chunkSize int64 + open bool +} + +func (this *writerA) Write(data []byte) (n int, err error) { + if !this.open { return 0, io.EOF } + toSend := data + for len(toSend) > 0 { + nn, err := this.writeOne(toSend) + n += nn + toSend = toSend[nn:] + if err != nil { return n, err } + } + return n, nil +} + +func (this *writerA) Close() error { + this.open = false + return nil +} + +func (this *writerA) writeOne(data []byte) (n int, err error) { + data = data[:min(len(data), int(this.chunkSize))] + + // if there is more room, append to the buffer and exit + if int64(len(this.buffer) + len(data)) <= this.chunkSize { + this.buffer = append(this.buffer, data...) + n = len(data) + // if have a full chunk, flush + if int64(len(this.buffer)) == this.chunkSize { + err = this.flush() + if err != nil { return n, err } + } + return n, nil + } + + // if not, flush and store as much as we can in the buffer + err = this.flush() + if err != nil { return n, err } + this.buffer = append(this.buffer, data...) + return n, nil +} + +func (this *writerA) flush() error { + return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer) +} + type incomingMessage struct { method uint16 chunked bool -- 2.46.1 From dd89245c34da151ae17e45af4a785a4a8ea39632 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 18:06:00 -0400 Subject: [PATCH 26/38] Change the result of Trans.SendWriter to a WriteCloser --- connection.go | 10 ++++++---- 1 file changed, 6 insertions(+), 4 deletions(-) diff --git a/connection.go b/connection.go index 05dec8a..7c82c8f 100644 --- a/connection.go +++ b/connection.go @@ -44,10 +44,12 @@ type Trans interface { // Send sends a message. This method is not safe for concurrent use. Send(method uint16, data []byte) error - // SendWriter sends data written to an [io.Writer]. Any writer - // previously opened through this function will be discarded. This - // method is not safe for concurrent use, and neither is its result. - SendWriter(method uint16) (io.Writer, error) + // SendWriter sends data written to an [io.Writer]. The writer must be + // closed after use. Closing the writer flushes any data that hasn't + // been written yet. Any writer previously opened through this function + // will be discarded. This method is not safe for concurrent use, and + // neither is its result. + SendWriter(method uint16) (io.WriteCloser, error) // Receive receives a message. This method is not safe for concurrent // use. Receive() (method uint16, data []byte, err error) -- 2.46.1 From 9d2bbec7f9cb6c0db221499e5b3f06cd72f91c31 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 18:14:47 -0400 Subject: [PATCH 27/38] Update METADAPT-A implementation --- metadapta.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/metadapta.go b/metadapta.go index 03bef8e..55f1c0c 100644 --- a/metadapta.go +++ b/metadapta.go @@ -194,7 +194,7 @@ func (this *transA) Send(method uint16, data []byte) error { return this.parent.sendMessageSafe(this.id, method, data) } -func (this *transA) SendWriter(method uint16) (io.Writer, error) { +func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) { // close previous writer if necessary if this.currentWriter != nil { this.currentWriter.Close() -- 2.46.1 From 7a0bf64c17a4dd86e8ad518addda487805b83fe7 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 18:15:38 -0400 Subject: [PATCH 28/38] Implement SendWriter for METADAPT-B --- metadaptb.go | 40 +++++++++++++++++++++++++++++++++++++--- 1 file changed, 37 insertions(+), 3 deletions(-) diff --git a/metadaptb.go b/metadaptb.go index 867dedb..c91db72 100644 --- a/metadaptb.go +++ b/metadaptb.go @@ -2,6 +2,7 @@ package hopp import "io" import "net" +import "bytes" import "errors" import "context" import "git.tebibyte.media/sashakoshka/hopp/tape" @@ -58,9 +59,10 @@ func (this *b) newTrans(underlying Stream) *transB { } type transB struct { - sizeLimit int64 - underlying Stream - currentData io.Reader + sizeLimit int64 + underlying Stream + currentData io.Reader + currentWriter *writerB } func (this *transB) Close() error { @@ -75,6 +77,24 @@ func (this *transB) Send(method uint16, data []byte) error { return encodeMessageB(this.underlying, this.sizeLimit, method, data) } +func (this *transB) SendWriter(method uint16) (io.WriteCloser, error) { + if this.currentWriter != nil { + this.currentWriter.Close() + } + // TODO: come up with a fix that allows us to pipe data through the + // writer. as of now, it just reads whatever is written into a buffer + // and sends the message on close. we should probably introduce chunked + // encoding to METADAPT-B to fix this. the implementation would be + // simpler than on METADAPT-A, but most of the code could just be + // copied over. + writer := &writerB { + parent: this, + method: method, + } + this.currentWriter = writer + return writer, nil +} + func (this *transB) Receive() (uint16, []byte, error) { // get a reader for the next message method, size, data, err := this.receiveReader() @@ -105,6 +125,20 @@ func (this *transB) receiveReader() (uint16, int64, io.Reader, error) { return method, size, data, nil } +type writerB struct { + parent *transB + buffer bytes.Buffer + method uint16 +} + +func (this *writerB) Write(data []byte) (int, error) { + return this.buffer.Write(data) +} + +func (this *writerB) Close() error { + return this.parent.Send(this.method, this.buffer.Bytes()) +} + // MultiConn represens a multiplexed stream-oriented transport for use in // [AdaptB]. type MultiConn interface { -- 2.46.1 From c0bfcc02f7762e8609562126a1c911d0cc78ae95 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 18:19:43 -0400 Subject: [PATCH 29/38] Send a close message when METADAPT-A transactions close --- metadapta.go | 1 + 1 file changed, 1 insertion(+) diff --git a/metadapta.go b/metadapta.go index 55f1c0c..41ce59d 100644 --- a/metadapta.go +++ b/metadapta.go @@ -183,6 +183,7 @@ func (this *transA) Close() error { func (this *transA) closeDontUnlist() error { this.Send(closeMethod, nil) + this.parent.sendMessageSafe(this.id, 0xFFFF, nil) return this.incoming.Close() } -- 2.46.1 From a83aedc1287b066774ebd7e31f85678de6c167c6 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 18:38:01 -0400 Subject: [PATCH 30/38] Break METADAPT-A client/server environment from TestConnA --- metadapta_test.go | 115 +++++++++++++++++++++++++++------------------- 1 file changed, 68 insertions(+), 47 deletions(-) diff --git a/metadapta_test.go b/metadapta_test.go index 4cb8fdb..c85d6d9 100644 --- a/metadapta_test.go +++ b/metadapta_test.go @@ -24,21 +24,34 @@ func TestConnA(test *testing.T) { "world", "When the impostor is sus!", } - - network := "tcp" - addr := "localhost:7959" - // server - listener, err := net.Listen(network, addr) - if err != nil { test.Fatal(err) } - test.Cleanup(func() { listener.Close() }) - go func() { - test.Log("SERVER listening") - conn, err := listener.Accept() - if err != nil { test.Error("SERVER", err); return } - defer conn.Close() - test.Cleanup(func() { conn.Close() }) - a := AdaptA(conn, ServerSide) + clientFunc := func(a Conn) { + test.Log("CLIENT accepting transaction") + trans, err := a.AcceptTrans() + if err != nil { test.Fatal("CLIENT", err) } + test.Log("CLIENT accepted transaction") + test.Cleanup(func() { trans.Close() }) + for method, payload := range payloads { + test.Log("CLIENT waiting...") + gotMethod, gotPayloadBytes, err := trans.Receive() + if err != nil { test.Fatal("CLIENT", err) } + gotPayload := string(gotPayloadBytes) + test.Log("CLIENT m:", gotMethod, "p:", gotPayload) + if int(gotMethod) != method { + test.Errorf("CLIENT method not equal") + } + if gotPayload != payload { + test.Errorf("CLIENT payload not equal") + } + } + test.Log("CLIENT waiting for transaction close...") + _, _, err = trans.Receive() + if !errors.Is(err, io.EOF) { + test.Fatal("CLIENT wrong error:", err) + } + } + + serverFunc := func(a Conn) { trans, err := a.OpenTrans() if err != nil { test.Error("SERVER", err); return } test.Cleanup(func() { trans.Close() }) @@ -48,40 +61,9 @@ func TestConnA(test *testing.T) { if err != nil { test.Error("SERVER", err); return } } test.Log("SERVER closing connection") - }() + } - // client - test.Log("CLIENT dialing") - conn, err := net.Dial(network, addr) - if err != nil { test.Fatal("CLIENT", err) } - test.Log("CLIENT dialed") - a := AdaptA(conn, ClientSide) - test.Cleanup(func() { a.Close() }) - test.Log("CLIENT accepting transaction") - trans, err := a.AcceptTrans() - if err != nil { test.Fatal("CLIENT", err) } - test.Log("CLIENT accepted transaction") - test.Cleanup(func() { trans.Close() }) - for method, payload := range payloads { - test.Log("CLIENT waiting...") - gotMethod, gotPayloadBytes, err := trans.Receive() - if err != nil { test.Fatal("CLIENT", err) } - gotPayload := string(gotPayloadBytes) - test.Log("CLIENT m:", gotMethod, "p:", gotPayload) - if int(gotMethod) != method { - test.Errorf("CLIENT method not equal") - } - if gotPayload != payload { - test.Errorf("CLIENT payload not equal") - } - } - test.Log("CLIENT waiting for connection close...") - _, _, err = trans.Receive() - if !errors.Is(err, io.EOF) { - test.Fatal("CLIENT wrong error:", err) - } - test.Log("CLIENT done") - conn.Close() + clientServerEnvironment(test, clientFunc, serverFunc) } func TestEncodeMessageA(test *testing.T) { @@ -144,3 +126,42 @@ func TestDecodeMessageAErr(test *testing.T) { test.Fatalf("wrong error: %v", err) } } + +func clientServerEnvironment(test *testing.T, clientFunc func(conn Conn), serverFunc func(conn Conn)) { + network := "tcp" + addr := "localhost:7959" + + // server + listener, err := net.Listen(network, addr) + if err != nil { test.Fatal(err) } + test.Cleanup(func() { listener.Close() }) + go func() { + test.Log("SERVER listening") + conn, err := listener.Accept() + if err != nil { test.Error("SERVER", err); return } + defer conn.Close() + test.Cleanup(func() { conn.Close() }) + a := AdaptA(conn, ServerSide) + test.Cleanup(func() { a.Close() }) + + serverFunc(a) + }() + + // client + test.Log("CLIENT dialing") + conn, err := net.Dial(network, addr) + if err != nil { test.Fatal("CLIENT", err) } + test.Log("CLIENT dialed") + a := AdaptA(conn, ClientSide) + test.Cleanup(func() { a.Close() }) + + clientFunc(a) + + test.Log("CLIENT waiting for connection close...") + _, err = a.AcceptTrans() + if !errors.Is(err, io.EOF) { + test.Fatal("CLIENT wrong error:", err) + } + test.Log("CLIENT DONE") + conn.Close() +} -- 2.46.1 From 23c37c3d1f50bbdf2503f26586f504c2a6c93cdb Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Fri, 25 Apr 2025 19:57:33 -0400 Subject: [PATCH 31/38] Fix transaction ID counting --- metadapta.go | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/metadapta.go b/metadapta.go index 41ce59d..8250478 100644 --- a/metadapta.go +++ b/metadapta.go @@ -66,18 +66,21 @@ func (this *a) RemoteAddr() net.Addr { func (this *a) OpenTrans() (Trans, error) { this.transLock.Lock() defer this.transLock.Unlock() + if this.transID == int64Max { + return nil, fmt.Errorf("could not open transaction: %w", ErrIntegerOverflow) + } id := this.transID - this.transID ++ trans := &transA { parent: this, id: id, incoming: usync.NewGate[incomingMessage](), } this.transMap[id] = trans - if this.transID == int64Max { - return nil, fmt.Errorf("could not open transaction: %w", ErrIntegerOverflow) + if this.party == ClientSide { + this.transID ++ + } else { + this.transID -- } - this.transID ++ return trans, nil } -- 2.46.1 From d60beccbcd53d86ee9373d35472be8a3e2155e1a Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Wed, 14 May 2025 13:44:06 -0400 Subject: [PATCH 32/38] Finally fix A... this took too long --- metadapta.go | 79 +++++++++++++++++++++++++++------ metadapta_test.go | 108 ++++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 169 insertions(+), 18 deletions(-) diff --git a/metadapta.go b/metadapta.go index 8250478..9739500 100644 --- a/metadapta.go +++ b/metadapta.go @@ -4,9 +4,13 @@ import "io" import "fmt" import "net" import "sync" +import "sync/atomic" import "git.tebibyte.media/sashakoshka/hopp/tape" import "git.tebibyte.media/sashakoshka/go-util/sync" +// TODO investigate why 30 never reaches the server, causing it to wait for ever +// and never close the connection, causing the client to also wait forever + const closeMethod = 0xFFFF const int64Max = int64((^uint64(0)) >> 1) const defaultChunkSize = 0x1000 @@ -17,6 +21,14 @@ type Party bool; const ( ClientSide Party = true ) +func (party Party) String() string { + if party == ServerSide { + return "server" + } else { + return "client" + } +} + type a struct { sizeLimit int64 underlying net.Conn @@ -52,7 +64,7 @@ func AdaptA(underlying net.Conn, party Party) Conn { func (this *a) Close() error { close(this.done) - return this.underlying.Close() + return nil } func (this *a) LocalAddr() net.Addr { @@ -85,11 +97,15 @@ func (this *a) OpenTrans() (Trans, error) { } func (this *a) AcceptTrans() (Trans, error) { + eof := fmt.Errorf("could not accept transaction: %w", io.EOF) select { case trans := <- this.transChan: + if trans == nil { + return nil, eof + } return trans, nil case <- this.done: - return nil, fmt.Errorf("could not accept transaction: %w", io.EOF) + return nil, eof } } @@ -119,7 +135,11 @@ func (this *a) receive() { trans.closeDontUnlist() } clear(this.transMap) + this.underlying.Close() }() + + // receive MMBs in a loop and forward them to transactions until shit + // starts closing for { transID, method, chunked, payload, err := decodeMessageA(this.underlying, this.sizeLimit) if err != nil { @@ -127,7 +147,7 @@ func (this *a) receive() { return } - err = this.receiveMultiplex(transID, method, chunked, payload) + err = this.multiplexMMB(transID, method, chunked, payload) if err != nil { this.err = fmt.Errorf("could not receive message: %w", err) return @@ -135,7 +155,7 @@ func (this *a) receive() { } } -func (this *a) receiveMultiplex(transID int64, method uint16, chunked bool, payload []byte) error { +func (this *a) multiplexMMB(transID int64, method uint16, chunked bool, payload []byte) error { if transID == 0 { return ErrMessageMalformed } trans, err := func() (*transA, error) { @@ -144,6 +164,12 @@ func (this *a) receiveMultiplex(transID int64, method uint16, chunked bool, payl trans, ok := this.transMap[transID] if !ok { + // check if this is a superfluous close message and just + // do nothing if so + if method == closeMethod { + return nil, nil + } + // it is forbidden for the other party to initiate a transaction // with an ID from this party if this.party == partyFromTransID(transID) { @@ -161,14 +187,24 @@ func (this *a) receiveMultiplex(transID int64, method uint16, chunked bool, payl }() if err != nil { return err } - trans.incoming.Send(incomingMessage { - method: method, - chunked: chunked, - payload: payload, - }) + if trans == nil { + return nil + } + + if method == closeMethod { + return trans.Close() + } else { + trans.incoming.Send(incomingMessage { + method: method, + chunked: chunked, + payload: payload, + }) + } return nil } +// most methods in transA don't need to be goroutine safe except those marked +// as such type transA struct { parent *a id int64 @@ -176,18 +212,24 @@ type transA struct { currentReader io.Reader currentWriter io.Closer writeBuffer []byte + closed atomic.Bool } func (this *transA) Close() error { + // MUST be goroutine safe err := this.closeDontUnlist() this.parent.unlistTransactionSafe(this.ID()) return err } -func (this *transA) closeDontUnlist() error { - this.Send(closeMethod, nil) - this.parent.sendMessageSafe(this.id, 0xFFFF, nil) - return this.incoming.Close() +func (this *transA) closeDontUnlist() (err error) { + // MUST be goroutine safe + this.incoming.Close() + if !this.closed.Load() { + err = this.Send(closeMethod, nil) + } + this.closed.Store(true) + return err } func (this *transA) ID() int64 { @@ -228,6 +270,11 @@ func (this *transA) Receive() (method uint16, data []byte, err error) { } func (this *transA) ReceiveReader() (uint16, io.Reader, error) { + // if the transaction has been closed, return an io.EOF + if this.closed.Load() { + return 0, nil, io.EOF + } + // drain previous reader if necessary if this.currentReader != nil { io.Copy(io.Discard, this.currentReader) @@ -249,13 +296,14 @@ type readerA struct { eof bool } +// pull pulls the next MMB in this message from the transaction. func (this *readerA) pull() (uint16, error) { // if the previous message ended the chain, return an io.EOF if this.eof { return 0, io.EOF } - // get a message from the transaction we are a part of + // get an MMB from the transaction we are a part of receive := this.parent.incoming.Receive() if receive != nil { if message, ok := <- receive; ok { @@ -265,6 +313,9 @@ func (this *readerA) pull() (uint16, error) { this.eof = true } return message.method, nil + } else { + // signal parent transaction of closure + this.parent.closed.Store(true) } } } diff --git a/metadapta_test.go b/metadapta_test.go index c85d6d9..62dfdd9 100644 --- a/metadapta_test.go +++ b/metadapta_test.go @@ -45,9 +45,12 @@ func TestConnA(test *testing.T) { } } test.Log("CLIENT waiting for transaction close...") - _, _, err = trans.Receive() + gotMethod, gotPayload, err := trans.Receive() if !errors.Is(err, io.EOF) { - test.Fatal("CLIENT wrong error:", err) + test.Error("CLIENT wrong error:", err) + test.Error("CLIENT method:", gotMethod) + test.Error("CLIENT payload:", gotPayload) + test.Fatal("CLIENT ok byeeeeeeeeeeeee") } } @@ -66,6 +69,79 @@ func TestConnA(test *testing.T) { clientServerEnvironment(test, clientFunc, serverFunc) } +func TestTransOpenCloseA(test *testing.T) { + // currently: + // + // | data sent | data recvd | close sent | close recvd + // 10 | X | X | X | server hangs + // 20 | X | X | X | client hangs + // 30 | X | | X | + // + // when a close message is recvd, it tries to push to the trans and + // hangs on trans.incoming.Send, which hangs on sending the value to the + // underlying channel. why is this? + // + // check if we are really getting values from the channel when pulling + // from the trans channel when we are expecting a close. + + clientFunc := func(conn Conn) { + // 10 + trans, err := conn.OpenTrans() + if err != nil { test.Error("CLIENT", err); return } + test.Log("CLIENT sending 10") + trans.Send(10, []byte("hi")) + trans.Close() + + // 20 + test.Log("CLIENT awaiting 20") + trans, err = conn.AcceptTrans() + if err != nil { test.Error("CLIENT", err); return } + test.Cleanup(func() { trans.Close() }) + gotMethod, gotPayload, err := trans.Receive() + if err != nil { test.Error("CLIENT", err); return } + test.Logf("CLIENT m: %d p: %s", gotMethod, gotPayload) + if gotMethod != 20 { test.Error("CLIENT wrong method")} + + // 30 + trans, err = conn.OpenTrans() + if err != nil { test.Error("CLIENT", err); return } + test.Log("CLIENT sending 30") + trans.Send(30, []byte("good")) + trans.Close() + } + + serverFunc := func(conn Conn) { + // 10 + test.Log("SERVER awaiting 10") + trans, err := conn.AcceptTrans() + if err != nil { test.Error("SERVER", err); return } + test.Cleanup(func() { trans.Close() }) + gotMethod, gotPayload, err := trans.Receive() + if err != nil { test.Error("SERVER", err); return } + test.Logf("SERVER m: %d p: %s", gotMethod, gotPayload) + if gotMethod != 10 { test.Error("SERVER wrong method")} + + // 20 + trans, err = conn.OpenTrans() + if err != nil { test.Error("SERVER", err); return } + test.Log("SERVER sending 20") + trans.Send(20, []byte("hi how r u")) + trans.Close() + + // 30 + test.Log("SERVER awaiting 30") + trans, err = conn.AcceptTrans() + if err != nil { test.Error("SERVER", err); return } + test.Cleanup(func() { trans.Close() }) + gotMethod, gotPayload, err = trans.Receive() + if err != nil { test.Error("SERVER", err); return } + test.Logf("SERVER m: %d p: %s", gotMethod, gotPayload) + if gotMethod != 30 { test.Error("SERVER wrong method")} + } + + clientServerEnvironment(test, clientFunc, serverFunc) +} + func TestEncodeMessageA(test *testing.T) { buffer := new(bytes.Buffer) payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 } @@ -127,6 +203,28 @@ func TestDecodeMessageAErr(test *testing.T) { } } +func TestEncodeDecodeMessageA(test *testing.T) { + correctTransID := int64(2) + correctMethod := uint16(30) + correctPayload := []byte("good") + buffer := bytes.Buffer { } + err := encodeMessageA(&buffer, defaultSizeLimit, correctTransID, correctMethod, correctPayload) + if err != nil { test.Fatal(err) } + transID, method, chunked, payload, err := decodeMessageA(&buffer, defaultSizeLimit) + if got, correct := transID, int64(2); got != correct { + test.Fatalf("not equal: %v %v", got, correct) + } + if got, correct := method, uint16(30); got != correct { + test.Fatalf("not equal: %v %v", got, correct) + } + if chunked { + test.Fatalf("message should not be chunked") + } + if got, correct := payload, correctPayload; !slices.Equal(got, correct) { + test.Fatalf("not equal: %v %v", got, correct) + } +} + func clientServerEnvironment(test *testing.T, clientFunc func(conn Conn), serverFunc func(conn Conn)) { network := "tcp" addr := "localhost:7959" @@ -145,6 +243,7 @@ func clientServerEnvironment(test *testing.T, clientFunc func(conn Conn), server test.Cleanup(func() { a.Close() }) serverFunc(a) + test.Log("SERVER closing") }() // client @@ -158,9 +257,10 @@ func clientServerEnvironment(test *testing.T, clientFunc func(conn Conn), server clientFunc(a) test.Log("CLIENT waiting for connection close...") - _, err = a.AcceptTrans() + trans, err := a.AcceptTrans() if !errors.Is(err, io.EOF) { - test.Fatal("CLIENT wrong error:", err) + test.Error("CLIENT wrong error:", err) + test.Fatal("CLIENT trans:", trans) } test.Log("CLIENT DONE") conn.Close() -- 2.46.1 From 2fdf7d490d7fc192912f71e9d2b51e5207a8fdc0 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Wed, 14 May 2025 13:52:03 -0400 Subject: [PATCH 33/38] Remove unneeded code --- metadapta.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/metadapta.go b/metadapta.go index 9739500..dfbe9a0 100644 --- a/metadapta.go +++ b/metadapta.go @@ -313,9 +313,6 @@ func (this *readerA) pull() (uint16, error) { this.eof = true } return message.method, nil - } else { - // signal parent transaction of closure - this.parent.closed.Store(true) } } } -- 2.46.1 From 218949bd46159207b101a75228e9a1c0865ad46f Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Wed, 14 May 2025 14:39:19 -0400 Subject: [PATCH 34/38] Remove quic It's clear it won't survive this change because I can't even test it, so who knows if its good enough to have in main. --- dial.go | 18 ++--------------- go.mod | 14 -------------- go.sum | 56 ----------------------------------------------------- listen.go | 36 +++------------------------------- quicwrap.go | 54 --------------------------------------------------- 5 files changed, 5 insertions(+), 173 deletions(-) delete mode 100644 quicwrap.go diff --git a/dial.go b/dial.go index 95a24c9..a5e062f 100644 --- a/dial.go +++ b/dial.go @@ -1,9 +1,9 @@ package hopp import "net" +import "errors" import "context" import "crypto/tls" -import "github.com/quic-go/quic-go" // Dial opens a connection to a server. The network must be one of "quic", // "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". For now, "quic4" and @@ -31,12 +31,7 @@ func (diale Dialer) Dial(ctx context.Context, network, address string) (Conn, er } func (diale Dialer) dialQUIC(ctx context.Context, network, address string) (Conn, error) { - // sorry i fucking lied to you about the network parameter. for all - // quic-go's bullshit bloat, it doesnt even support that. not even when - // instantiating a transport. go figure :/ - conn, err := quic.DialAddr(ctx, address, tlsConfig(diale.TLSConfig), quicConfig()) - if err != nil { return nil, err } - return AdaptB(quicMultiConn { underlying: conn }), nil + return nil, errors.New("quic is not yet implemented") } func (diale Dialer) dialUnix(ctx context.Context, network, address string) (Conn, error) { @@ -60,15 +55,6 @@ func tlsConfig(conf *tls.Config) *tls.Config { return conf } -func quicConfig() *quic.Config { - return &quic.Config { - // TODO: perhaps we might want to put something here - // the quic config shouldn't be exported, just set up - // automatically. we can't have that strangely built quic-go - // package be part of the API, or any third-party packages for - // that matter. it must all be abstracted away. - } -} func quicNetworkToUDPNetwork(network string) (string, error) { switch network { diff --git a/go.mod b/go.mod index 1acc120..0b443b6 100644 --- a/go.mod +++ b/go.mod @@ -5,18 +5,4 @@ go 1.23.0 require ( git.tebibyte.media/sashakoshka/go-util v0.9.1 github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62 - github.com/quic-go/quic-go v0.48.2 -) - -require ( - github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect - github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect - github.com/onsi/ginkgo/v2 v2.9.5 // indirect - go.uber.org/mock v0.4.0 // indirect - golang.org/x/crypto v0.26.0 // indirect - golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect - golang.org/x/mod v0.17.0 // indirect - golang.org/x/net v0.28.0 // indirect - golang.org/x/sys v0.23.0 // indirect - golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect ) diff --git a/go.sum b/go.sum index 2f2e05a..bb15dd1 100644 --- a/go.sum +++ b/go.sum @@ -1,60 +1,4 @@ git.tebibyte.media/sashakoshka/go-util v0.9.1 h1:eGAbLwYhOlh4aq/0w+YnJcxT83yPhXtxnYMzz6K7xGo= git.tebibyte.media/sashakoshka/go-util v0.9.1/go.mod h1:0Q1t+PePdx6tFYkRuJNcpM1Mru7wE6X+it1kwuOH+6Y= -github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= -github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= -github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= -github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= -github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= -github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= -github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= -github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= -github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg= -github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY= github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62 h1:pbAFUZisjG4s6sxvRJvf2N7vhpCvx2Oxb3PmS6pDO1g= github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA= -github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= -github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE= -github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE= -github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc= -github.com/onsi/ginkgo/v2 v2.9.5 h1:+6Hr4uxzP4XIUyAkg61dWBw8lb/gc4/X5luuxN/EC+Q= -github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3RonqW57k= -github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE= -github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg= -github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM= -github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4= -github.com/quic-go/quic-go v0.48.2 h1:wsKXZPeGWpMpCGSWqOcqpW2wZYic/8T3aqiOID0/KWE= -github.com/quic-go/quic-go v0.48.2/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs= -github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= -github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= -github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= -github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY= -go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU= -go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc= -golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw= -golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54= -golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM= -golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc= -golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA= -golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c= -golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE= -golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg= -golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ= -golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk= -golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs= -golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM= -golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= -golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc= -golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY= -golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk= -golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg= -golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk= -google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI= -google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos= -gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= -gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= -gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= -gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= diff --git a/listen.go b/listen.go index 09f1a03..723dea5 100644 --- a/listen.go +++ b/listen.go @@ -1,9 +1,8 @@ package hopp import "net" -import "context" +import "errors" import "crypto/tls" -import "github.com/quic-go/quic-go" // Listener is an object which listens for incoming HOPP connections. type Listener interface { @@ -30,19 +29,8 @@ func Listen(network, address string) (Listener, error) { // The network must be one of "quic", "quic4", (IPv4-only) or "quic6" // (IPv6-only). func ListenQUIC(network, address string, tlsConf *tls.Config) (Listener, error) { - tlsConf = tlsConfig(tlsConf) - quicConf := quicConfig() - udpNetwork, err := quicNetworkToUDPNetwork(network) - if err != nil { return nil, err } - addr, err := net.ResolveUDPAddr(udpNetwork, address) - if err != nil { return nil, err } - udpListener, err := net.ListenUDP(udpNetwork, addr) - if err != nil { return nil, err } - quicListener, err := quic.Listen(udpListener, tlsConf, quicConf) - if err != nil { return nil, err } - return &listenerQUIC { - underlying: quicListener, - }, nil + // tlsConf = tlsConfig(tlsConf) + return nil, errors.New("quic is not yet implemented") } // ListenUnix listens for incoming HOPP connections using a Unix domain socket @@ -58,24 +46,6 @@ func ListenUnix(network, address string) (Listener, error) { }, nil } -type listenerQUIC struct { - underlying *quic.Listener -} - -func (this *listenerQUIC) Accept() (Conn, error) { - conn, err := this.underlying.Accept(context.Background()) - if err != nil { return nil, err } - return AdaptB(quicMultiConn { underlying: conn }), nil -} - -func (this *listenerQUIC) Close() error { - return this.underlying.Close() -} - -func (this *listenerQUIC) Addr() net.Addr { - return this.underlying.Addr() -} - type listenerUnix struct { underlying *net.UnixListener } diff --git a/quicwrap.go b/quicwrap.go deleted file mode 100644 index 45b00b3..0000000 --- a/quicwrap.go +++ /dev/null @@ -1,54 +0,0 @@ -package hopp - -import "net" -import "context" -import "github.com/quic-go/quic-go" - -var _ MultiConn = quicMultiConn { } -type quicMultiConn struct { - underlying quic.Connection -} - -func (conn quicMultiConn) Close() error { - return conn.underlying.CloseWithError(0, "good bye") -} - -func (conn quicMultiConn) LocalAddr() net.Addr { - return conn.underlying.LocalAddr() -} - -func (conn quicMultiConn) RemoteAddr() net.Addr { - return conn.underlying.RemoteAddr() -} - -func (conn quicMultiConn) AcceptStream(ctx context.Context) (Stream, error) { - strea, err := conn.underlying.AcceptStream(ctx) - if err != nil { return nil, err } - return quicStream { underlying: strea }, nil -} - -func (conn quicMultiConn) OpenStream() (Stream, error) { - strea, err := conn.underlying.OpenStream() - if err != nil { return nil, err } - return quicStream { underlying: strea }, nil -} - -type quicStream struct { - underlying quic.Stream -} - -func (strea quicStream) Read(buffer []byte) (n int, err error) { - return strea.underlying.Read(buffer) -} - -func (strea quicStream) Write(buffer []byte) (n int, err error) { - return strea.underlying.Read(buffer) -} - -func (strea quicStream) Close() error { - return strea.underlying.Close() -} - -func (strea quicStream) ID() int64 { - return int64(strea.underlying.StreamID()) -} -- 2.46.1 From 0b98c768b37bb42fa0e864a9d1db28ab540abc4c Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Wed, 14 May 2025 14:44:27 -0400 Subject: [PATCH 35/38] Fix some outdated doc comments --- dial.go | 5 ++--- listen.go | 3 ++- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/dial.go b/dial.go index a5e062f..9b9d3d8 100644 --- a/dial.go +++ b/dial.go @@ -19,9 +19,8 @@ type Dialer struct { } // Dial opens a connection to a server. The network must be one of "quic", -// "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". For now, "quic4" and -// "quic6" don't do anything as the quic-go package doesn't seem to support this -// behavior. +// "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". For now, quic is not +// supported. func (diale Dialer) Dial(ctx context.Context, network, address string) (Conn, error) { switch network { case "quic", "quic4", "quic6": return diale.dialQUIC(ctx, network, address) diff --git a/listen.go b/listen.go index 723dea5..4c0681c 100644 --- a/listen.go +++ b/listen.go @@ -16,7 +16,8 @@ type Listener interface { } // Listen listens for incoming HOPP connections. The network must be one of -// "quic", "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". +// "quic", "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". For now, quic is +// not supported. func Listen(network, address string) (Listener, error) { switch network { case "quic", "quic4", "quic6": return ListenQUIC(network, address, nil) -- 2.46.1 From 83443b8c8870c3baed1078146eda3c71aeecd18d Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Wed, 14 May 2025 15:15:03 -0400 Subject: [PATCH 36/38] design: Fix documentation on message payload length --- design/protocol.md | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/design/protocol.md b/design/protocol.md index 550cc86..33daca2 100644 --- a/design/protocol.md +++ b/design/protocol.md @@ -18,12 +18,10 @@ dependant on which transport is being used. A message refers to a block of octets sent within a transaction, paired with an unsigned 16-bit method code. The order of messages within a given transaction is preserved, but the order of messages accross the entire connection is not -guaranteed. - -The message payload must be 65,535 (unsigned 16-bit integer limit) octets or -smaller in length. This does not include the method code. Applications are free -to send whatever data they wish as the payload, but TAPE is recommended for -encoding it. +guaranteed. There is no functional limit on the size of a message payload, but +there may be one depending on which +[METADAPT sub-protocol](#message-and-transaction-demarcation-protocol-metadapt) +is in use. Method codes should be written in upper-case base 16 with the prefix "M" in logs, error messages, documentation, etc. For example, the method code 62,206 in -- 2.46.1 From 835d6230873c1d89a986ff6d0d9b03d43a379baf Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Thu, 15 May 2025 17:49:29 -0400 Subject: [PATCH 37/38] Change the protocol definition for tape to conform to #2 --- design/protocol.md | 68 +++++++++++++++++++++++++++------------------- 1 file changed, 40 insertions(+), 28 deletions(-) diff --git a/design/protocol.md b/design/protocol.md index 33daca2..ab21173 100644 --- a/design/protocol.md +++ b/design/protocol.md @@ -35,25 +35,17 @@ fucking with you. ## Table Pair Encoding (TAPE) The Table Pair Encoding (TAPE) scheme is a method for encoding structured data within HOPP messages. It defines standard binary encoding methods for common -data types, as well as a corruption-resistant table structure that maps numeric -IDs to values. It is designed to allow applications to be presented with data -they are not equipped to handle while continuing to function normally. This -enables backwards compatibile application protocol changes. +data types, as well as aggregate data types such as tables and arrays. It is +designed to allow applications to be presented with data they are not equipped +to handle while continuing to function normally. This enables backwards +compatibile application protocol changes. -### Table Structure -A table is divided into two sections: the header, and the values. The header -begins with the number (U16) of pairs in the table, which is then followed by -that many tag-offset pairs. A tag-offset pair consists of a numerical (U16) tag, -followed the position (U16) of the value relative to the start of the values -section. The values section contains the value data for each pair, where the -start of each value is determined by its offset, and the end is determined by -the offset of the next value, or the end of the message if there is no value -after it. - -Both sections must be in the same order, and because of this, each value offset -must be greater than or equal to the last. If a message has erratic structure -(such as unordered or out-of-bounds offsets), implementations may opt to discard -only the erratic pairs, as well as the pairs directly before those. +The length of a TAPE structure is assumed to be given by the surrounding +protocol, which is usually METADAPT-A or B. The root of a TAPE structure can be +any data value, but is usually a table, which can contain several values that +each have a numeric key. Values can also be nested. Both sides of the connection +must agree on what data type should be the root value, the data type of each +known table value, etc. ### Data Value Types The table below lists all data value types supported by TAPE. @@ -68,9 +60,10 @@ The table below lists all data value types supported by TAPE. | U16 | 2 | An unsigned 16-bit integer | BEU | U32 | 4 | An unsigned 32-bit integer | BEU | U64 | 8 | An unsigned 64-bit integer | BEU -| Array[^1] | SOP[^2] | An array of any above type | PASTA -| String | N/A | A UTF-8 string | UTF-8 -| StringArray | n * 2 + SOP[^2] | An array the String type | VILA +| Array[^1] | | An array of any above type | PASTA +| String | | A UTF-8 string | UTF-8 +| StringArray | | An array the String type | VILA +| Table | | A table of any type | TTLV [^1]: Array types are written as Array, where is the element type. For example, an array of I32 would be written as I32Array. StringArray still follows @@ -95,6 +88,15 @@ Big-Endian, Unsigned integer. The size is defined as the least amount of whole octets which can fit all bits in the integer, regardless if the bits are on or off. Therefore, the size cannot change at runtime. +#### GBEU +Growing Big-Endian, Unsigned integer. The integer is broken up into 8-bit +chunks, where the first bit of each chunk is a CCB. The chunk with its CCB set +to zero instead of one is the last chunk in the integer. Chunks are ordered from +most significant to least significant (big endian). The size is defined as the +least amount of whole octets which can fit all chunks of the integer. The size +of this type is not fixed and may change at runtime, so this needs to be +accounted for during use. + #### PASTA Packed Single-Type Array. The size is defined as the size of an individual item times the number of items. Items are placed one after the other with no gaps @@ -110,13 +112,23 @@ for during use. #### VILA Variable Item Length Array. The size is defined as the least amount of whole -octets which can fit each item plus one U16 per item. The size of this type is -not fixed and may change at runtime, so this needs to be accounted for during -use. The amount of items must be greater than zero. Items are each prefixed by -their size (in octets) encoded as a U16, and they are placed one after the other -with no gaps in-between them, except as required to align the start of each item -to the nearest whole octet. Items should be of the same type but do not need to -be of the same size. +octets which can fit each item plus one GBEU per item describing that item's +size. The size of this type is not fixed and may change at runtime, so this +needs to be accounted for during use. The amount of items must be greater than +zero. Items are each prefixed by their size (in octets) encoded as a GBEU, and +they are placed one after the other with no gaps in-between them, except as +required to align the start of each item to the nearest whole octet. Items +should be of the same type but do not need to be of the same size. + +#### TTLV +TAPE Tag Length Value. The size is defined as the least amount of whole octets +which can fit each item plus one U16 and one GBEU per item, where the latter of +which describes that item's size. The size of this type is not fixed and may +change at runtime, so this needs to be accounted for during use. Items are each +prefixed by their numerical tag encoded as a U16, and their size (in octets) +encoded as a GBEU. Items are placed one after the other with no gaps in-between +them, except as required to align the start of each item to the nearest whole +octet. Items need not be of the same type nor the same size. ## Transports A transport is a protocol that HOPP connections can run on top of. HOPP -- 2.46.1 From dd5e7e96d5262c529e0f3c81e589befca56cd614 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Thu, 15 May 2025 17:56:41 -0400 Subject: [PATCH 38/38] design: Remove note about this limitation --- design/protocol.md | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/design/protocol.md b/design/protocol.md index ab21173..d5e41e2 100644 --- a/design/protocol.md +++ b/design/protocol.md @@ -67,10 +67,7 @@ The table below lists all data value types supported by TAPE. [^1]: Array types are written as Array, where is the element type. For example, an array of I32 would be written as I32Array. StringArray still follows -this rule, even though it is encoded differently from other arrays. Nesting -arrays inside of arrays is prohibited. This problem can be avoided in most cases -by effectively utilizing the table structure, or by improving the design of -your protocol. +this rule, even though it is encoded differently from other arrays. [^2]: SOP (sum of parts) refers to the sum of the size of every item in a data structure. -- 2.46.1