After the week I've had, I deserve to make a commit like this lmao
This commit is contained in:
		
							parent
							
								
									6f55ee1b45
								
							
						
					
					
						commit
						68961f8ad8
					
				| @ -6,7 +6,7 @@ import "context" | |||||||
| // Conn is a HOPP connection. | // Conn is a HOPP connection. | ||||||
| type Conn interface { | type Conn interface { | ||||||
| 	io.Closer | 	io.Closer | ||||||
| 	OpenTrans(ctx context.Context) (Trans, error) | 	OpenTrans() (Trans, error) | ||||||
| 	AcceptTrans(ctx context.Context) (Trans, error) | 	AcceptTrans(ctx context.Context) (Trans, error) | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| @ -14,8 +14,8 @@ type Conn interface { | |||||||
| type Trans interface { | type Trans interface { | ||||||
| 	io.Closer | 	io.Closer | ||||||
| 	ID() int64 | 	ID() int64 | ||||||
| 	Send(ctx context.Context, method uint16, data []byte) error | 	// Send sends a message. | ||||||
| 	Receive(ctx context.Context) (method uint16, data []byte, err error) | 	Send(method uint16, data []byte) error | ||||||
| 	SendDatagram(ctx context.Context, method uint16, data []byte) error | 	// Receive receives a message. | ||||||
| 	ReceiveDatagram(ctx context.Context) (method uint16, data []byte, err error) | 	Receive() (method uint16, data []byte, err error) | ||||||
| } | } | ||||||
|  | |||||||
| @ -22,8 +22,8 @@ guaranteed. | |||||||
| 
 | 
 | ||||||
| The message payload must be 65,535 (unsigned 16-bit integer limit) octets or | The message payload must be 65,535 (unsigned 16-bit integer limit) octets or | ||||||
| smaller in length. This does not include the method code. Applications are free | smaller in length. This does not include the method code. Applications are free | ||||||
| to send whatever data they wish as the payload, but it should be encoded using | to send whatever data they wish as the payload, but TAPE is recommended for | ||||||
| TAPE. | encoding it. | ||||||
| 
 | 
 | ||||||
| Method codes should be written in upper-case base 16 with the prefix "M" in | Method codes should be written in upper-case base 16 with the prefix "M" in | ||||||
| logs, error messages, documentation, etc. For example, the method code 62,206 in | logs, error messages, documentation, etc. For example, the method code 62,206 in | ||||||
| @ -31,7 +31,7 @@ decimal would be written as MF4CE. The application may choose any method codes, | |||||||
| but groups of similar methods should be placed at consecutive intervals of | but groups of similar methods should be placed at consecutive intervals of | ||||||
| M0100. Method codes MFF00-MFFFF are reserved for use by HOPP and its constituent | M0100. Method codes MFF00-MFFFF are reserved for use by HOPP and its constituent | ||||||
| protocols. Individuals or entities with the SWAG (secret wheel access group) | protocols. Individuals or entities with the SWAG (secret wheel access group) | ||||||
| pass are also permitted to define their own methods in this range. I'm just | pass are also permitted to define their own methods within this range. I'm just | ||||||
| fucking with you. | fucking with you. | ||||||
| 
 | 
 | ||||||
| ## Table Pair Encoding (TAPE) | ## Table Pair Encoding (TAPE) | ||||||
| @ -45,12 +45,12 @@ enables backwards compatibile application protocol changes. | |||||||
| ### Table Structure | ### Table Structure | ||||||
| A table is divided into two sections: the header, and the values. The header | A table is divided into two sections: the header, and the values. The header | ||||||
| begins with the number (U16) of pairs in the table, which is then followed by | begins with the number (U16) of pairs in the table, which is then followed by | ||||||
| that many tag-offset pairs. A tag-offset pair consists of a numerical (U16) | that many tag-offset pairs. A tag-offset pair consists of a numerical (U16) tag, | ||||||
| tag, followed the position (U16) of the value relative to the start of the | followed the position (U16) of the value relative to the start of the values | ||||||
| values section. The values section contains the value data for each pair, | section. The values section contains the value data for each pair, where the | ||||||
| where the start of each value is determined by its offset, and the end is | start of each value is determined by its offset, and the end is determined by | ||||||
| determined by the offset of the next value, or the end of the message if there | the offset of the next value, or the end of the message if there is no value | ||||||
| is no value after it. | after it. | ||||||
| 
 | 
 | ||||||
| Both sections must be in the same order, and because of this, each value offset | Both sections must be in the same order, and because of this, each value offset | ||||||
| must be greater than or equal to the last. If a message has erratic structure | must be greater than or equal to the last. If a message has erratic structure | ||||||
| @ -60,19 +60,22 @@ only the erratic pairs, as well as the pairs directly before those. | |||||||
| ### Data Value Types | ### Data Value Types | ||||||
| The table below lists all data value types supported by TAPE. | The table below lists all data value types supported by TAPE. | ||||||
| 
 | 
 | ||||||
| | Name        | Size             | Description                 | Encoding Method | | Name        | Size            | Description                 | Encoding Method | ||||||
| | ------      | ---------------: | --------------------------- | --------------- | | ----------- | --------------: | --------------------------- | --------------- | ||||||
| | I8          |                1 | A signed 8-bit integer      | BETC | | I8          |               1 | A signed 8-bit integer      | BETC | ||||||
| | I16         |                2 | A signed 16-bit integer     | BETC | | I16         |               2 | A signed 16-bit integer     | BETC | ||||||
| | I32         |                4 | A signed 32-bit integer     | BETC | | I32         |               4 | A signed 32-bit integer     | BETC | ||||||
| | I64         |                8 | A signed 64-bit integer     | BETC | | I64         |               8 | A signed 64-bit integer     | BETC | ||||||
| | U8          |                1 | An unsigned 8-bit integer   | BEU | | U8          |               1 | An unsigned 8-bit integer   | BEU | ||||||
| | U16         |                2 | An unsigned 16-bit integer  | BEU | | U16         |               2 | An unsigned 16-bit integer  | BEU | ||||||
| | U32         |                4 | An unsigned 32-bit integer  | BEU | | U32         |               4 | An unsigned 32-bit integer  | BEU | ||||||
| | U64         |                8 | An unsigned 64-bit integer  | BEU | | U64         |               8 | An unsigned 64-bit integer  | BEU | ||||||
| | Array       |         Part-sum | An array of any above type  | PASTA | | Array       |         SOP[^1] | An array of any above type  | PASTA | ||||||
| | String      |              N/A | A UTF-8 string              | UTF-8 | | String      |             N/A | A UTF-8 string              | UTF-8 | ||||||
| | StringArray | N * 2 + Part-sum | An array the String type    | VILA | | StringArray | n * 2 + SOP[^1] | An array the String type    | VILA | ||||||
|  | 
 | ||||||
|  | [^1]: SOP (sum of parts) refers to the sum of the size of every item in a data | ||||||
|  | structure. | ||||||
| 
 | 
 | ||||||
| ### Encoding Methods | ### Encoding Methods | ||||||
| Below are all encoding methods supported by TAPE. | Below are all encoding methods supported by TAPE. | ||||||
| @ -88,9 +91,9 @@ octets which can fit all bits in the integer, regardless if the bits are on or | |||||||
| off. Therefore, the size cannot change at runtime. | off. Therefore, the size cannot change at runtime. | ||||||
| 
 | 
 | ||||||
| #### PASTA | #### PASTA | ||||||
| Packed Single-Type Array. The size is defined as at the size of an individual | Packed Single-Type Array. The size is defined as the size of an individual item | ||||||
| item times the number of items. Items are placed one after the other with no | times the number of items. Items are placed one after the other with no gaps | ||||||
| gaps in-between them, except as required to align the start of each item to the | in-between them, except as required to align the start of each item to the | ||||||
| nearest whole octet. Items should be of the same type and must be of the same | nearest whole octet. Items should be of the same type and must be of the same | ||||||
| size. | size. | ||||||
| 
 | 
 | ||||||
| @ -133,7 +136,7 @@ Internet. | |||||||
| ### METADAPT-A | ### METADAPT-A | ||||||
| METADAPT-A requires a transport which offers a single full-duplex data stream | METADAPT-A requires a transport which offers a single full-duplex data stream | ||||||
| that persists for the duration of the connection. All transactions are | that persists for the duration of the connection. All transactions are | ||||||
| multiplexed onto this single stream. Each MMB contains a 12-octet long header, | multiplexed onto this single stream. Each MMB contains a 8-octet long header, | ||||||
| with the transaction ID, then the method, and then the payload size (in octets). | with the transaction ID, then the method, and then the payload size (in octets). | ||||||
| The transaction ID is encoded as an I64, and the method and payload size are | The transaction ID is encoded as an I64, and the method and payload size are | ||||||
| both encoded as U16s. The remainder of the message is the payload. Since each | both encoded as U16s. The remainder of the message is the payload. Since each | ||||||
| @ -143,6 +146,15 @@ Transactions "open" when the first message with a given transaction ID is sent. | |||||||
| They "close" when a closing message is sent by either side. A closing message | They "close" when a closing message is sent by either side. A closing message | ||||||
| has method MFFFF and should not have a payload. | has method MFFFF and should not have a payload. | ||||||
| 
 | 
 | ||||||
|  | The ID of a given transaction is counted differently depending on from which end | ||||||
|  | of the connection the transaction in question initiated from. The client (the | ||||||
|  | party which initiated the connection) uses positive transaction IDs, while the | ||||||
|  | server (the party which accepted the connection) uses negative transaction IDs. | ||||||
|  | Transaction IDs must be unique within the connection, and if all IDs have been | ||||||
|  | used up, the connection must fail. Don't worry about this though, because the | ||||||
|  | sun will have expanded to swallow earth by then. Your connection will not last | ||||||
|  | that long. | ||||||
|  | 
 | ||||||
| ### METADAPT-B | ### METADAPT-B | ||||||
| METADAPT-B requires a transport which offers multiple multiplexed full-duplex | METADAPT-B requires a transport which offers multiple multiplexed full-duplex | ||||||
| data streams per connection that can be created and destroyed on-demand. Each | data streams per connection that can be created and destroyed on-demand. Each | ||||||
|  | |||||||
							
								
								
									
										50
									
								
								dial.go
									
									
									
									
									
								
							
							
						
						
									
										50
									
								
								dial.go
									
									
									
									
									
								
							| @ -1,7 +1,9 @@ | |||||||
| package hopp | package hopp | ||||||
| 
 | 
 | ||||||
|  | import "net" | ||||||
| import "context" | import "context" | ||||||
| import "crypto/tls" | import "crypto/tls" | ||||||
|  | import "github.com/quic-go/quic-go" | ||||||
| 
 | 
 | ||||||
| // TODO: dial should be super simple like it is now, and there should be a | // TODO: dial should be super simple like it is now, and there should be a | ||||||
| // "dialer" which the dial function dial configures automaticaly, but the dialer | // "dialer" which the dial function dial configures automaticaly, but the dialer | ||||||
| @ -29,24 +31,52 @@ func (diale Dialer) Dial(ctx context.Context, network, address string) (Conn, er | |||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (diale Dialer) dialQUIC(ctx context.Context, network, address string) (Conn, error) { | func (diale Dialer) dialQUIC(ctx context.Context, network, address string) (Conn, error) { | ||||||
| 	// TODO: dial a QUIC connection and return METADAPT-B wrapping it | 	udpNetwork, err := quicNetworkToUDPNetwork(network) | ||||||
|  | 	if err != nil { return nil, err } | ||||||
|  | 	addr, err := net.ResolveUDPAddr(udpNetwork, address) | ||||||
|  | 	if err != nil { return nil, err } | ||||||
|  | 	udpConn, err := net.DialUDP(udpNetwork, nil, addr) | ||||||
|  | 	if err != nil { return nil, err } | ||||||
|  | 	conn, err := quic.Dial(ctx, udpConn, addr, diale.tlsConfig(), diale.quicConfig()) | ||||||
|  | 	if err != nil { return nil, err } | ||||||
|  | 	return AdaptB(quicMultiConn { underlying: conn }), nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (diale Dialer) dialUnix(ctx context.Context, network, address string) (Conn, error) { | func (diale Dialer) dialUnix(ctx context.Context, network, address string) (Conn, error) { | ||||||
| 	if network != "unix" { return nil, ErrUnknownNetwork } | 	if network != "unix" { return nil, ErrUnknownNetwork } | ||||||
| 	// TODO: dial a unix stream connection and return METADAPT-A wrapping it | 	addr, err := net.ResolveUnixAddr(network, address) | ||||||
|  | 	if err != nil { return nil, err } | ||||||
|  | 	conn, err := net.DialUnix(network, nil, addr) | ||||||
|  | 	if err != nil { return nil, err } | ||||||
|  | 	// REMEMBER - THIS IS VERY IMPORTANT: | ||||||
|  | 	// WHEN YOU INEVITABLY COPY PASTE THIS FOR THE SERVER-SIDE, CHANGE THE | ||||||
|  | 	// PARTY CONSTANT TO ServerSide! OTHERWISE THERE WILL BE COLLISIONS! | ||||||
|  | 	return AdaptA(conn, ClientSide), nil | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| // addrStrs implements net.Addr | func (diale Dialer) tlsConfig() *tls.Config { | ||||||
| type addrStrs struct { | 	conf := diale.TLSConfig.Clone() | ||||||
| 	net  string | 	conf.NextProtos = []string { | ||||||
| 	addr string | 		"HOPP/0", | ||||||
|  | 	} | ||||||
|  | 	return conf | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (addr addrStrs) Network() string { | func (diale Dialer) quicConfig() *quic.Config { | ||||||
| 	return addr.net | 	return &quic.Config { | ||||||
|  | 		// TODO: perhaps we might want to put something here | ||||||
|  | 		// the quic config shouldn't be exported, just set up | ||||||
|  | 		// automatically. we can't have that strangely built quic-go | ||||||
|  | 		// package be part of the API, or any third-party packages for | ||||||
|  | 		// that matter. it must all be abstracted away. | ||||||
|  | 	} | ||||||
| } | } | ||||||
| 
 | 
 | ||||||
| func (addr addrStrs) String() string { | func quicNetworkToUDPNetwork(network string) (string, error) { | ||||||
| 	return addr.addr | 	switch network { | ||||||
|  | 	case "quic4": return "udp4", nil | ||||||
|  | 	case "quic6": return "udp6", nil | ||||||
|  | 	case "quic":  return "udp",  nil | ||||||
|  | 	default:      return "", ErrUnknownNetwork | ||||||
|  | 	} | ||||||
| } | } | ||||||
|  | |||||||
							
								
								
									
										2
									
								
								error.go
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								error.go
									
									
									
									
									
								
							| @ -6,6 +6,8 @@ type Error string; const ( | |||||||
| 	ErrUnknownMethod      Error = "unknown method" | 	ErrUnknownMethod      Error = "unknown method" | ||||||
| 	ErrPayloadTooLarge    Error = "payload too large" | 	ErrPayloadTooLarge    Error = "payload too large" | ||||||
| 	ErrUnknownNetwork     Error = "unknown network" | 	ErrUnknownNetwork     Error = "unknown network" | ||||||
|  | 	ErrIntegerOverflow    Error = "integer overflow" | ||||||
|  | 	ErrMessageMalformed   Error = "message is malformed" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // Error implements the error interface. | // Error implements the error interface. | ||||||
|  | |||||||
							
								
								
									
										1
									
								
								go.mod
									
									
									
									
									
								
							
							
						
						
									
										1
									
								
								go.mod
									
									
									
									
									
								
							| @ -3,6 +3,7 @@ module git.tebibyte.media/sashakoshka/hopp | |||||||
| go 1.23.0 | go 1.23.0 | ||||||
| 
 | 
 | ||||||
| require ( | require ( | ||||||
|  | 	git.tebibyte.media/sashakoshka/go-util v0.8.0 | ||||||
| 	github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62 | 	github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62 | ||||||
| 	github.com/quic-go/quic-go v0.48.2 | 	github.com/quic-go/quic-go v0.48.2 | ||||||
| ) | ) | ||||||
|  | |||||||
							
								
								
									
										2
									
								
								go.sum
									
									
									
									
									
								
							
							
						
						
									
										2
									
								
								go.sum
									
									
									
									
									
								
							| @ -1,3 +1,5 @@ | |||||||
|  | git.tebibyte.media/sashakoshka/go-util v0.8.0 h1:XFuZ8HQkrnibrV016rso00geCFPatKpX4jxkIVhZPaQ= | ||||||
|  | git.tebibyte.media/sashakoshka/go-util v0.8.0/go.mod h1:0Q1t+PePdx6tFYkRuJNcpM1Mru7wE6X+it1kwuOH+6Y= | ||||||
| github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= | github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI= | ||||||
| github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= | github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI= | ||||||
| github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= | github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU= | ||||||
|  | |||||||
| @ -1,21 +0,0 @@ | |||||||
| // Package metadapt implements the Message and Transaction Demarcation Protocol. |  | ||||||
| package metadapt |  | ||||||
| 
 |  | ||||||
| // TODO: create interfaces for underlying connections for A and B, also have |  | ||||||
| // A and B fulfill hopp.Conn. |  | ||||||
| 
 |  | ||||||
| // A implements METADAPT-A over a singular stream-oriented transport such as TCP |  | ||||||
| // or UNIX domain stream sockets. |  | ||||||
| type A struct { |  | ||||||
| 	// Underlying specifies the underlying connection. It must be set before |  | ||||||
| 	// calling methods on this object. |  | ||||||
| 	Underlying ATransport |  | ||||||
| } |  | ||||||
| 
 |  | ||||||
| // B implements METADAPT-B over a multiplexed stream-oriented transport such as |  | ||||||
| // QUIC. |  | ||||||
| type B struct { |  | ||||||
| 	// Underlying specifies the underlying connection. It must be set before |  | ||||||
| 	// calling methods on this object. |  | ||||||
| 	Underlying BTransport |  | ||||||
| } |  | ||||||
							
								
								
									
										203
									
								
								metadapta.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										203
									
								
								metadapta.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,203 @@ | |||||||
|  | package hopp | ||||||
|  | 
 | ||||||
|  | import "io" | ||||||
|  | import "fmt" | ||||||
|  | import "net" | ||||||
|  | import "sync" | ||||||
|  | import "context" | ||||||
|  | import "git.tebibyte.media/sashakoshka/hopp/tape" | ||||||
|  | import "git.tebibyte.media/sashakoshka/go-util/sync" | ||||||
|  | 
 | ||||||
|  | const int64Max = int64((^uint64(0)) >> 1) | ||||||
|  | 
 | ||||||
|  | // Party represents a side of a connection. | ||||||
|  | type Party bool; const ( | ||||||
|  | 	ServerSide Party = false | ||||||
|  | 	ClientSide Party = true | ||||||
|  | ) | ||||||
|  | 
 | ||||||
|  | type a struct { | ||||||
|  | 	underlying   net.Conn | ||||||
|  | 	party        Party | ||||||
|  | 	transID      int64 | ||||||
|  | 	transLock    sync.RWMutex | ||||||
|  | 	sendLock     sync.Mutex | ||||||
|  | 	transMap     map[int64] *transA | ||||||
|  | 	transChan    chan *transA | ||||||
|  | 	err          error | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // AdaptA returns a connection implementing METADAPT-A over a singular stream- | ||||||
|  | // oriented transport such as TCP or UNIX domain stream sockets. | ||||||
|  | func AdaptA(underlying net.Conn, party Party) Conn { | ||||||
|  | 	conn := &a { | ||||||
|  | 		underlying: underlying, | ||||||
|  | 		party:      party, | ||||||
|  | 		transMap:   make(map[int64] *transA), | ||||||
|  | 		transChan:  make(chan *transA), | ||||||
|  | 	} | ||||||
|  | 	go conn.receive() | ||||||
|  | 	return conn | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *a) Close() error { | ||||||
|  | 	return this.underlying.Close() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *a) OpenTrans() (Trans, error) { | ||||||
|  | 	this.transLock.Lock() | ||||||
|  | 	defer this.transLock.Unlock() | ||||||
|  | 	id := this.transID | ||||||
|  | 	this.transID ++ | ||||||
|  | 	trans := &transA { | ||||||
|  | 		id:       id, | ||||||
|  | 		incoming: usync.NewGate[incomingMessage](), | ||||||
|  | 	} | ||||||
|  | 	this.transMap[id] = trans | ||||||
|  | 	if this.transID == int64Max { | ||||||
|  | 		return nil, fmt.Errorf("could not open transaction: %w", ErrIntegerOverflow) | ||||||
|  | 	} | ||||||
|  | 	this.transID ++ | ||||||
|  | 	return trans, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *a) AcceptTrans(ctx context.Context) (Trans, error) { | ||||||
|  | 	select { | ||||||
|  | 	case trans := <- this.transChan: | ||||||
|  | 		return trans, nil | ||||||
|  | 	case <- ctx.Done(): | ||||||
|  | 		return nil, ctx.Err()	 | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *a) unlistTransactionSafe(id int64) { | ||||||
|  | 	this.transLock.Lock() | ||||||
|  | 	defer this.transLock.Unlock() | ||||||
|  | 	delete(this.transMap, id) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *a) sendMessageSafe(trans int64, method uint16, data []byte) error { | ||||||
|  | 	this.sendLock.Lock() | ||||||
|  | 	defer this.sendLock.Lock() | ||||||
|  | 	 | ||||||
|  | 	buffer := make([]byte, 8 + len(data)) | ||||||
|  | 	tape.EncodeI64(buffer[:4], trans) | ||||||
|  | 	tape.EncodeI16(buffer[4:6], method) | ||||||
|  | 	length, ok := tape.U16CastSafe(len(data)) | ||||||
|  | 	if !ok { return ErrPayloadTooLarge } | ||||||
|  | 	tape.EncodeI16(data[6:8], length) | ||||||
|  | 	copy(buffer[8:], data) | ||||||
|  | 	_, err := this.underlying.Write(buffer) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *a) receive() { | ||||||
|  | 	// TODO: multiplex receiving | ||||||
|  | 	// if a received transaction has a malformed ID, reject it here and | ||||||
|  | 	// cause the connection to fail | ||||||
|  | 	// at the end of this function, close all incoming channels | ||||||
|  | 	defer func() { | ||||||
|  | 		this.underlying.Close() | ||||||
|  | 		this.transLock.Lock() | ||||||
|  | 		defer this.transLock.Lock() | ||||||
|  | 		for _, trans := range this.transMap { | ||||||
|  | 			trans.Close() | ||||||
|  | 		} | ||||||
|  | 		clear(this.transMap) | ||||||
|  | 	}() | ||||||
|  | 	for { | ||||||
|  | 		transID, method, payload, err := decodeMessageA(this.underlying) | ||||||
|  | 		if err != nil { | ||||||
|  | 			this.err = fmt.Errorf("could not receive message: %w", err) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 
 | ||||||
|  | 		err = this.receiveMultiplex(transID, method, payload) | ||||||
|  | 		if err != nil { | ||||||
|  | 			this.err = fmt.Errorf("could not receive message: %w", err) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *a) receiveMultiplex(transID int64, method uint16, payload []byte) error { | ||||||
|  | 	if transID == 0 || this.party == partyFromTransID(transID) { | ||||||
|  | 		return ErrMessageMalformed | ||||||
|  | 	} | ||||||
|  | 	 | ||||||
|  | 	this.transLock.Lock() | ||||||
|  | 	defer this.transLock.Unlock() | ||||||
|  | 
 | ||||||
|  | 	trans, ok := this.transMap[transID] | ||||||
|  | 	if !ok { | ||||||
|  | 		trans = &transA { | ||||||
|  | 			parent:   this, | ||||||
|  | 			id:       transID, | ||||||
|  | 			incoming: usync.NewGate[incomingMessage](), | ||||||
|  | 		} | ||||||
|  | 		this.transChan <- trans | ||||||
|  | 	} | ||||||
|  | 
 | ||||||
|  | 	trans.incoming.Send(incomingMessage { | ||||||
|  | 		method:  method, | ||||||
|  | 		payload: payload, | ||||||
|  | 	}) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type transA struct { | ||||||
|  | 	parent   *a | ||||||
|  | 	id       int64 | ||||||
|  | 	incoming usync.Gate[incomingMessage] | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *transA) Close() error { | ||||||
|  | 	this.incoming.Close() | ||||||
|  | 	this.parent.unlistTransactionSafe(this.ID()) | ||||||
|  | 	return nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *transA) ID() int64 { | ||||||
|  | 	return this.id | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *transA) Send(method uint16, data []byte) error { | ||||||
|  | 	return this.parent.sendMessageSafe(this.id, method, data) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *transA) Receive() (method uint16, data []byte, err error) { | ||||||
|  | 	message, ok := <- this.incoming.Receive() | ||||||
|  | 	if !ok { | ||||||
|  | 		if this.parent.err == nil { | ||||||
|  | 			return 0, nil, fmt.Errorf("could not receive message: %w", io.EOF) | ||||||
|  | 		} else { | ||||||
|  | 			return 0, nil, this.parent.err | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  | 	return message.method, message.payload, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type incomingMessage struct { | ||||||
|  | 	method  uint16 | ||||||
|  | 	payload []byte | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func decodeMessageA(reader io.Reader) (int64, uint16, []byte, error) { | ||||||
|  | 	headerBuffer := [8]byte { } | ||||||
|  | 	_, err := io.ReadFull(reader, headerBuffer[:]) | ||||||
|  | 	if err != nil { return 0, 0, nil, err } | ||||||
|  | 	transID, err := tape.DecodeI64[int64](headerBuffer[:4]) | ||||||
|  | 	if err != nil { return 0, 0, nil, err } | ||||||
|  | 	method, err := tape.DecodeI16[uint16](headerBuffer[4:6]) | ||||||
|  | 	if err != nil { return 0, 0, nil, err } | ||||||
|  | 	length, err := tape.DecodeI16[uint16](headerBuffer[6:8]) | ||||||
|  | 	if err != nil { return 0, 0, nil, err } | ||||||
|  | 	payloadBuffer := make([]byte, int(length)) | ||||||
|  | 	_, err = io.ReadFull(reader, payloadBuffer) | ||||||
|  | 	if err != nil { return 0, 0, nil, err } | ||||||
|  | 	return transID, method, payloadBuffer, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func partyFromTransID(id int64) Party { | ||||||
|  | 	return id > 0 | ||||||
|  | } | ||||||
							
								
								
									
										93
									
								
								metadaptb.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										93
									
								
								metadaptb.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,93 @@ | |||||||
|  | package hopp | ||||||
|  | 
 | ||||||
|  | import "io" | ||||||
|  | import "net" | ||||||
|  | import "context" | ||||||
|  | import "git.tebibyte.media/sashakoshka/hopp/tape" | ||||||
|  | 
 | ||||||
|  | // B implements METADAPT-B over a multiplexed stream-oriented transport such as | ||||||
|  | // QUIC. | ||||||
|  | type b struct { | ||||||
|  | 	underlying MultiConn | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // AdaptB returns a connection implementing METADAPT-B over a singular stream- | ||||||
|  | // oriented transport such as TCP or UNIX domain stream sockets. | ||||||
|  | func AdaptB(underlying MultiConn) Conn { | ||||||
|  | 	return &b { | ||||||
|  | 		underlying: underlying, | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *b) Close() error { | ||||||
|  | 	return this.underlying.Close() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *b) OpenTrans() (Trans, error) { | ||||||
|  | 	stream, err := this.underlying.OpenStream() | ||||||
|  | 	if err != nil { return nil, err } | ||||||
|  | 	return transB { underlying: stream }, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (this *b) AcceptTrans(ctx context.Context) (Trans, error) { | ||||||
|  | 	stream, err := this.underlying.AcceptStream(ctx) | ||||||
|  | 	if err != nil { return nil, err } | ||||||
|  | 	return transB { underlying: stream }, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type transB struct { | ||||||
|  | 	underlying Stream | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (trans transB) Close() error { | ||||||
|  | 	return trans.underlying.Close() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (trans transB) ID() int64 { | ||||||
|  | 	return trans.underlying.ID() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (trans transB) Send(method uint16, data []byte) error { | ||||||
|  | 	buffer := make([]byte, 4 + len(data)) | ||||||
|  | 	tape.EncodeI16(buffer[:2], method) | ||||||
|  | 	length, ok := tape.U16CastSafe(len(data)) | ||||||
|  | 	if !ok { return ErrPayloadTooLarge } | ||||||
|  | 	tape.EncodeI16(data[2:4], length) | ||||||
|  | 	copy(buffer[4:], data) | ||||||
|  | 	_, err := trans.underlying.Write(buffer) | ||||||
|  | 	return err | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (trans transB) Receive() (uint16, []byte, error) { | ||||||
|  | 	headerBuffer := [4]byte { } | ||||||
|  | 	_, err := io.ReadFull(trans.underlying, headerBuffer[:]) | ||||||
|  | 	if err != nil { return 0, nil, err } | ||||||
|  | 	method, err := tape.DecodeI16[uint16](headerBuffer[:2]) | ||||||
|  | 	if err != nil { return 0, nil, err } | ||||||
|  | 	length, err := tape.DecodeI16[uint16](headerBuffer[2:4]) | ||||||
|  | 	if err != nil { return 0, nil, err } | ||||||
|  | 	payloadBuffer := make([]byte, int(length)) | ||||||
|  | 	_, err = io.ReadFull(trans.underlying, payloadBuffer) | ||||||
|  | 	if err != nil { return 0, nil, err } | ||||||
|  | 	return method, payloadBuffer, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // MultiConn represens a multiplexed stream-oriented transport for use in [B]. | ||||||
|  | type MultiConn interface { | ||||||
|  | 	// See documentation for [net.Conn]. | ||||||
|  | 	io.Closer | ||||||
|  | 	LocalAddr() net.Addr | ||||||
|  | 	RemoteAddr() net.Addr | ||||||
|  | 	// AcceptStream accepts the next incoming stream from the other party. | ||||||
|  | 	AcceptStream(context.Context) (Stream, error) | ||||||
|  | 	// OpenStream opens a new stream. | ||||||
|  | 	OpenStream() (Stream, error) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | // Stream represents a single stream returned by a [MultiConn]. | ||||||
|  | type Stream interface { | ||||||
|  | 	// See documentation for [net.Conn]. | ||||||
|  | 	io.ReadWriteCloser | ||||||
|  | 	// ID returns the stream ID | ||||||
|  | 	ID() int64 | ||||||
|  | } | ||||||
							
								
								
									
										54
									
								
								quicwrap.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										54
									
								
								quicwrap.go
									
									
									
									
									
										Normal file
									
								
							| @ -0,0 +1,54 @@ | |||||||
|  | package hopp | ||||||
|  | 
 | ||||||
|  | import "net" | ||||||
|  | import "context" | ||||||
|  | import "github.com/quic-go/quic-go" | ||||||
|  | 
 | ||||||
|  | var _ MultiConn = quicMultiConn { } | ||||||
|  | type quicMultiConn struct { | ||||||
|  | 	underlying quic.Connection | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (conn quicMultiConn) Close() error { | ||||||
|  | 	return conn.underlying.CloseWithError(0, "good bye") | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (conn quicMultiConn) LocalAddr() net.Addr { | ||||||
|  | 	return conn.underlying.LocalAddr() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (conn quicMultiConn) RemoteAddr() net.Addr { | ||||||
|  | 	return conn.underlying.RemoteAddr() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (conn quicMultiConn) AcceptStream(ctx context.Context) (Stream, error) { | ||||||
|  | 	strea, err := conn.underlying.AcceptStream(ctx) | ||||||
|  | 	if err != nil { return nil, err } | ||||||
|  | 	return quicStream { underlying: strea }, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (conn quicMultiConn) OpenStream() (Stream, error) { | ||||||
|  | 	strea, err := conn.underlying.OpenStream() | ||||||
|  | 	if err != nil { return nil, err } | ||||||
|  | 	return quicStream { underlying: strea }, nil | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | type quicStream struct { | ||||||
|  | 	underlying quic.Stream | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (strea quicStream) Read(buffer []byte) (n int, err error) { | ||||||
|  | 	return strea.underlying.Read(buffer) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (strea quicStream) Write(buffer []byte) (n int, err error) { | ||||||
|  | 	return strea.underlying.Read(buffer) | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (strea quicStream) Close() error { | ||||||
|  | 	return strea.underlying.Close() | ||||||
|  | } | ||||||
|  | 
 | ||||||
|  | func (strea quicStream) ID() int64 { | ||||||
|  | 	return int64(strea.underlying.StreamID()) | ||||||
|  | } | ||||||
| @ -8,8 +8,8 @@ const uint16Max = 0xFFFF | |||||||
| 
 | 
 | ||||||
| // Error enumerates common errors in this package. | // Error enumerates common errors in this package. | ||||||
| type Error string; const ( | type Error string; const ( | ||||||
| 	ErrWrongBufferLength  Error = "wrong buffer length" | 	ErrWrongBufferLength Error = "wrong buffer length" | ||||||
| 	ErrDataTooLarge       Error = "data too large" | 	ErrDataTooLarge      Error = "data too large" | ||||||
| ) | ) | ||||||
| 
 | 
 | ||||||
| // Error implements the error interface. | // Error implements the error interface. | ||||||
|  | |||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user