message-size-increase #3

Open
sashakoshka wants to merge 38 commits from message-size-increase into main
11 changed files with 674 additions and 370 deletions

View File

@ -1,8 +1,11 @@
package hopp package hopp
import "io"
import "net" import "net"
// import "time" // import "time"
const defaultSizeLimit int64 = 1024 * 1024 // 1 megabyte
// Conn is a HOPP connection. // Conn is a HOPP connection.
type Conn interface { type Conn interface {
// Close closes the connection. Any blocked operations on the connection // Close closes the connection. Any blocked operations on the connection
@ -19,22 +22,39 @@ type Conn interface {
// AcceptTrans accepts a transaction from the other party. This must // AcceptTrans accepts a transaction from the other party. This must
// be called in a loop to avoid the connection locking up. // be called in a loop to avoid the connection locking up.
AcceptTrans() (Trans, error) AcceptTrans() (Trans, error)
// SetSizeLimit sets a limit (in bytes) for how large messages can be.
// By default, this limit is 1 megabyte.
SetSizeLimit(limit int64)
} }
// Trans is a HOPP transaction. // Trans is a HOPP transaction. Methods of this interface are not safe for
// concurrent use with the exception of the Close and ID methods. The
// recommended use case is one goroutine per transaction.
type Trans interface { type Trans interface {
// Close closes the transaction. Any blocked operations will be // Close closes the transaction. Any blocked operations will be
// unblocked and return errors. // unblocked and return errors. This method is safe for concurrent use.
Close() error Close() error
// ID returns the transaction ID. This must not change, and it must be // ID returns the transaction ID. This must not change, and it must be
// unique within the connection. // unique within the connection. This method is safe for concurrent use.
ID() int64 ID() int64
// TODO: add methods for setting send and receive deadlines // TODO: add methods for setting send and receive deadlines
// Send sends a message. // Send sends a message. This method is not safe for concurrent use.
Send(method uint16, data []byte) error Send(method uint16, data []byte) error
// Receive receives a message. // SendWriter sends data written to an [io.Writer]. The writer must be
// closed after use. Closing the writer flushes any data that hasn't
// been written yet. Any writer previously opened through this function
// will be discarded. This method is not safe for concurrent use, and
// neither is its result.
SendWriter(method uint16) (io.WriteCloser, error)
// Receive receives a message. This method is not safe for concurrent
// use.
Receive() (method uint16, data []byte, err error) Receive() (method uint16, data []byte, err error)
// ReceiveReader receives a message as an [io.Reader]. Any reader
// previously opened through this function will be discarded. This
// method is not safe for concurrent use, and neither is its result.
ReceiveReader() (method uint16, data io.Reader, err error)
} }

View File

@ -18,12 +18,10 @@ dependant on which transport is being used.
A message refers to a block of octets sent within a transaction, paired with an A message refers to a block of octets sent within a transaction, paired with an
unsigned 16-bit method code. The order of messages within a given transaction is unsigned 16-bit method code. The order of messages within a given transaction is
preserved, but the order of messages accross the entire connection is not preserved, but the order of messages accross the entire connection is not
guaranteed. guaranteed. There is no functional limit on the size of a message payload, but
there may be one depending on which
The message payload must be 65,535 (unsigned 16-bit integer limit) octets or [METADAPT sub-protocol](#message-and-transaction-demarcation-protocol-metadapt)
smaller in length. This does not include the method code. Applications are free is in use.
to send whatever data they wish as the payload, but TAPE is recommended for
encoding it.
Method codes should be written in upper-case base 16 with the prefix "M" in Method codes should be written in upper-case base 16 with the prefix "M" in
logs, error messages, documentation, etc. For example, the method code 62,206 in logs, error messages, documentation, etc. For example, the method code 62,206 in
@ -37,25 +35,17 @@ fucking with you.
## Table Pair Encoding (TAPE) ## Table Pair Encoding (TAPE)
The Table Pair Encoding (TAPE) scheme is a method for encoding structured data The Table Pair Encoding (TAPE) scheme is a method for encoding structured data
within HOPP messages. It defines standard binary encoding methods for common within HOPP messages. It defines standard binary encoding methods for common
data types, as well as a corruption-resistant table structure that maps numeric data types, as well as aggregate data types such as tables and arrays. It is
IDs to values. It is designed to allow applications to be presented with data designed to allow applications to be presented with data they are not equipped
they are not equipped to handle while continuing to function normally. This to handle while continuing to function normally. This enables backwards
enables backwards compatibile application protocol changes. compatibile application protocol changes.
### Table Structure The length of a TAPE structure is assumed to be given by the surrounding
A table is divided into two sections: the header, and the values. The header protocol, which is usually METADAPT-A or B. The root of a TAPE structure can be
begins with the number (U16) of pairs in the table, which is then followed by any data value, but is usually a table, which can contain several values that
that many tag-offset pairs. A tag-offset pair consists of a numerical (U16) tag, each have a numeric key. Values can also be nested. Both sides of the connection
followed the position (U16) of the value relative to the start of the values must agree on what data type should be the root value, the data type of each
section. The values section contains the value data for each pair, where the known table value, etc.
start of each value is determined by its offset, and the end is determined by
the offset of the next value, or the end of the message if there is no value
after it.
Both sections must be in the same order, and because of this, each value offset
must be greater than or equal to the last. If a message has erratic structure
(such as unordered or out-of-bounds offsets), implementations may opt to discard
only the erratic pairs, as well as the pairs directly before those.
### Data Value Types ### Data Value Types
The table below lists all data value types supported by TAPE. The table below lists all data value types supported by TAPE.
@ -70,16 +60,14 @@ The table below lists all data value types supported by TAPE.
| U16 | 2 | An unsigned 16-bit integer | BEU | U16 | 2 | An unsigned 16-bit integer | BEU
| U32 | 4 | An unsigned 32-bit integer | BEU | U32 | 4 | An unsigned 32-bit integer | BEU
| U64 | 8 | An unsigned 64-bit integer | BEU | U64 | 8 | An unsigned 64-bit integer | BEU
| Array[^1] | SOP[^2] | An array of any above type | PASTA | Array[^1] | | An array of any above type | PASTA
| String | N/A | A UTF-8 string | UTF-8 | String | | A UTF-8 string | UTF-8
| StringArray | n * 2 + SOP[^2] | An array the String type | VILA | StringArray | | An array the String type | VILA
| Table | | A table of any type | TTLV
[^1]: Array types are written as <E>Array, where <E> is the element type. For [^1]: Array types are written as <E>Array, where <E> is the element type. For
example, an array of I32 would be written as I32Array. StringArray still follows example, an array of I32 would be written as I32Array. StringArray still follows
this rule, even though it is encoded differently from other arrays. Nesting this rule, even though it is encoded differently from other arrays.
arrays inside of arrays is prohibited. This problem can be avoided in most cases
by effectively utilizing the table structure, or by improving the design of
your protocol.
[^2]: SOP (sum of parts) refers to the sum of the size of every item in a data [^2]: SOP (sum of parts) refers to the sum of the size of every item in a data
structure. structure.
@ -97,6 +85,15 @@ Big-Endian, Unsigned integer. The size is defined as the least amount of whole
octets which can fit all bits in the integer, regardless if the bits are on or octets which can fit all bits in the integer, regardless if the bits are on or
off. Therefore, the size cannot change at runtime. off. Therefore, the size cannot change at runtime.
#### GBEU
Growing Big-Endian, Unsigned integer. The integer is broken up into 8-bit
chunks, where the first bit of each chunk is a CCB. The chunk with its CCB set
to zero instead of one is the last chunk in the integer. Chunks are ordered from
most significant to least significant (big endian). The size is defined as the
least amount of whole octets which can fit all chunks of the integer. The size
of this type is not fixed and may change at runtime, so this needs to be
accounted for during use.
#### PASTA #### PASTA
Packed Single-Type Array. The size is defined as the size of an individual item Packed Single-Type Array. The size is defined as the size of an individual item
times the number of items. Items are placed one after the other with no gaps times the number of items. Items are placed one after the other with no gaps
@ -112,25 +109,37 @@ for during use.
#### VILA #### VILA
Variable Item Length Array. The size is defined as the least amount of whole Variable Item Length Array. The size is defined as the least amount of whole
octets which can fit each item plus one U16 per item. The size of this type is octets which can fit each item plus one GBEU per item describing that item's
not fixed and may change at runtime, so this needs to be accounted for during size. The size of this type is not fixed and may change at runtime, so this
use. The amount of items must be greater than zero. Items are each prefixed by needs to be accounted for during use. The amount of items must be greater than
their size (in octets) encoded as a U16, and they are placed one after the other zero. Items are each prefixed by their size (in octets) encoded as a GBEU, and
with no gaps in-between them, except as required to align the start of each item they are placed one after the other with no gaps in-between them, except as
to the nearest whole octet. Items should be of the same type but do not need to required to align the start of each item to the nearest whole octet. Items
be of the same size. should be of the same type but do not need to be of the same size.
#### TTLV
TAPE Tag Length Value. The size is defined as the least amount of whole octets
which can fit each item plus one U16 and one GBEU per item, where the latter of
which describes that item's size. The size of this type is not fixed and may
change at runtime, so this needs to be accounted for during use. Items are each
prefixed by their numerical tag encoded as a U16, and their size (in octets)
encoded as a GBEU. Items are placed one after the other with no gaps in-between
them, except as required to align the start of each item to the nearest whole
octet. Items need not be of the same type nor the same size.
## Transports ## Transports
A transport is a protocol that HOPP connections can run on top of. HOPP A transport is a protocol that HOPP connections can run on top of. HOPP
currently supports the QUIC transport protocol for communicating between currently supports the QUIC transport protocol for communicating between
machines, and UNIX domain sockets for quicker communication among applications machines, TCP/TLS for legacy systems that do not support QUIC, and UNIX domain
on the same machine. Both protocols are supported through METADAPT. sockets for faster communication among applications on the same machine. Both
protocols are supported through METADAPT.
## Message and Transaction Demarcation Protocol (METADAPT) ## Message and Transaction Demarcation Protocol (METADAPT)
The Message and Transaction Demarcation Protocol is used to break one or more The Message and Transaction Demarcation Protocol is used to break one or more
reliable data streams into transactions, which are broken down further into reliable data streams into transactions, which are broken down further into
messages. A message, as well as its associated metadata (length, transaction, messages. The representation of a message (or a part thereof) on the protocol,
method, etc.) together is referred to as METADAPT Message Block (MMB). including its associated metadata (length, transaction, method, etc.) is
referred to as METADAPT Message Block (MMB).
For transports that offer multiple multiplexed data streams that can be created For transports that offer multiple multiplexed data streams that can be created
and destroyed on-demand (such as QUIC) each stream is used as a transaction. If and destroyed on-demand (such as QUIC) each stream is used as a transaction. If
@ -145,8 +154,12 @@ METADAPT-A requires a transport which offers a single full-duplex data stream
that persists for the duration of the connection. All transactions are that persists for the duration of the connection. All transactions are
multiplexed onto this single stream. Each MMB contains a 12-octet long header, multiplexed onto this single stream. Each MMB contains a 12-octet long header,
with the transaction ID, then the method, and then the payload size (in octets). with the transaction ID, then the method, and then the payload size (in octets).
The transaction ID is encoded as an I64, and the method and payload size are The transaction ID is encoded as an I64, the method is encoded as a U16 and the
both encoded as U16s. The remainder of the message is the payload. Since each and payload size is encoded as a U64. Only the 63 least significant bits of the
payload size describe the actual size, the most significant bit controlling
chunking. See the section on chunking for more information.
The remainder of the message is the payload. Since each
MMB is self-describing, they are sent sequentially with no gaps in-between them. MMB is self-describing, they are sent sequentially with no gaps in-between them.
Transactions "open" when the first message with a given transaction ID is sent. Transactions "open" when the first message with a given transaction ID is sent.
@ -162,13 +175,25 @@ used up, the connection must fail. Don't worry about this though, because the
sun will have expanded to swallow earth by then. Your connection will not last sun will have expanded to swallow earth by then. Your connection will not last
that long. that long.
#### Message Chunking
The most significant bit of the payload size field of an MMB is called the Chunk
Control Bit (CCB). If the CCB of a given MMB is zero, the represented message is
interpreted as being self-contained and the data is processed immediately. If
the CCB is one, the message is interpreted as being chunked, with the data of
the current MMB being the first chunk. The data of further MMBs sent along the
transaction will be appended to the message until an MMB is read with a zero
CCB, in which case the MMB will be the last chunk and any more MMBs will be
interpreted as normal.
### METADAPT-B ### METADAPT-B
METADAPT-B requires a transport which offers multiple multiplexed full-duplex METADAPT-B requires a transport which offers multiple multiplexed full-duplex
data streams per connection that can be created and destroyed on-demand. Each data streams per connection that can be created and destroyed on-demand. Each
data stream is used as an individual transaction. Each MMB contains a 4-octet data stream is used as an individual transaction. Each MMB contains a 4-octet
long header with the method and then the payload size (in octets) both encoded long header with the method and then the payload size (in octets) encoded as a
as U16s. The remainder of the message is the payload. Since each MMB is U16 and U64 respectively. The remainder of the message is the payload. Since
self-describing, they are sent sequentially with no gaps in-between them. each MMB is self-describing, they are sent sequentially with no gaps in-between
them.
The ID of any transaction will reflect the ID of its corresponding stream. The The ID of any transaction will reflect the ID of its corresponding stream. The
lifetime of the transaction is tied to the lifetime of the stream, that is to lifetime of the transaction is tied to the lifetime of the stream, that is to

23
dial.go
View File

@ -1,9 +1,9 @@
package hopp package hopp
import "net" import "net"
import "errors"
import "context" import "context"
import "crypto/tls" import "crypto/tls"
import "github.com/quic-go/quic-go"
// Dial opens a connection to a server. The network must be one of "quic", // Dial opens a connection to a server. The network must be one of "quic",
// "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". For now, "quic4" and // "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". For now, "quic4" and
@ -19,9 +19,8 @@ type Dialer struct {
} }
// Dial opens a connection to a server. The network must be one of "quic", // Dial opens a connection to a server. The network must be one of "quic",
// "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". For now, "quic4" and // "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". For now, quic is not
// "quic6" don't do anything as the quic-go package doesn't seem to support this // supported.
// behavior.
func (diale Dialer) Dial(ctx context.Context, network, address string) (Conn, error) { func (diale Dialer) Dial(ctx context.Context, network, address string) (Conn, error) {
switch network { switch network {
case "quic", "quic4", "quic6": return diale.dialQUIC(ctx, network, address) case "quic", "quic4", "quic6": return diale.dialQUIC(ctx, network, address)
@ -31,12 +30,7 @@ func (diale Dialer) Dial(ctx context.Context, network, address string) (Conn, er
} }
func (diale Dialer) dialQUIC(ctx context.Context, network, address string) (Conn, error) { func (diale Dialer) dialQUIC(ctx context.Context, network, address string) (Conn, error) {
// sorry i fucking lied to you about the network parameter. for all return nil, errors.New("quic is not yet implemented")
// quic-go's bullshit bloat, it doesnt even support that. not even when
// instantiating a transport. go figure :/
conn, err := quic.DialAddr(ctx, address, tlsConfig(diale.TLSConfig), quicConfig())
if err != nil { return nil, err }
return AdaptB(quicMultiConn { underlying: conn }), nil
} }
func (diale Dialer) dialUnix(ctx context.Context, network, address string) (Conn, error) { func (diale Dialer) dialUnix(ctx context.Context, network, address string) (Conn, error) {
@ -60,15 +54,6 @@ func tlsConfig(conf *tls.Config) *tls.Config {
return conf return conf
} }
func quicConfig() *quic.Config {
return &quic.Config {
// TODO: perhaps we might want to put something here
// the quic config shouldn't be exported, just set up
// automatically. we can't have that strangely built quic-go
// package be part of the API, or any third-party packages for
// that matter. it must all be abstracted away.
}
}
func quicNetworkToUDPNetwork(network string) (string, error) { func quicNetworkToUDPNetwork(network string) (string, error) {
switch network { switch network {

14
go.mod
View File

@ -5,18 +5,4 @@ go 1.23.0
require ( require (
git.tebibyte.media/sashakoshka/go-util v0.9.1 git.tebibyte.media/sashakoshka/go-util v0.9.1
github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62 github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62
github.com/quic-go/quic-go v0.48.2
)
require (
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 // indirect
github.com/onsi/ginkgo/v2 v2.9.5 // indirect
go.uber.org/mock v0.4.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 // indirect
golang.org/x/mod v0.17.0 // indirect
golang.org/x/net v0.28.0 // indirect
golang.org/x/sys v0.23.0 // indirect
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d // indirect
) )

56
go.sum
View File

@ -1,60 +1,4 @@
git.tebibyte.media/sashakoshka/go-util v0.9.1 h1:eGAbLwYhOlh4aq/0w+YnJcxT83yPhXtxnYMzz6K7xGo= git.tebibyte.media/sashakoshka/go-util v0.9.1 h1:eGAbLwYhOlh4aq/0w+YnJcxT83yPhXtxnYMzz6K7xGo=
git.tebibyte.media/sashakoshka/go-util v0.9.1/go.mod h1:0Q1t+PePdx6tFYkRuJNcpM1Mru7wE6X+it1kwuOH+6Y= git.tebibyte.media/sashakoshka/go-util v0.9.1/go.mod h1:0Q1t+PePdx6tFYkRuJNcpM1Mru7wE6X+it1kwuOH+6Y=
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ=
github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI=
github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls=
github.com/golang/protobuf v1.5.3 h1:KhyjKVUg7Usr/dYsdSqoFveMYd5ko72D+zANwlG1mmg=
github.com/golang/protobuf v1.5.3/go.mod h1:XVQd3VNwM+JqD3oG2Ue2ip4fOMUkwXdXDdiuN0vRsmY=
github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62 h1:pbAFUZisjG4s6sxvRJvf2N7vhpCvx2Oxb3PmS6pDO1g= github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62 h1:pbAFUZisjG4s6sxvRJvf2N7vhpCvx2Oxb3PmS6pDO1g=
github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA= github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62/go.mod h1:JDGcbDT52eL4fju3sZ4TeHGsQwhG9nbDV21aMyhwPoA=
github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI=
github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38 h1:yAJXTCF9TqKcTiHJAE8dj7HMvPfh66eeA2JYW7eFpSE=
github.com/google/pprof v0.0.0-20210407192527-94a9f03dee38/go.mod h1:kpwsk12EmLew5upagYY7GY0pfYCcupk39gWOCRROcvE=
github.com/ianlancetaylor/demangle v0.0.0-20200824232613-28f6c0f3b639/go.mod h1:aSSvb/t6k1mPoxDqO4vJh6VOCGPwU4O0C2/Eqndh1Sc=
github.com/onsi/ginkgo/v2 v2.9.5 h1:+6Hr4uxzP4XIUyAkg61dWBw8lb/gc4/X5luuxN/EC+Q=
github.com/onsi/ginkgo/v2 v2.9.5/go.mod h1:tvAoo1QUJwNEU2ITftXTpR7R1RbCzoZUOs3RonqW57k=
github.com/onsi/gomega v1.27.6 h1:ENqfyGeS5AX/rlXDd/ETokDz93u0YufY1Pgxuy/PvWE=
github.com/onsi/gomega v1.27.6/go.mod h1:PIQNjfQwkP3aQAH7lf7j87O/5FiNr+ZR8+ipb+qQlhg=
github.com/pmezard/go-difflib v1.0.0 h1:4DBwDE0NGyQoBHbLQYPwSUPoCMWR5BEzIk/f1lZbAQM=
github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZNVY4sRDYZ/4=
github.com/quic-go/quic-go v0.48.2 h1:wsKXZPeGWpMpCGSWqOcqpW2wZYic/8T3aqiOID0/KWE=
github.com/quic-go/quic-go v0.48.2/go.mod h1:yBgs3rWBOADpga7F+jJsb6Ybg1LSYiQvwWlLX+/6HMs=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
github.com/stretchr/testify v1.9.0/go.mod h1:r2ic/lqez/lEtzL7wO/rwa5dbSLXVDPFyf8C91i36aY=
go.uber.org/mock v0.4.0 h1:VcM4ZOtdbR4f6VXfiOpwpVJDL6lCReaZ6mw31wqh7KU=
go.uber.org/mock v0.4.0/go.mod h1:a6FSlNadKUHUa9IP5Vyt1zh4fC7uAwxMutEAscFbkZc=
golang.org/x/crypto v0.26.0 h1:RrRspgV4mU+YwB4FYnuBoKsUapNIL5cohGAmSH3azsw=
golang.org/x/crypto v0.26.0/go.mod h1:GY7jblb9wI+FOo5y8/S2oY4zWP07AkOJ4+jxCqdqn54=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842 h1:vr/HnozRka3pE4EsMEg1lgkXJkTFJCVUX+S/ZT6wYzM=
golang.org/x/exp v0.0.0-20240506185415-9bf2ced13842/go.mod h1:XtvwrStGgqGPLc4cjQfWqZHG1YFdYs6swckp8vpsjnc=
golang.org/x/mod v0.17.0 h1:zY54UmvipHiNd+pm+m0x9KhZ9hl1/7QNMyxXbc6ICqA=
golang.org/x/mod v0.17.0/go.mod h1:hTbmBsO62+eylJbnUtE2MGJUyE7QWk4xUqPFrRgJ+7c=
golang.org/x/net v0.28.0 h1:a9JDOJc5GMUJ0+UDqmLT86WiEy7iWyIhz8gz8E4e5hE=
golang.org/x/net v0.28.0/go.mod h1:yqtgsTWOOnlGLG9GFRrK3++bGOUEkNBoHZc8MEDWPNg=
golang.org/x/sync v0.8.0 h1:3NFvSEYkUoMifnESzZl15y791HH1qU2xm6eCJU5ZPXQ=
golang.org/x/sync v0.8.0/go.mod h1:Czt+wKu1gCyEFDUtn0jG5QVvpJ6rzVqr5aXyt9drQfk=
golang.org/x/sys v0.0.0-20191204072324-ce4227a45e2e/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.23.0 h1:YfKFowiIMvtgl1UERQoTPPToxltDeZfbj4H7dVUCwmM=
golang.org/x/sys v0.23.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.17.0 h1:XtiM5bkSOt+ewxlOE/aE/AKEHibwj/6gvWMl9Rsh0Qc=
golang.org/x/text v0.17.0/go.mod h1:BuEKDfySbSR4drPmRPG/7iBdf8hvFMuRexcpahXilzY=
golang.org/x/time v0.5.0 h1:o7cqy6amK/52YcAKIPlM3a+Fpj35zvRj2TP+e1xFSfk=
golang.org/x/time v0.5.0/go.mod h1:3BpzKBy/shNhVucY/MWOyx10tF3SFh9QdLuxbVysPQM=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d h1:vU5i/LfpvrRCpgM/VPfJLg5KjxD3E+hfT1SH+d9zLwg=
golang.org/x/tools v0.21.1-0.20240508182429-e35e4ccd0d2d/go.mod h1:aiJjzUbINMkxbQROHiO6hDPo2LHcIPhhQsa9DLh0yGk=
google.golang.org/protobuf v1.33.0 h1:uNO2rsAINq/JlFpSdYEKIZ0uKD/R9cpdv0T+yoGwGmI=
google.golang.org/protobuf v1.33.0/go.mod h1:c6P6GXX6sHbq/GpV6MGZEdwhWPcYBgnhAHhKbcUYpos=
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=

View File

@ -1,9 +1,8 @@
package hopp package hopp
import "net" import "net"
import "context" import "errors"
import "crypto/tls" import "crypto/tls"
import "github.com/quic-go/quic-go"
// Listener is an object which listens for incoming HOPP connections. // Listener is an object which listens for incoming HOPP connections.
type Listener interface { type Listener interface {
@ -17,7 +16,8 @@ type Listener interface {
} }
// Listen listens for incoming HOPP connections. The network must be one of // Listen listens for incoming HOPP connections. The network must be one of
// "quic", "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". // "quic", "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". For now, quic is
// not supported.
func Listen(network, address string) (Listener, error) { func Listen(network, address string) (Listener, error) {
switch network { switch network {
case "quic", "quic4", "quic6": return ListenQUIC(network, address, nil) case "quic", "quic4", "quic6": return ListenQUIC(network, address, nil)
@ -30,19 +30,8 @@ func Listen(network, address string) (Listener, error) {
// The network must be one of "quic", "quic4", (IPv4-only) or "quic6" // The network must be one of "quic", "quic4", (IPv4-only) or "quic6"
// (IPv6-only). // (IPv6-only).
func ListenQUIC(network, address string, tlsConf *tls.Config) (Listener, error) { func ListenQUIC(network, address string, tlsConf *tls.Config) (Listener, error) {
tlsConf = tlsConfig(tlsConf) // tlsConf = tlsConfig(tlsConf)
quicConf := quicConfig() return nil, errors.New("quic is not yet implemented")
udpNetwork, err := quicNetworkToUDPNetwork(network)
if err != nil { return nil, err }
addr, err := net.ResolveUDPAddr(udpNetwork, address)
if err != nil { return nil, err }
udpListener, err := net.ListenUDP(udpNetwork, addr)
if err != nil { return nil, err }
quicListener, err := quic.Listen(udpListener, tlsConf, quicConf)
if err != nil { return nil, err }
return &listenerQUIC {
underlying: quicListener,
}, nil
} }
// ListenUnix listens for incoming HOPP connections using a Unix domain socket // ListenUnix listens for incoming HOPP connections using a Unix domain socket
@ -58,24 +47,6 @@ func ListenUnix(network, address string) (Listener, error) {
}, nil }, nil
} }
type listenerQUIC struct {
underlying *quic.Listener
}
func (this *listenerQUIC) Accept() (Conn, error) {
conn, err := this.underlying.Accept(context.Background())
if err != nil { return nil, err }
return AdaptB(quicMultiConn { underlying: conn }), nil
}
func (this *listenerQUIC) Close() error {
return this.underlying.Close()
}
func (this *listenerQUIC) Addr() net.Addr {
return this.underlying.Addr()
}
type listenerUnix struct { type listenerUnix struct {
underlying *net.UnixListener underlying *net.UnixListener
} }

View File

@ -4,11 +4,16 @@ import "io"
import "fmt" import "fmt"
import "net" import "net"
import "sync" import "sync"
import "sync/atomic"
import "git.tebibyte.media/sashakoshka/hopp/tape" import "git.tebibyte.media/sashakoshka/hopp/tape"
import "git.tebibyte.media/sashakoshka/go-util/sync" import "git.tebibyte.media/sashakoshka/go-util/sync"
// TODO investigate why 30 never reaches the server, causing it to wait for ever
// and never close the connection, causing the client to also wait forever
const closeMethod = 0xFFFF const closeMethod = 0xFFFF
const int64Max = int64((^uint64(0)) >> 1) const int64Max = int64((^uint64(0)) >> 1)
const defaultChunkSize = 0x1000
// Party represents a side of a connection. // Party represents a side of a connection.
type Party bool; const ( type Party bool; const (
@ -16,7 +21,16 @@ type Party bool; const (
ClientSide Party = true ClientSide Party = true
) )
func (party Party) String() string {
if party == ServerSide {
return "server"
} else {
return "client"
}
}
type a struct { type a struct {
sizeLimit int64
underlying net.Conn underlying net.Conn
party Party party Party
transID int64 transID int64
@ -32,6 +46,7 @@ type a struct {
// oriented transport such as TCP or UNIX domain stream sockets. // oriented transport such as TCP or UNIX domain stream sockets.
func AdaptA(underlying net.Conn, party Party) Conn { func AdaptA(underlying net.Conn, party Party) Conn {
conn := &a { conn := &a {
sizeLimit: defaultSizeLimit,
underlying: underlying, underlying: underlying,
party: party, party: party,
transMap: make(map[int64] *transA), transMap: make(map[int64] *transA),
@ -49,7 +64,7 @@ func AdaptA(underlying net.Conn, party Party) Conn {
func (this *a) Close() error { func (this *a) Close() error {
close(this.done) close(this.done)
return this.underlying.Close() return nil
} }
func (this *a) LocalAddr() net.Addr { func (this *a) LocalAddr() net.Addr {
@ -63,30 +78,41 @@ func (this *a) RemoteAddr() net.Addr {
func (this *a) OpenTrans() (Trans, error) { func (this *a) OpenTrans() (Trans, error) {
this.transLock.Lock() this.transLock.Lock()
defer this.transLock.Unlock() defer this.transLock.Unlock()
if this.transID == int64Max {
return nil, fmt.Errorf("could not open transaction: %w", ErrIntegerOverflow)
}
id := this.transID id := this.transID
this.transID ++
trans := &transA { trans := &transA {
parent: this, parent: this,
id: id, id: id,
incoming: usync.NewGate[incomingMessage](), incoming: usync.NewGate[incomingMessage](),
} }
this.transMap[id] = trans this.transMap[id] = trans
if this.transID == int64Max { if this.party == ClientSide {
return nil, fmt.Errorf("could not open transaction: %w", ErrIntegerOverflow)
}
this.transID ++ this.transID ++
} else {
this.transID --
}
return trans, nil return trans, nil
} }
func (this *a) AcceptTrans() (Trans, error) { func (this *a) AcceptTrans() (Trans, error) {
eof := fmt.Errorf("could not accept transaction: %w", io.EOF)
select { select {
case trans := <- this.transChan: case trans := <- this.transChan:
if trans == nil {
return nil, eof
}
return trans, nil return trans, nil
case <- this.done: case <- this.done:
return nil, fmt.Errorf("could not accept transaction: %w", io.EOF) return nil, eof
} }
} }
func (this *a) SetSizeLimit(limit int64) {
this.sizeLimit = limit
}
func (this *a) unlistTransactionSafe(id int64) { func (this *a) unlistTransactionSafe(id int64) {
this.transLock.Lock() this.transLock.Lock()
defer this.transLock.Unlock() defer this.transLock.Unlock()
@ -96,27 +122,32 @@ func (this *a) unlistTransactionSafe(id int64) {
func (this *a) sendMessageSafe(trans int64, method uint16, data []byte) error { func (this *a) sendMessageSafe(trans int64, method uint16, data []byte) error {
this.sendLock.Lock() this.sendLock.Lock()
defer this.sendLock.Unlock() defer this.sendLock.Unlock()
return encodeMessageA(this.underlying, trans, method, data) return encodeMessageA(this.underlying, this.sizeLimit, trans, method, data)
} }
func (this *a) receive() { func (this *a) receive() {
defer func() { defer func() {
this.underlying.Close() this.underlying.Close()
close(this.transChan)
this.transLock.Lock() this.transLock.Lock()
defer this.transLock.Unlock() defer this.transLock.Unlock()
for _, trans := range this.transMap { for _, trans := range this.transMap {
trans.closeDontUnlist() trans.closeDontUnlist()
} }
clear(this.transMap) clear(this.transMap)
this.underlying.Close()
}() }()
// receive MMBs in a loop and forward them to transactions until shit
// starts closing
for { for {
transID, method, payload, err := decodeMessageA(this.underlying) transID, method, chunked, payload, err := decodeMessageA(this.underlying, this.sizeLimit)
if err != nil { if err != nil {
this.err = fmt.Errorf("could not receive message: %w", err) this.err = fmt.Errorf("could not receive message: %w", err)
return return
} }
err = this.receiveMultiplex(transID, method, payload) err = this.multiplexMMB(transID, method, chunked, payload)
if err != nil { if err != nil {
this.err = fmt.Errorf("could not receive message: %w", err) this.err = fmt.Errorf("could not receive message: %w", err)
return return
@ -124,7 +155,7 @@ func (this *a) receive() {
} }
} }
func (this *a) receiveMultiplex(transID int64, method uint16, payload []byte) error { func (this *a) multiplexMMB(transID int64, method uint16, chunked bool, payload []byte) error {
if transID == 0 { return ErrMessageMalformed } if transID == 0 { return ErrMessageMalformed }
trans, err := func() (*transA, error) { trans, err := func() (*transA, error) {
@ -133,6 +164,12 @@ func (this *a) receiveMultiplex(transID int64, method uint16, payload []byte) er
trans, ok := this.transMap[transID] trans, ok := this.transMap[transID]
if !ok { if !ok {
// check if this is a superfluous close message and just
// do nothing if so
if method == closeMethod {
return nil, nil
}
// it is forbidden for the other party to initiate a transaction // it is forbidden for the other party to initiate a transaction
// with an ID from this party // with an ID from this party
if this.party == partyFromTransID(transID) { if this.party == partyFromTransID(transID) {
@ -150,28 +187,49 @@ func (this *a) receiveMultiplex(transID int64, method uint16, payload []byte) er
}() }()
if err != nil { return err } if err != nil { return err }
trans.incoming.Send(incomingMessage { if trans == nil {
method: method,
payload: payload,
})
return nil return nil
} }
if method == closeMethod {
return trans.Close()
} else {
trans.incoming.Send(incomingMessage {
method: method,
chunked: chunked,
payload: payload,
})
}
return nil
}
// most methods in transA don't need to be goroutine safe except those marked
// as such
type transA struct { type transA struct {
parent *a parent *a
id int64 id int64
incoming usync.Gate[incomingMessage] incoming usync.Gate[incomingMessage]
currentReader io.Reader
currentWriter io.Closer
writeBuffer []byte
closed atomic.Bool
} }
func (this *transA) Close() error { func (this *transA) Close() error {
// MUST be goroutine safe
err := this.closeDontUnlist() err := this.closeDontUnlist()
this.parent.unlistTransactionSafe(this.ID()) this.parent.unlistTransactionSafe(this.ID())
return err return err
} }
func (this *transA) closeDontUnlist() error { func (this *transA) closeDontUnlist() (err error) {
this.Send(closeMethod, nil) // MUST be goroutine safe
return this.incoming.Close() this.incoming.Close()
if !this.closed.Load() {
err = this.Send(closeMethod, nil)
}
this.closed.Store(true)
return err
} }
func (this *transA) ID() int64 { func (this *transA) ID() int64 {
@ -182,58 +240,213 @@ func (this *transA) Send(method uint16, data []byte) error {
return this.parent.sendMessageSafe(this.id, method, data) return this.parent.sendMessageSafe(this.id, method, data)
} }
func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) {
// close previous writer if necessary
if this.currentWriter != nil {
this.currentWriter.Close()
this.currentWriter = nil
}
// create new writer
writer := &writerA {
parent: this,
// there is only ever one writer at a time, so they can all
// share a buffer
buffer: this.writeBuffer[:0],
method: method,
chunkSize: defaultChunkSize,
open: true,
}
this.currentWriter = writer
return writer, nil
}
func (this *transA) Receive() (method uint16, data []byte, err error) { func (this *transA) Receive() (method uint16, data []byte, err error) {
receive := this.incoming.Receive() method, reader, err := this.ReceiveReader()
if err != nil { return 0, nil, err }
data, err = io.ReadAll(reader)
if err != nil { return 0, nil, err }
return method, data, nil
}
func (this *transA) ReceiveReader() (uint16, io.Reader, error) {
// if the transaction has been closed, return an io.EOF
if this.closed.Load() {
return 0, nil, io.EOF
}
// drain previous reader if necessary
if this.currentReader != nil {
io.Copy(io.Discard, this.currentReader)
}
// create new reader
reader := &readerA {
parent: this,
}
method, err := reader.pull()
if err != nil { return 0, nil, err}
this.currentReader = reader
return method, reader, nil
}
type readerA struct {
parent *transA
leftover []byte
eof bool
}
// pull pulls the next MMB in this message from the transaction.
func (this *readerA) pull() (uint16, error) {
// if the previous message ended the chain, return an io.EOF
if this.eof {
return 0, io.EOF
}
// get an MMB from the transaction we are a part of
receive := this.parent.incoming.Receive()
if receive != nil { if receive != nil {
if message, ok := <- receive; ok { if message, ok := <- receive; ok {
if message.method != closeMethod { if message.method != closeMethod {
return message.method, message.payload, nil this.leftover = append(this.leftover, message.payload...)
if !message.chunked {
this.eof = true
}
return message.method, nil
} }
} }
} }
// close and return error on failure // close and return error on failure
this.Close() this.eof = true
if this.parent.err == nil { this.parent.Close()
return 0, nil, fmt.Errorf("could not receive message: %w", io.EOF) if this.parent.parent.err == nil {
return 0, fmt.Errorf("could not receive message: %w", io.EOF)
} else { } else {
return 0, nil, this.parent.err return 0, this.parent.parent.err
} }
} }
func (this *readerA) Read(buffer []byte) (int, error) {
if len(this.leftover) == 0 {
if this.eof { return 0, io.EOF }
this.pull()
}
copied := copy(buffer, this.leftover)
this.leftover = this.leftover[copied:]
return copied, nil
}
type writerA struct {
parent *transA
buffer []byte
method uint16
chunkSize int64
open bool
}
func (this *writerA) Write(data []byte) (n int, err error) {
if !this.open { return 0, io.EOF }
toSend := data
for len(toSend) > 0 {
nn, err := this.writeOne(toSend)
n += nn
toSend = toSend[nn:]
if err != nil { return n, err }
}
return n, nil
}
func (this *writerA) Close() error {
this.open = false
return nil
}
func (this *writerA) writeOne(data []byte) (n int, err error) {
data = data[:min(len(data), int(this.chunkSize))]
// if there is more room, append to the buffer and exit
if int64(len(this.buffer) + len(data)) <= this.chunkSize {
this.buffer = append(this.buffer, data...)
n = len(data)
// if have a full chunk, flush
if int64(len(this.buffer)) == this.chunkSize {
err = this.flush()
if err != nil { return n, err }
}
return n, nil
}
// if not, flush and store as much as we can in the buffer
err = this.flush()
if err != nil { return n, err }
this.buffer = append(this.buffer, data...)
return n, nil
}
func (this *writerA) flush() error {
return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer)
}
type incomingMessage struct { type incomingMessage struct {
method uint16 method uint16
chunked bool
payload []byte payload []byte
} }
func encodeMessageA(writer io.Writer, trans int64, method uint16, data []byte) error { func encodeMessageA(
buffer := make([]byte, 12 + len(data)) writer io.Writer,
sizeLimit int64,
trans int64,
method uint16,
data []byte,
) error {
if int64(len(data)) > sizeLimit {
return ErrPayloadTooLarge
}
buffer := make([]byte, 18 + len(data))
tape.EncodeI64(buffer[:8], trans) tape.EncodeI64(buffer[:8], trans)
tape.EncodeI16(buffer[8:10], method) tape.EncodeI16(buffer[8:10], method)
length, ok := tape.U16CastSafe(len(data)) tape.EncodeI64(buffer[10:18], uint64(len(data)))
if !ok { return ErrPayloadTooLarge } copy(buffer[18:], data)
tape.EncodeI16(buffer[10:12], length)
copy(buffer[12:], data)
_, err := writer.Write(buffer) _, err := writer.Write(buffer)
return err return err
} }
func decodeMessageA(reader io.Reader) (int64, uint16, []byte, error) { func decodeMessageA(
headerBuffer := [12]byte { } reader io.Reader,
_, err := io.ReadFull(reader, headerBuffer[:]) sizeLimit int64,
if err != nil { return 0, 0, nil, err } ) (
transID, err := tape.DecodeI64[int64](headerBuffer[:8]) transID int64,
if err != nil { return 0, 0, nil, err } method uint16,
method, err := tape.DecodeI16[uint16](headerBuffer[8:10]) chunked bool,
if err != nil { return 0, 0, nil, err } payloadBuffer []byte,
length, err := tape.DecodeI16[uint16](headerBuffer[10:12]) err error,
if err != nil { return 0, 0, nil, err } ) {
payloadBuffer := make([]byte, int(length)) headerBuffer := [18]byte { }
_, err = io.ReadFull(reader, headerBuffer[:])
if err != nil { return 0, 0, false, nil, err }
transID, err = tape.DecodeI64[int64](headerBuffer[:8])
if err != nil { return 0, 0, false, nil, err }
method, err = tape.DecodeI16[uint16](headerBuffer[8:10])
if err != nil { return 0, 0, false, nil, err }
size, err := tape.DecodeI64[uint64](headerBuffer[10:18])
if err != nil { return 0, 0, false, nil, err }
chunked, size = splitCCBSize(size)
if size > uint64(sizeLimit) {
return 0, 0, false, nil, ErrPayloadTooLarge
}
payloadBuffer = make([]byte, int(size))
_, err = io.ReadFull(reader, payloadBuffer) _, err = io.ReadFull(reader, payloadBuffer)
if err != nil { return 0, 0, nil, err } if err != nil { return 0, 0, false, nil, err }
return transID, method, payloadBuffer, nil return transID, method, chunked, payloadBuffer, nil
} }
func partyFromTransID(id int64) Party { func partyFromTransID(id int64) Party {
return id > 0 return id > 0
} }
func splitCCBSize(size uint64) (bool, uint64) {
return size >> 63 > 1, size & 0x7FFFFFFFFFFFFFFF
}

View File

@ -25,47 +25,18 @@ func TestConnA(test *testing.T) {
"When the impostor is sus!", "When the impostor is sus!",
} }
network := "tcp" clientFunc := func(a Conn) {
addr := "localhost:7959"
// server
listener, err := net.Listen(network, addr)
if err != nil { test.Fatal(err) }
defer listener.Close()
go func() {
test.Log("SERVER listening")
conn, err := listener.Accept()
if err != nil { test.Error("SERVER", err); return }
defer conn.Close()
a := AdaptA(conn, ServerSide)
trans, err := a.OpenTrans()
if err != nil { test.Error("SERVER", err); return }
defer trans.Close()
for method, payload := range payloads {
test.Log("SERVER", method, payload)
err := trans.Send(uint16(method), []byte(payload))
if err != nil { test.Error("SERVER", err); return }
}
}()
// client
test.Log("CLIENT dialing")
conn, err := net.Dial(network, addr)
if err != nil { test.Fatal("CLIENT", err) }
test.Log("CLIENT dialed")
a := AdaptA(conn, ClientSide)
defer a.Close()
test.Log("CLIENT accepting transaction") test.Log("CLIENT accepting transaction")
trans, err := a.AcceptTrans() trans, err := a.AcceptTrans()
if err != nil { test.Fatal("CLIENT", err) } if err != nil { test.Fatal("CLIENT", err) }
test.Log("CLIENT accepted transaction") test.Log("CLIENT accepted transaction")
defer trans.Close() test.Cleanup(func() { trans.Close() })
for method, payload := range payloads { for method, payload := range payloads {
test.Log("CLIENT waiting...") test.Log("CLIENT waiting...")
gotMethod, gotPayloadBytes, err := trans.Receive() gotMethod, gotPayloadBytes, err := trans.Receive()
if err != nil { test.Fatal("CLIENT", err) } if err != nil { test.Fatal("CLIENT", err) }
gotPayload := string(gotPayloadBytes) gotPayload := string(gotPayloadBytes)
test.Log("CLIENT", gotMethod, gotPayload) test.Log("CLIENT m:", gotMethod, "p:", gotPayload)
if int(gotMethod) != method { if int(gotMethod) != method {
test.Errorf("CLIENT method not equal") test.Errorf("CLIENT method not equal")
} }
@ -73,22 +44,112 @@ func TestConnA(test *testing.T) {
test.Errorf("CLIENT payload not equal") test.Errorf("CLIENT payload not equal")
} }
} }
_, _, err = trans.Receive() test.Log("CLIENT waiting for transaction close...")
gotMethod, gotPayload, err := trans.Receive()
if !errors.Is(err, io.EOF) { if !errors.Is(err, io.EOF) {
test.Fatal("CLIENT wrong error:", err) test.Error("CLIENT wrong error:", err)
test.Error("CLIENT method:", gotMethod)
test.Error("CLIENT payload:", gotPayload)
test.Fatal("CLIENT ok byeeeeeeeeeeeee")
} }
test.Log("CLIENT done") }
// TODO test error from trans/connection closed by other side
serverFunc := func(a Conn) {
trans, err := a.OpenTrans()
if err != nil { test.Error("SERVER", err); return }
test.Cleanup(func() { trans.Close() })
for method, payload := range payloads {
test.Log("SERVER m:", method, "p:", payload)
err := trans.Send(uint16(method), []byte(payload))
if err != nil { test.Error("SERVER", err); return }
}
test.Log("SERVER closing connection")
}
clientServerEnvironment(test, clientFunc, serverFunc)
}
func TestTransOpenCloseA(test *testing.T) {
// currently:
//
// | data sent | data recvd | close sent | close recvd
// 10 | X | X | X | server hangs
// 20 | X | X | X | client hangs
// 30 | X | | X |
//
// when a close message is recvd, it tries to push to the trans and
// hangs on trans.incoming.Send, which hangs on sending the value to the
// underlying channel. why is this?
//
// check if we are really getting values from the channel when pulling
// from the trans channel when we are expecting a close.
clientFunc := func(conn Conn) {
// 10
trans, err := conn.OpenTrans()
if err != nil { test.Error("CLIENT", err); return }
test.Log("CLIENT sending 10")
trans.Send(10, []byte("hi"))
trans.Close()
// 20
test.Log("CLIENT awaiting 20")
trans, err = conn.AcceptTrans()
if err != nil { test.Error("CLIENT", err); return }
test.Cleanup(func() { trans.Close() })
gotMethod, gotPayload, err := trans.Receive()
if err != nil { test.Error("CLIENT", err); return }
test.Logf("CLIENT m: %d p: %s", gotMethod, gotPayload)
if gotMethod != 20 { test.Error("CLIENT wrong method")}
// 30
trans, err = conn.OpenTrans()
if err != nil { test.Error("CLIENT", err); return }
test.Log("CLIENT sending 30")
trans.Send(30, []byte("good"))
trans.Close()
}
serverFunc := func(conn Conn) {
// 10
test.Log("SERVER awaiting 10")
trans, err := conn.AcceptTrans()
if err != nil { test.Error("SERVER", err); return }
test.Cleanup(func() { trans.Close() })
gotMethod, gotPayload, err := trans.Receive()
if err != nil { test.Error("SERVER", err); return }
test.Logf("SERVER m: %d p: %s", gotMethod, gotPayload)
if gotMethod != 10 { test.Error("SERVER wrong method")}
// 20
trans, err = conn.OpenTrans()
if err != nil { test.Error("SERVER", err); return }
test.Log("SERVER sending 20")
trans.Send(20, []byte("hi how r u"))
trans.Close()
// 30
test.Log("SERVER awaiting 30")
trans, err = conn.AcceptTrans()
if err != nil { test.Error("SERVER", err); return }
test.Cleanup(func() { trans.Close() })
gotMethod, gotPayload, err = trans.Receive()
if err != nil { test.Error("SERVER", err); return }
test.Logf("SERVER m: %d p: %s", gotMethod, gotPayload)
if gotMethod != 30 { test.Error("SERVER wrong method")}
}
clientServerEnvironment(test, clientFunc, serverFunc)
} }
func TestEncodeMessageA(test *testing.T) { func TestEncodeMessageA(test *testing.T) {
buffer := new(bytes.Buffer) buffer := new(bytes.Buffer)
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 } payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
err := encodeMessageA(buffer, 0x5800FEABC3104F04, 0x6B12, payload) err := encodeMessageA(buffer, defaultSizeLimit, 0x5800FEABC3104F04, 0x6B12, payload)
correct := []byte { correct := []byte {
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04, 0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
0x6B, 0x12, 0x6B, 0x12,
0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
} }
if err != nil { if err != nil {
@ -102,19 +163,19 @@ func TestEncodeMessageA(test *testing.T) {
func TestEncodeMessageAErr(test *testing.T) { func TestEncodeMessageAErr(test *testing.T) {
buffer := new(bytes.Buffer) buffer := new(bytes.Buffer)
payload := make([]byte, 0x10000) payload := make([]byte, 0x10000)
err := encodeMessageA(buffer, 0x5800FEABC3104F04, 0x6B12, payload) err := encodeMessageA(buffer, 0x20, 0x5800FEABC3104F04, 0x6B12, payload)
if !errors.Is(err, ErrPayloadTooLarge) { if !errors.Is(err, ErrPayloadTooLarge) {
test.Fatalf("wrong error: %v", err) test.Fatalf("wrong error: %v", err)
} }
} }
func TestDecodeMessageA(test *testing.T) { func TestDecodeMessageA(test *testing.T) {
transID, method, payload, err := decodeMessageA(bytes.NewReader([]byte { transID, method, _, payload, err := decodeMessageA(bytes.NewReader([]byte {
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04, 0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
0x6B, 0x12, 0x6B, 0x12,
0x00, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
})) }), defaultSizeLimit)
if err != nil { if err != nil {
test.Fatal(err) test.Fatal(err)
} }
@ -131,13 +192,76 @@ func TestDecodeMessageA(test *testing.T) {
} }
func TestDecodeMessageAErr(test *testing.T) { func TestDecodeMessageAErr(test *testing.T) {
_, _, _, err := decodeMessageA(bytes.NewReader([]byte { _, _, _, _, err := decodeMessageA(bytes.NewReader([]byte {
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04, 0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
0x6B, 0x12, 0x6B, 0x12,
0x01, 0x06, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
})) }), defaultSizeLimit)
if !errors.Is(err, io.ErrUnexpectedEOF) { if !errors.Is(err, io.ErrUnexpectedEOF) {
test.Fatalf("wrong error: %v", err) test.Fatalf("wrong error: %v", err)
} }
} }
func TestEncodeDecodeMessageA(test *testing.T) {
correctTransID := int64(2)
correctMethod := uint16(30)
correctPayload := []byte("good")
buffer := bytes.Buffer { }
err := encodeMessageA(&buffer, defaultSizeLimit, correctTransID, correctMethod, correctPayload)
if err != nil { test.Fatal(err) }
transID, method, chunked, payload, err := decodeMessageA(&buffer, defaultSizeLimit)
if got, correct := transID, int64(2); got != correct {
test.Fatalf("not equal: %v %v", got, correct)
}
if got, correct := method, uint16(30); got != correct {
test.Fatalf("not equal: %v %v", got, correct)
}
if chunked {
test.Fatalf("message should not be chunked")
}
if got, correct := payload, correctPayload; !slices.Equal(got, correct) {
test.Fatalf("not equal: %v %v", got, correct)
}
}
func clientServerEnvironment(test *testing.T, clientFunc func(conn Conn), serverFunc func(conn Conn)) {
network := "tcp"
addr := "localhost:7959"
// server
listener, err := net.Listen(network, addr)
if err != nil { test.Fatal(err) }
test.Cleanup(func() { listener.Close() })
go func() {
test.Log("SERVER listening")
conn, err := listener.Accept()
if err != nil { test.Error("SERVER", err); return }
defer conn.Close()
test.Cleanup(func() { conn.Close() })
a := AdaptA(conn, ServerSide)
test.Cleanup(func() { a.Close() })
serverFunc(a)
test.Log("SERVER closing")
}()
// client
test.Log("CLIENT dialing")
conn, err := net.Dial(network, addr)
if err != nil { test.Fatal("CLIENT", err) }
test.Log("CLIENT dialed")
a := AdaptA(conn, ClientSide)
test.Cleanup(func() { a.Close() })
clientFunc(a)
test.Log("CLIENT waiting for connection close...")
trans, err := a.AcceptTrans()
if !errors.Is(err, io.EOF) {
test.Error("CLIENT wrong error:", err)
test.Fatal("CLIENT trans:", trans)
}
test.Log("CLIENT DONE")
conn.Close()
}

View File

@ -2,19 +2,23 @@ package hopp
import "io" import "io"
import "net" import "net"
import "bytes"
import "errors"
import "context" import "context"
import "git.tebibyte.media/sashakoshka/hopp/tape" import "git.tebibyte.media/sashakoshka/hopp/tape"
// B implements METADAPT-B over a multiplexed stream-oriented transport such as // B implements METADAPT-B over a multiplexed stream-oriented transport such as
// QUIC. // QUIC.
type b struct { type b struct {
sizeLimit int64
underlying MultiConn underlying MultiConn
} }
// AdaptB returns a connection implementing METADAPT-B over a singular stream- // AdaptB returns a connection implementing METADAPT-B over a multiplexed
// oriented transport such as TCP or UNIX domain stream sockets. // stream-oriented transport such as QUIC.
func AdaptB(underlying MultiConn) Conn { func AdaptB(underlying MultiConn) Conn {
return &b { return &b {
sizeLimit: defaultSizeLimit,
underlying: underlying, underlying: underlying,
} }
} }
@ -34,33 +38,105 @@ func (this *b) RemoteAddr() net.Addr {
func (this *b) OpenTrans() (Trans, error) { func (this *b) OpenTrans() (Trans, error) {
stream, err := this.underlying.OpenStream() stream, err := this.underlying.OpenStream()
if err != nil { return nil, err } if err != nil { return nil, err }
return transB { underlying: stream }, nil return this.newTrans(stream), nil
} }
func (this *b) AcceptTrans() (Trans, error) { func (this *b) AcceptTrans() (Trans, error) {
stream, err := this.underlying.AcceptStream(context.Background()) stream, err := this.underlying.AcceptStream(context.Background())
if err != nil { return nil, err } if err != nil { return nil, err }
return transB { underlying: stream }, nil return this.newTrans(stream), nil
}
func (this *b) SetSizeLimit(limit int64) {
this.sizeLimit = limit
}
func (this *b) newTrans(underlying Stream) *transB {
return &transB {
sizeLimit: this.sizeLimit,
underlying: underlying,
}
} }
type transB struct { type transB struct {
sizeLimit int64
underlying Stream underlying Stream
currentData io.Reader
currentWriter *writerB
} }
func (trans transB) Close() error { func (this *transB) Close() error {
return trans.underlying.Close() return this.underlying.Close()
} }
func (trans transB) ID() int64 { func (this *transB) ID() int64 {
return trans.underlying.ID() return this.underlying.ID()
} }
func (trans transB) Send(method uint16, data []byte) error { func (this *transB) Send(method uint16, data []byte) error {
return encodeMessageB(trans.underlying, method, data) return encodeMessageB(this.underlying, this.sizeLimit, method, data)
} }
func (trans transB) Receive() (uint16, []byte, error) { func (this *transB) SendWriter(method uint16) (io.WriteCloser, error) {
return decodeMessageB(trans.underlying) if this.currentWriter != nil {
this.currentWriter.Close()
}
// TODO: come up with a fix that allows us to pipe data through the
// writer. as of now, it just reads whatever is written into a buffer
// and sends the message on close. we should probably introduce chunked
// encoding to METADAPT-B to fix this. the implementation would be
// simpler than on METADAPT-A, but most of the code could just be
// copied over.
writer := &writerB {
parent: this,
method: method,
}
this.currentWriter = writer
return writer, nil
}
func (this *transB) Receive() (uint16, []byte, error) {
// get a reader for the next message
method, size, data, err := this.receiveReader()
if err != nil { return 0, nil, err }
// read the entire thing
payloadBuffer := make([]byte, int(size))
_, err = io.ReadFull(data, payloadBuffer)
if err != nil { return 0, nil, err }
// we have used up the reader by now so we can forget it exists
this.currentData = nil
return method, payloadBuffer, nil
}
func (this *transB) ReceiveReader() (uint16, io.Reader, error) {
method, _, data, err := this.receiveReader()
return method, data, err
}
func (this *transB) receiveReader() (uint16, int64, io.Reader, error) {
// decode the message
method, size, data, err := decodeMessageB(this.underlying, this.sizeLimit)
if err != nil { return 0, 0, nil, err }
// discard current reader if there is one
if this.currentData == nil {
io.Copy(io.Discard, this.currentData)
}
this.currentData = data
return method, size, data, nil
}
type writerB struct {
parent *transB
buffer bytes.Buffer
method uint16
}
func (this *writerB) Write(data []byte) (int, error) {
return this.buffer.Write(data)
}
func (this *writerB) Close() error {
return this.parent.Send(this.method, this.buffer.Bytes())
} }
// MultiConn represens a multiplexed stream-oriented transport for use in // MultiConn represens a multiplexed stream-oriented transport for use in
@ -84,27 +160,42 @@ type Stream interface {
ID() int64 ID() int64
} }
func encodeMessageB(writer io.Writer, method uint16, data []byte) error { func encodeMessageB(writer io.Writer, sizeLimit int64, method uint16, data []byte) error {
buffer := make([]byte, 4 + len(data)) if int64(len(data)) > sizeLimit {
return ErrPayloadTooLarge
}
buffer := make([]byte, 10 + len(data))
tape.EncodeI16(buffer[:2], method) tape.EncodeI16(buffer[:2], method)
length, ok := tape.U16CastSafe(len(data)) tape.EncodeI64(buffer[2:10], uint64(len(data)))
if !ok { return ErrPayloadTooLarge } copy(buffer[10:], data)
tape.EncodeI16(buffer[2:4], length)
copy(buffer[4:], data)
_, err := writer.Write(buffer) _, err := writer.Write(buffer)
return err return err
} }
func decodeMessageB(reader io.Reader) (uint16, []byte, error) { func decodeMessageB(
headerBuffer := [4]byte { } reader io.Reader,
_, err := io.ReadFull(reader, headerBuffer[:]) sizeLimit int64,
if err != nil { return 0, nil, err } ) (
method, err := tape.DecodeI16[uint16](headerBuffer[:2]) method uint16,
if err != nil { return 0, nil, err } size int64,
length, err := tape.DecodeI16[uint16](headerBuffer[2:4]) data io.Reader,
if err != nil { return 0, nil, err } err error,
payloadBuffer := make([]byte, int(length)) ) {
_, err = io.ReadFull(reader, payloadBuffer) headerBuffer := [10]byte { }
if err != nil { return 0, nil, err } _, err = io.ReadFull(reader, headerBuffer[:])
return method, payloadBuffer, nil if err != nil {
if errors.Is(err, io.EOF) { return 0, 0, nil, io.ErrUnexpectedEOF }
return 0, 0, nil, err
}
method, err = tape.DecodeI16[uint16](headerBuffer[:2])
if err != nil { return 0, 0, nil, err }
length, err := tape.DecodeI64[uint64](headerBuffer[2:10])
if err != nil { return 0, 0, nil, err }
if length > uint64(sizeLimit) {
return 0, 0, nil, ErrPayloadTooLarge
}
return method, int64(length), &io.LimitedReader {
R: reader,
N: int64(length),
}, nil
} }

View File

@ -9,9 +9,9 @@ import "testing"
func TestEncodeMessageB(test *testing.T) { func TestEncodeMessageB(test *testing.T) {
buffer := new(bytes.Buffer) buffer := new(bytes.Buffer)
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 } payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
err := encodeMessageB(buffer, 0x6B12, payload) err := encodeMessageB(buffer, defaultSizeLimit, 0x6B12, payload)
correct := []byte { correct := []byte {
0x6B, 0x12, 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x06, 0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
} }
@ -26,24 +26,25 @@ func TestEncodeMessageB(test *testing.T) {
func TestEncodeMessageBErr(test *testing.T) { func TestEncodeMessageBErr(test *testing.T) {
buffer := new(bytes.Buffer) buffer := new(bytes.Buffer)
payload := make([]byte, 0x10000) payload := make([]byte, 0x10000)
err := encodeMessageB(buffer, 0x6B12, payload) err := encodeMessageB(buffer, 255, 0x6B12, payload)
if !errors.Is(err, ErrPayloadTooLarge) { if !errors.Is(err, ErrPayloadTooLarge) {
test.Fatalf("wrong error: %v", err) test.Fatalf("wrong error: %v", err)
} }
} }
func TestDecodeMessageB(test *testing.T) { func TestDecodeMessageB(test *testing.T) {
method, payload, err := decodeMessageB(bytes.NewReader([]byte { method, _, data, err := decodeMessageB(bytes.NewReader([]byte {
0x6B, 0x12, 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x06, 0x00, 0x06,
0x00, 0x01, 0x02, 0x03, 0x04, 0x05, 0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
})) }), defaultSizeLimit)
if err != nil { if err != nil {
test.Fatal(err) test.Fatal(err)
} }
if got, correct := method, uint16(0x6B12); got != correct { if got, correct := method, uint16(0x6B12); got != correct {
test.Fatalf("not equal: %v %v", got, correct) test.Fatalf("not equal: %v %v", got, correct)
} }
payload, _ := io.ReadAll(data)
correctPayload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 } correctPayload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
if got, correct := payload, correctPayload; !slices.Equal(got, correct) { if got, correct := payload, correctPayload; !slices.Equal(got, correct) {
test.Fatalf("not equal: %v %v", got, correct) test.Fatalf("not equal: %v %v", got, correct)
@ -51,11 +52,9 @@ func TestDecodeMessageB(test *testing.T) {
} }
func TestDecodeMessageBErr(test *testing.T) { func TestDecodeMessageBErr(test *testing.T) {
_, _, err := decodeMessageB(bytes.NewReader([]byte { _, _, _, err := decodeMessageB(bytes.NewReader([]byte {
0x6B, 0x12, 0x6B, 0x12, 0x00, 0x00, 0x00, 0x00,
0x01, 0x06, }), defaultSizeLimit)
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
}))
if !errors.Is(err, io.ErrUnexpectedEOF) { if !errors.Is(err, io.ErrUnexpectedEOF) {
test.Fatalf("wrong error: %v", err) test.Fatalf("wrong error: %v", err)
} }

View File

@ -1,54 +0,0 @@
package hopp
import "net"
import "context"
import "github.com/quic-go/quic-go"
var _ MultiConn = quicMultiConn { }
type quicMultiConn struct {
underlying quic.Connection
}
func (conn quicMultiConn) Close() error {
return conn.underlying.CloseWithError(0, "good bye")
}
func (conn quicMultiConn) LocalAddr() net.Addr {
return conn.underlying.LocalAddr()
}
func (conn quicMultiConn) RemoteAddr() net.Addr {
return conn.underlying.RemoteAddr()
}
func (conn quicMultiConn) AcceptStream(ctx context.Context) (Stream, error) {
strea, err := conn.underlying.AcceptStream(ctx)
if err != nil { return nil, err }
return quicStream { underlying: strea }, nil
}
func (conn quicMultiConn) OpenStream() (Stream, error) {
strea, err := conn.underlying.OpenStream()
if err != nil { return nil, err }
return quicStream { underlying: strea }, nil
}
type quicStream struct {
underlying quic.Stream
}
func (strea quicStream) Read(buffer []byte) (n int, err error) {
return strea.underlying.Read(buffer)
}
func (strea quicStream) Write(buffer []byte) (n int, err error) {
return strea.underlying.Read(buffer)
}
func (strea quicStream) Close() error {
return strea.underlying.Close()
}
func (strea quicStream) ID() int64 {
return int64(strea.underlying.StreamID())
}