Compare commits
44 Commits
0fed1a2451
...
quic-initi
| Author | SHA1 | Date | |
|---|---|---|---|
| 2fdf7d490d | |||
| d60beccbcd | |||
| 23c37c3d1f | |||
| a83aedc128 | |||
| c0bfcc02f7 | |||
| 7a0bf64c17 | |||
| 9d2bbec7f9 | |||
| dd89245c34 | |||
| 41f5cfefab | |||
| 8a3df95491 | |||
| c51a81bc13 | |||
| 47645a8fce | |||
| 87c4ac8efb | |||
| f6fe9c307d | |||
| 9bf0c596ba | |||
| 86cf3ee89d | |||
| 8fe3ba8d4f | |||
| cbaff8b593 | |||
| 46c6361602 | |||
| fac0c4e31d | |||
| 945d81c505 | |||
| f34620c434 | |||
| 7a766b74d8 | |||
| 6de3cbbc48 | |||
| e4f13a4142 | |||
| db10355c84 | |||
| f4f8039fa0 | |||
| fe8f2fc3ea | |||
| b07cdf088a | |||
| 4eae69dc94 | |||
| 5c28510342 | |||
| 1ac0ed51c7 | |||
| 174634a330 | |||
| a0b49e950e | |||
| 5b42030f9d | |||
| 4daccca66a | |||
| 4647f91abe | |||
| 54022e5541 | |||
| 72df1e8d11 | |||
| a47a3599d3 | |||
| b54fc02a35 | |||
| 5e885a0bd3 | |||
| 659bcecbe6 | |||
| 540c64a421 |
51
assets/icon.svg
Normal file
51
assets/icon.svg
Normal file
@@ -0,0 +1,51 @@
|
|||||||
|
<?xml version="1.0" encoding="UTF-8" standalone="no"?>
|
||||||
|
<!-- Created with Inkscape (http://www.inkscape.org/) -->
|
||||||
|
|
||||||
|
<svg
|
||||||
|
width="1024"
|
||||||
|
height="1024"
|
||||||
|
viewBox="0 0 270.93333 270.93333"
|
||||||
|
version="1.1"
|
||||||
|
id="svg5"
|
||||||
|
inkscape:version="1.2.2 (b0a8486541, 2022-12-01)"
|
||||||
|
sodipodi:docname="HOPP.svg"
|
||||||
|
xmlns:inkscape="http://www.inkscape.org/namespaces/inkscape"
|
||||||
|
xmlns:sodipodi="http://sodipodi.sourceforge.net/DTD/sodipodi-0.dtd"
|
||||||
|
xmlns="http://www.w3.org/2000/svg"
|
||||||
|
xmlns:svg="http://www.w3.org/2000/svg">
|
||||||
|
<sodipodi:namedview
|
||||||
|
id="namedview7"
|
||||||
|
pagecolor="#ffffff"
|
||||||
|
bordercolor="#000000"
|
||||||
|
borderopacity="0.25"
|
||||||
|
inkscape:showpageshadow="2"
|
||||||
|
inkscape:pageopacity="0.0"
|
||||||
|
inkscape:pagecheckerboard="0"
|
||||||
|
inkscape:deskcolor="#d1d1d1"
|
||||||
|
inkscape:document-units="mm"
|
||||||
|
showgrid="false"
|
||||||
|
inkscape:zoom="0.36614057"
|
||||||
|
inkscape:cx="957.2826"
|
||||||
|
inkscape:cy="777.02397"
|
||||||
|
inkscape:window-width="1920"
|
||||||
|
inkscape:window-height="979"
|
||||||
|
inkscape:window-x="0"
|
||||||
|
inkscape:window-y="665"
|
||||||
|
inkscape:window-maximized="1"
|
||||||
|
inkscape:current-layer="layer1" />
|
||||||
|
<defs
|
||||||
|
id="defs2" />
|
||||||
|
<g
|
||||||
|
inkscape:label="Layer 1"
|
||||||
|
inkscape:groupmode="layer"
|
||||||
|
id="layer1">
|
||||||
|
<path
|
||||||
|
id="path7330"
|
||||||
|
style="fill:#e2c558;fill-opacity:1;stroke:none;stroke-width:1.05833;stroke-linecap:round"
|
||||||
|
d="M 0 0 L 135.46667 135.46667 L 270.93333 135.46667 L 270.93333 0 L 0 0 z M 135.46667 135.46667 L 0 135.46667 L 135.46667 203.2 L 203.2 203.2 L 135.46667 135.46667 z M 135.46667 203.2 L 0 203.2 L 270.93333 270.93333 L 135.46667 203.2 z " />
|
||||||
|
<path
|
||||||
|
style="fill:#e2c558;fill-opacity:1;stroke:none;stroke-width:1.05833;stroke-linecap:round"
|
||||||
|
d="M 270.93333,270.93333 203.2,203.2 h 67.73333 z"
|
||||||
|
id="path10302" />
|
||||||
|
</g>
|
||||||
|
</svg>
|
||||||
|
After Width: | Height: | Size: 1.8 KiB |
@@ -1,8 +1,11 @@
|
|||||||
package hopp
|
package hopp
|
||||||
|
|
||||||
|
import "io"
|
||||||
import "net"
|
import "net"
|
||||||
// import "time"
|
// import "time"
|
||||||
|
|
||||||
|
const defaultSizeLimit int64 = 1024 * 1024 // 1 megabyte
|
||||||
|
|
||||||
// Conn is a HOPP connection.
|
// Conn is a HOPP connection.
|
||||||
type Conn interface {
|
type Conn interface {
|
||||||
// Close closes the connection. Any blocked operations on the connection
|
// Close closes the connection. Any blocked operations on the connection
|
||||||
@@ -19,22 +22,39 @@ type Conn interface {
|
|||||||
// AcceptTrans accepts a transaction from the other party. This must
|
// AcceptTrans accepts a transaction from the other party. This must
|
||||||
// be called in a loop to avoid the connection locking up.
|
// be called in a loop to avoid the connection locking up.
|
||||||
AcceptTrans() (Trans, error)
|
AcceptTrans() (Trans, error)
|
||||||
|
|
||||||
|
// SetSizeLimit sets a limit (in bytes) for how large messages can be.
|
||||||
|
// By default, this limit is 1 megabyte.
|
||||||
|
SetSizeLimit(limit int64)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Trans is a HOPP transaction.
|
// Trans is a HOPP transaction. Methods of this interface are not safe for
|
||||||
|
// concurrent use with the exception of the Close and ID methods. The
|
||||||
|
// recommended use case is one goroutine per transaction.
|
||||||
type Trans interface {
|
type Trans interface {
|
||||||
// Close closes the transaction. Any blocked operations will be
|
// Close closes the transaction. Any blocked operations will be
|
||||||
// unblocked and return errors.
|
// unblocked and return errors. This method is safe for concurrent use.
|
||||||
Close() error
|
Close() error
|
||||||
|
|
||||||
// ID returns the transaction ID. This must not change, and it must be
|
// ID returns the transaction ID. This must not change, and it must be
|
||||||
// unique within the connection.
|
// unique within the connection. This method is safe for concurrent use.
|
||||||
ID() int64
|
ID() int64
|
||||||
|
|
||||||
// TODO: add methods for setting send and receive deadlines
|
// TODO: add methods for setting send and receive deadlines
|
||||||
|
|
||||||
// Send sends a message.
|
// Send sends a message. This method is not safe for concurrent use.
|
||||||
Send(method uint16, data []byte) error
|
Send(method uint16, data []byte) error
|
||||||
// Receive receives a message.
|
// SendWriter sends data written to an [io.Writer]. The writer must be
|
||||||
|
// closed after use. Closing the writer flushes any data that hasn't
|
||||||
|
// been written yet. Any writer previously opened through this function
|
||||||
|
// will be discarded. This method is not safe for concurrent use, and
|
||||||
|
// neither is its result.
|
||||||
|
SendWriter(method uint16) (io.WriteCloser, error)
|
||||||
|
// Receive receives a message. This method is not safe for concurrent
|
||||||
|
// use.
|
||||||
Receive() (method uint16, data []byte, err error)
|
Receive() (method uint16, data []byte, err error)
|
||||||
|
// ReceiveReader receives a message as an [io.Reader]. Any reader
|
||||||
|
// previously opened through this function will be discarded. This
|
||||||
|
// method is not safe for concurrent use, and neither is its result.
|
||||||
|
ReceiveReader() (method uint16, data io.Reader, err error)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -123,14 +123,16 @@ be of the same size.
|
|||||||
## Transports
|
## Transports
|
||||||
A transport is a protocol that HOPP connections can run on top of. HOPP
|
A transport is a protocol that HOPP connections can run on top of. HOPP
|
||||||
currently supports the QUIC transport protocol for communicating between
|
currently supports the QUIC transport protocol for communicating between
|
||||||
machines, and UNIX domain sockets for quicker communication among applications
|
machines, TCP/TLS for legacy systems that do not support QUIC, and UNIX domain
|
||||||
on the same machine. Both protocols are supported through METADAPT.
|
sockets for faster communication among applications on the same machine. Both
|
||||||
|
protocols are supported through METADAPT.
|
||||||
|
|
||||||
## Message and Transaction Demarcation Protocol (METADAPT)
|
## Message and Transaction Demarcation Protocol (METADAPT)
|
||||||
The Message and Transaction Demarcation Protocol is used to break one or more
|
The Message and Transaction Demarcation Protocol is used to break one or more
|
||||||
reliable data streams into transactions, which are broken down further into
|
reliable data streams into transactions, which are broken down further into
|
||||||
messages. A message, as well as its associated metadata (length, transaction,
|
messages. The representation of a message (or a part thereof) on the protocol,
|
||||||
method, etc.) together is referred to as METADAPT Message Block (MMB).
|
including its associated metadata (length, transaction, method, etc.) is
|
||||||
|
referred to as METADAPT Message Block (MMB).
|
||||||
|
|
||||||
For transports that offer multiple multiplexed data streams that can be created
|
For transports that offer multiple multiplexed data streams that can be created
|
||||||
and destroyed on-demand (such as QUIC) each stream is used as a transaction. If
|
and destroyed on-demand (such as QUIC) each stream is used as a transaction. If
|
||||||
@@ -145,8 +147,12 @@ METADAPT-A requires a transport which offers a single full-duplex data stream
|
|||||||
that persists for the duration of the connection. All transactions are
|
that persists for the duration of the connection. All transactions are
|
||||||
multiplexed onto this single stream. Each MMB contains a 12-octet long header,
|
multiplexed onto this single stream. Each MMB contains a 12-octet long header,
|
||||||
with the transaction ID, then the method, and then the payload size (in octets).
|
with the transaction ID, then the method, and then the payload size (in octets).
|
||||||
The transaction ID is encoded as an I64, and the method and payload size are
|
The transaction ID is encoded as an I64, the method is encoded as a U16 and the
|
||||||
both encoded as U16s. The remainder of the message is the payload. Since each
|
and payload size is encoded as a U64. Only the 63 least significant bits of the
|
||||||
|
payload size describe the actual size, the most significant bit controlling
|
||||||
|
chunking. See the section on chunking for more information.
|
||||||
|
|
||||||
|
The remainder of the message is the payload. Since each
|
||||||
MMB is self-describing, they are sent sequentially with no gaps in-between them.
|
MMB is self-describing, they are sent sequentially with no gaps in-between them.
|
||||||
|
|
||||||
Transactions "open" when the first message with a given transaction ID is sent.
|
Transactions "open" when the first message with a given transaction ID is sent.
|
||||||
@@ -162,13 +168,25 @@ used up, the connection must fail. Don't worry about this though, because the
|
|||||||
sun will have expanded to swallow earth by then. Your connection will not last
|
sun will have expanded to swallow earth by then. Your connection will not last
|
||||||
that long.
|
that long.
|
||||||
|
|
||||||
|
#### Message Chunking
|
||||||
|
|
||||||
|
The most significant bit of the payload size field of an MMB is called the Chunk
|
||||||
|
Control Bit (CCB). If the CCB of a given MMB is zero, the represented message is
|
||||||
|
interpreted as being self-contained and the data is processed immediately. If
|
||||||
|
the CCB is one, the message is interpreted as being chunked, with the data of
|
||||||
|
the current MMB being the first chunk. The data of further MMBs sent along the
|
||||||
|
transaction will be appended to the message until an MMB is read with a zero
|
||||||
|
CCB, in which case the MMB will be the last chunk and any more MMBs will be
|
||||||
|
interpreted as normal.
|
||||||
|
|
||||||
### METADAPT-B
|
### METADAPT-B
|
||||||
METADAPT-B requires a transport which offers multiple multiplexed full-duplex
|
METADAPT-B requires a transport which offers multiple multiplexed full-duplex
|
||||||
data streams per connection that can be created and destroyed on-demand. Each
|
data streams per connection that can be created and destroyed on-demand. Each
|
||||||
data stream is used as an individual transaction. Each MMB contains a 4-octet
|
data stream is used as an individual transaction. Each MMB contains a 4-octet
|
||||||
long header with the method and then the payload size (in octets) both encoded
|
long header with the method and then the payload size (in octets) encoded as a
|
||||||
as U16s. The remainder of the message is the payload. Since each MMB is
|
U16 and U64 respectively. The remainder of the message is the payload. Since
|
||||||
self-describing, they are sent sequentially with no gaps in-between them.
|
each MMB is self-describing, they are sent sequentially with no gaps in-between
|
||||||
|
them.
|
||||||
|
|
||||||
The ID of any transaction will reflect the ID of its corresponding stream. The
|
The ID of any transaction will reflect the ID of its corresponding stream. The
|
||||||
lifetime of the transaction is tied to the lifetime of the stream, that is to
|
lifetime of the transaction is tied to the lifetime of the stream, that is to
|
||||||
|
|||||||
102
examples/chat/client/main.go
Normal file
102
examples/chat/client/main.go
Normal file
@@ -0,0 +1,102 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import "os"
|
||||||
|
import "fmt"
|
||||||
|
import "time"
|
||||||
|
import "bufio"
|
||||||
|
import "context"
|
||||||
|
import "crypto/tls"
|
||||||
|
import "git.tebibyte.media/sashakoshka/hopp"
|
||||||
|
import "git.tebibyte.media/sashakoshka/hopp/examples/chat"
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
name := os.Args[0]
|
||||||
|
if len(os.Args) != 3 && len(os.Args) != 4 {
|
||||||
|
fmt.Fprintf(os.Stderr, "Usage: %s HOST:PORT ROOM [NICKNAME]\n", name)
|
||||||
|
os.Exit(2)
|
||||||
|
}
|
||||||
|
address := os.Args[1]
|
||||||
|
room := os.Args[2]
|
||||||
|
var nickname hopp.Option[string]; if len(os.Args) >= 4 {
|
||||||
|
nickname = hopp.O(os.Args[3])
|
||||||
|
}
|
||||||
|
trans, err := join(address, room, nickname)
|
||||||
|
handleErr(1, err)
|
||||||
|
go func() {
|
||||||
|
reader := bufio.NewReader(os.Stdin)
|
||||||
|
for {
|
||||||
|
line, err := reader.ReadString('\n')
|
||||||
|
if err != nil { break }
|
||||||
|
send(trans, line)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
for {
|
||||||
|
message, err := chat.Receive(trans)
|
||||||
|
handleErr(1, err)
|
||||||
|
switch message := message.(type) {
|
||||||
|
case *chat.MessageChat:
|
||||||
|
nickname := "Anonymous"
|
||||||
|
if value, ok := message.Nickname.Get(); ok {
|
||||||
|
nickname = value
|
||||||
|
}
|
||||||
|
fmt.Fprintf(os.Stdout, "%s: %s\n", nickname, message.Content)
|
||||||
|
case *chat.MessageJoinNotify:
|
||||||
|
fmt.Fprintf(os.Stdout, "(i) %s joined the room\n", message.Nickname)
|
||||||
|
case *chat.MessageLeaveNotify:
|
||||||
|
fmt.Fprintf(os.Stdout, "(i) %s left the room\n", message.Nickname)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func join(address string, room string, nickname hopp.Option[string]) (hopp.Trans, error) {
|
||||||
|
ctx, done := context.WithTimeout(context.Background(), 16 * time.Second)
|
||||||
|
defer done()
|
||||||
|
dialer := hopp.Dialer {
|
||||||
|
TLSConfig: &tls.Config {
|
||||||
|
// don't actually do this in real life
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
conn, err := dialer.Dial(ctx, "quic", address)
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
|
||||||
|
err = updateProfile(conn, nickname)
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
|
||||||
|
transRoom, err := conn.OpenTrans()
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
err = chat.Send(transRoom, &chat.MessageJoin {
|
||||||
|
Room: room,
|
||||||
|
})
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
return transRoom, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func send(trans hopp.Trans, content string) error {
|
||||||
|
return chat.Send(trans, &chat.MessageChat {
|
||||||
|
Content: content,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func updateProfile(conn hopp.Conn, nickname hopp.Option[string]) error {
|
||||||
|
trans, err := conn.OpenTrans()
|
||||||
|
if err != nil { return err }
|
||||||
|
defer trans.Close()
|
||||||
|
err = chat.Send(trans, &chat.MessageUpdateProfile {
|
||||||
|
Nickname: nickname,
|
||||||
|
})
|
||||||
|
if err != nil { return err }
|
||||||
|
message, err := chat.Receive(trans)
|
||||||
|
if err != nil { return err }
|
||||||
|
switch message := message.(type) {
|
||||||
|
case *chat.MessageError: return message
|
||||||
|
default: return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleErr(code int, err error) {
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err)
|
||||||
|
os.Exit(code)
|
||||||
|
}
|
||||||
|
}
|
||||||
6
examples/chat/doc.go
Normal file
6
examples/chat/doc.go
Normal file
@@ -0,0 +1,6 @@
|
|||||||
|
// Package chat implements a simple chat protocol over HOPP. To re-generate the
|
||||||
|
// source files, run this command from within the root directory of the
|
||||||
|
// repository:
|
||||||
|
//
|
||||||
|
// go run ./cmd/hopp-generate examples/chat/protocol.md examples/chat/protocol
|
||||||
|
package chat
|
||||||
11
examples/chat/error.go
Normal file
11
examples/chat/error.go
Normal file
@@ -0,0 +1,11 @@
|
|||||||
|
package chat
|
||||||
|
|
||||||
|
import "fmt"
|
||||||
|
|
||||||
|
func (msg *MessageError) Error() string {
|
||||||
|
if description, ok := msg.Description.Get(); ok {
|
||||||
|
return fmt.Sprintf("other party sent error: %d %s", msg.Error, description)
|
||||||
|
} else {
|
||||||
|
return fmt.Sprintf("other party sent error: %d", msg.Code)
|
||||||
|
}
|
||||||
|
}
|
||||||
369
examples/chat/generated.go
Normal file
369
examples/chat/generated.go
Normal file
@@ -0,0 +1,369 @@
|
|||||||
|
package chat
|
||||||
|
|
||||||
|
import "git.tebibyte.media/sashakoshka/hopp"
|
||||||
|
import "git.tebibyte.media/sashakoshka/hopp/tape"
|
||||||
|
|
||||||
|
// Send sends one message along a transaction.
|
||||||
|
func Send(trans hopp.Trans, message hopp.Message) error {
|
||||||
|
buffer, err := message.MarshalBinary()
|
||||||
|
if err != nil { return err }
|
||||||
|
return trans.Send(message.Method(), buffer)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Receive receives one message from a transaction.
|
||||||
|
func Receive(trans hopp.Trans) (hopp.Message, error) {
|
||||||
|
method, data, err := trans.Receive()
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
switch method {
|
||||||
|
case 0x0000:
|
||||||
|
message := &MessageError { }
|
||||||
|
err := message.UnmarshalBinary(data)
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
return message, nil
|
||||||
|
case 0x0001:
|
||||||
|
message := &MessageSuccess { }
|
||||||
|
err := message.UnmarshalBinary(data)
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
return message, nil
|
||||||
|
case 0x0100:
|
||||||
|
message := &MessageUpdateProfile { }
|
||||||
|
err := message.UnmarshalBinary(data)
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
return message, nil
|
||||||
|
case 0x0200:
|
||||||
|
message := &MessageJoin { }
|
||||||
|
err := message.UnmarshalBinary(data)
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
return message, nil
|
||||||
|
case 0x0201:
|
||||||
|
message := &MessageChat { }
|
||||||
|
err := message.UnmarshalBinary(data)
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
return message, nil
|
||||||
|
case 0x0300:
|
||||||
|
message := &MessageJoinNotify { }
|
||||||
|
err := message.UnmarshalBinary(data)
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
return message, nil
|
||||||
|
case 0x0301:
|
||||||
|
message := &MessageLeaveNotify { }
|
||||||
|
err := message.UnmarshalBinary(data)
|
||||||
|
if err != nil { return nil, err }
|
||||||
|
return message, nil
|
||||||
|
default: return nil, hopp.ErrUnknownMethod
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// (0) Error is sent by a party when the other party has done something erroneous. The valid error codes are:
|
||||||
|
//
|
||||||
|
// 0: General, unspecified error
|
||||||
|
//
|
||||||
|
// The description field, if specified, determines a human-readable error to be shown to the user. The sending party must immediately close the transaction after this message is sent.
|
||||||
|
type MessageError struct {
|
||||||
|
/* 0 */ Code uint16
|
||||||
|
/* 1 */ Description hopp.Option[string]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Method returns the method number of the message.
|
||||||
|
func (msg MessageError) Method() uint16 {
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary encodes the data in this message into a buffer.
|
||||||
|
func (msg *MessageError) MarshalBinary() ([]byte, error) {
|
||||||
|
size := 0
|
||||||
|
count := 1
|
||||||
|
offsetCode := size
|
||||||
|
{ value := msg.Code
|
||||||
|
size += 2; _ = value }
|
||||||
|
offsetDescription := size
|
||||||
|
if value, ok := msg.Description.Get(); ok {
|
||||||
|
count ++
|
||||||
|
size += len(value) }
|
||||||
|
if size > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
if count > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
buffer := make([]byte, 2 + 4 * count + size)
|
||||||
|
tape.EncodeI16(buffer[:2], uint16(count))
|
||||||
|
{ value := msg.Code
|
||||||
|
tape.EncodeI16(buffer[offsetCode:], value)}
|
||||||
|
if value, ok := msg.Description.Get(); ok {
|
||||||
|
tape.EncodeString(buffer[offsetDescription:], value)}
|
||||||
|
return buffer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary dencodes the data from a buffer int this message.
|
||||||
|
func (msg *MessageError) UnmarshalBinary(buffer []byte) error {
|
||||||
|
pairs, err := tape.DecodePairs(buffer)
|
||||||
|
if err != nil { return err }
|
||||||
|
foundRequired := 0
|
||||||
|
for tag, data := range pairs {
|
||||||
|
switch tag {
|
||||||
|
case 0:
|
||||||
|
value, err := tape.DecodeI16[uint16](data)
|
||||||
|
if err != nil { return err }
|
||||||
|
msg.Code = value
|
||||||
|
foundRequired ++
|
||||||
|
case 1:
|
||||||
|
value, err := tape.DecodeString[string](data)
|
||||||
|
if err != nil { return err }
|
||||||
|
msg.Description = hopp.O(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if foundRequired != 1 { return hopp.ErrTablePairMissing }
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// (1) Success is sent by a party when it has successfully completed a task given to it by the other party. The sending party must immediately close the transaction after this message is sent.
|
||||||
|
type MessageSuccess struct {
|
||||||
|
}
|
||||||
|
|
||||||
|
// Method returns the method number of the message.
|
||||||
|
func (msg MessageSuccess) Method() uint16 {
|
||||||
|
return 1
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary encodes the data in this message into a buffer.
|
||||||
|
func (msg *MessageSuccess) MarshalBinary() ([]byte, error) {
|
||||||
|
size := 0
|
||||||
|
count := 0
|
||||||
|
if size > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
if count > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
buffer := make([]byte, 2 + 4 * count + size)
|
||||||
|
tape.EncodeI16(buffer[:2], uint16(count))
|
||||||
|
return buffer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary dencodes the data from a buffer int this message.
|
||||||
|
func (msg *MessageSuccess) UnmarshalBinary(buffer []byte) error {
|
||||||
|
// no fields
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// (256) UpdateProfile is sent by the client in a new transaction to update the profile details that will be shown to other connected clients.
|
||||||
|
type MessageUpdateProfile struct {
|
||||||
|
/* 0 */ Nickname hopp.Option[string]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Method returns the method number of the message.
|
||||||
|
func (msg MessageUpdateProfile) Method() uint16 {
|
||||||
|
return 256
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary encodes the data in this message into a buffer.
|
||||||
|
func (msg *MessageUpdateProfile) MarshalBinary() ([]byte, error) {
|
||||||
|
size := 0
|
||||||
|
count := 0
|
||||||
|
offsetNickname := size
|
||||||
|
if value, ok := msg.Nickname.Get(); ok {
|
||||||
|
count ++
|
||||||
|
size += len(value) }
|
||||||
|
if size > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
if count > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
buffer := make([]byte, 2 + 4 * count + size)
|
||||||
|
tape.EncodeI16(buffer[:2], uint16(count))
|
||||||
|
if value, ok := msg.Nickname.Get(); ok {
|
||||||
|
tape.EncodeString(buffer[offsetNickname:], value)}
|
||||||
|
return buffer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary dencodes the data from a buffer int this message.
|
||||||
|
func (msg *MessageUpdateProfile) UnmarshalBinary(buffer []byte) error {
|
||||||
|
pairs, err := tape.DecodePairs(buffer)
|
||||||
|
if err != nil { return err }
|
||||||
|
foundRequired := 0
|
||||||
|
for tag, data := range pairs {
|
||||||
|
switch tag {
|
||||||
|
case 0:
|
||||||
|
value, err := tape.DecodeString[string](data)
|
||||||
|
if err != nil { return err }
|
||||||
|
msg.Nickname = hopp.O(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if foundRequired != 1 { return hopp.ErrTablePairMissing }
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// (512) Join is sent by the client when it wishes to join a room. It must begin a new transaction, and that transaction will persist while the user is in that room. Messages having to do with the room will be sent along this transaction. To leave the room, the client must close the transaction.
|
||||||
|
type MessageJoin struct {
|
||||||
|
/* 0 */ Room string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Method returns the method number of the message.
|
||||||
|
func (msg MessageJoin) Method() uint16 {
|
||||||
|
return 512
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary encodes the data in this message into a buffer.
|
||||||
|
func (msg *MessageJoin) MarshalBinary() ([]byte, error) {
|
||||||
|
size := 0
|
||||||
|
count := 1
|
||||||
|
offsetRoom := size
|
||||||
|
{ value := msg.Room
|
||||||
|
size += len(value) }
|
||||||
|
if size > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
if count > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
buffer := make([]byte, 2 + 4 * count + size)
|
||||||
|
tape.EncodeI16(buffer[:2], uint16(count))
|
||||||
|
{ value := msg.Room
|
||||||
|
tape.EncodeString(buffer[offsetRoom:], value)}
|
||||||
|
return buffer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary dencodes the data from a buffer int this message.
|
||||||
|
func (msg *MessageJoin) UnmarshalBinary(buffer []byte) error {
|
||||||
|
pairs, err := tape.DecodePairs(buffer)
|
||||||
|
if err != nil { return err }
|
||||||
|
for tag, data := range pairs {
|
||||||
|
switch tag {
|
||||||
|
case 0:
|
||||||
|
value, err := tape.DecodeString[string](data)
|
||||||
|
if err != nil { return err }
|
||||||
|
msg.Room = value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// (513) Chat is sent by the client when it wishes to post a message to the room. It is also relayed by the server to other clients to notify them of the message. It must be sent within a room transaction.
|
||||||
|
type MessageChat struct {
|
||||||
|
/* 0 */ Nickname hopp.Option[string]
|
||||||
|
/* 1 */ Content string
|
||||||
|
}
|
||||||
|
|
||||||
|
// Method returns the method number of the message.
|
||||||
|
func (msg MessageChat) Method() uint16 {
|
||||||
|
return 513
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary encodes the data in this message into a buffer.
|
||||||
|
func (msg *MessageChat) MarshalBinary() ([]byte, error) {
|
||||||
|
size := 0
|
||||||
|
count := 1
|
||||||
|
offsetNickname := size
|
||||||
|
if value, ok := msg.Nickname.Get(); ok {
|
||||||
|
count ++
|
||||||
|
size += len(value) }
|
||||||
|
offsetContent := size
|
||||||
|
{ value := msg.Content
|
||||||
|
size += len(value) }
|
||||||
|
if size > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
if count > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
buffer := make([]byte, 2 + 4 * count + size)
|
||||||
|
tape.EncodeI16(buffer[:2], uint16(count))
|
||||||
|
if value, ok := msg.Nickname.Get(); ok {
|
||||||
|
tape.EncodeString(buffer[offsetNickname:], value)}
|
||||||
|
{ value := msg.Content
|
||||||
|
tape.EncodeString(buffer[offsetContent:], value)}
|
||||||
|
return buffer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary dencodes the data from a buffer int this message.
|
||||||
|
func (msg *MessageChat) UnmarshalBinary(buffer []byte) error {
|
||||||
|
pairs, err := tape.DecodePairs(buffer)
|
||||||
|
if err != nil { return err }
|
||||||
|
foundRequired := 0
|
||||||
|
for tag, data := range pairs {
|
||||||
|
switch tag {
|
||||||
|
case 0:
|
||||||
|
value, err := tape.DecodeString[string](data)
|
||||||
|
if err != nil { return err }
|
||||||
|
msg.Nickname = hopp.O(value)
|
||||||
|
case 1:
|
||||||
|
value, err := tape.DecodeString[string](data)
|
||||||
|
if err != nil { return err }
|
||||||
|
msg.Content = value
|
||||||
|
foundRequired ++
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if foundRequired != 1 { return hopp.ErrTablePairMissing }
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// (768) JoinNotify is sent by the server when another client joins the room. It must be sent within a room transaction.
|
||||||
|
type MessageJoinNotify struct {
|
||||||
|
/* 0 */ Nickname hopp.Option[string]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Method returns the method number of the message.
|
||||||
|
func (msg MessageJoinNotify) Method() uint16 {
|
||||||
|
return 768
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary encodes the data in this message into a buffer.
|
||||||
|
func (msg *MessageJoinNotify) MarshalBinary() ([]byte, error) {
|
||||||
|
size := 0
|
||||||
|
count := 0
|
||||||
|
offsetNickname := size
|
||||||
|
if value, ok := msg.Nickname.Get(); ok {
|
||||||
|
count ++
|
||||||
|
size += len(value) }
|
||||||
|
if size > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
if count > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
buffer := make([]byte, 2 + 4 * count + size)
|
||||||
|
tape.EncodeI16(buffer[:2], uint16(count))
|
||||||
|
if value, ok := msg.Nickname.Get(); ok {
|
||||||
|
tape.EncodeString(buffer[offsetNickname:], value)}
|
||||||
|
return buffer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary dencodes the data from a buffer int this message.
|
||||||
|
func (msg *MessageJoinNotify) UnmarshalBinary(buffer []byte) error {
|
||||||
|
pairs, err := tape.DecodePairs(buffer)
|
||||||
|
if err != nil { return err }
|
||||||
|
foundRequired := 0
|
||||||
|
for tag, data := range pairs {
|
||||||
|
switch tag {
|
||||||
|
case 0:
|
||||||
|
value, err := tape.DecodeString[string](data)
|
||||||
|
if err != nil { return err }
|
||||||
|
msg.Nickname = hopp.O(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if foundRequired != 1 { return hopp.ErrTablePairMissing }
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// (769) LeaveNotify is sent by the server when another client leaves the room. It must be sent within a room transaction.
|
||||||
|
type MessageLeaveNotify struct {
|
||||||
|
/* 0 */ Nickname hopp.Option[string]
|
||||||
|
}
|
||||||
|
|
||||||
|
// Method returns the method number of the message.
|
||||||
|
func (msg MessageLeaveNotify) Method() uint16 {
|
||||||
|
return 769
|
||||||
|
}
|
||||||
|
|
||||||
|
// MarshalBinary encodes the data in this message into a buffer.
|
||||||
|
func (msg *MessageLeaveNotify) MarshalBinary() ([]byte, error) {
|
||||||
|
size := 0
|
||||||
|
count := 0
|
||||||
|
offsetNickname := size
|
||||||
|
if value, ok := msg.Nickname.Get(); ok {
|
||||||
|
count ++
|
||||||
|
size += len(value) }
|
||||||
|
if size > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
if count > 0xFFFF { return nil, hopp.ErrPayloadTooLarge}
|
||||||
|
buffer := make([]byte, 2 + 4 * count + size)
|
||||||
|
tape.EncodeI16(buffer[:2], uint16(count))
|
||||||
|
if value, ok := msg.Nickname.Get(); ok {
|
||||||
|
tape.EncodeString(buffer[offsetNickname:], value)}
|
||||||
|
return buffer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// UnmarshalBinary dencodes the data from a buffer int this message.
|
||||||
|
func (msg *MessageLeaveNotify) UnmarshalBinary(buffer []byte) error {
|
||||||
|
pairs, err := tape.DecodePairs(buffer)
|
||||||
|
if err != nil { return err }
|
||||||
|
foundRequired := 0
|
||||||
|
for tag, data := range pairs {
|
||||||
|
switch tag {
|
||||||
|
case 0:
|
||||||
|
value, err := tape.DecodeString[string](data)
|
||||||
|
if err != nil { return err }
|
||||||
|
msg.Nickname = hopp.O(value)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
if foundRequired != 1 { return hopp.ErrTablePairMissing }
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
70
examples/chat/protocol.md
Normal file
70
examples/chat/protocol.md
Normal file
@@ -0,0 +1,70 @@
|
|||||||
|
# Chat Protocol
|
||||||
|
|
||||||
|
This document describes a simple chat protocol. To re-generate the source files,
|
||||||
|
run `go run ./cmd/hopp-generate examples/chat/protocol.md examples/chat/protocol`
|
||||||
|
|
||||||
|
## Messages
|
||||||
|
|
||||||
|
### 0000 Error
|
||||||
|
| Tag | Name | Type | Required |
|
||||||
|
| --: | ----------- | ------ | -------- |
|
||||||
|
| 0 | Code | U16 | Yes |
|
||||||
|
| 1 | Description | String | No |
|
||||||
|
|
||||||
|
Error is sent by a party when the other party has done something erroneous. The
|
||||||
|
valid error codes are:
|
||||||
|
|
||||||
|
- 0: General, unspecified error
|
||||||
|
|
||||||
|
The description field, if specified, determines a human-readable error to be
|
||||||
|
shown to the user. The sending party must immediately close the transaction
|
||||||
|
after this message is sent.
|
||||||
|
|
||||||
|
### 0001 Success
|
||||||
|
Success is sent by a party when it has successfully completed a task given to it
|
||||||
|
by the other party. The sending party must immediately close the transaction
|
||||||
|
after this message is sent.
|
||||||
|
|
||||||
|
### 0100 UpdateProfile
|
||||||
|
| Tag | Name | Type | Required |
|
||||||
|
| --: | -------- | ------ | -------- |
|
||||||
|
| 0 | Nickname | String | No |
|
||||||
|
|
||||||
|
UpdateProfile is sent by the client in a new transaction to update the profile
|
||||||
|
details that will be shown to other connected clients.
|
||||||
|
|
||||||
|
### 0200 Join
|
||||||
|
| Tag | Name | Type | Required |
|
||||||
|
| --: | -------- | ------ | -------- |
|
||||||
|
| 0 | Room | String | Yes |
|
||||||
|
|
||||||
|
Join is sent by the client when it wishes to join a room. It must begin a new
|
||||||
|
transaction, and that transaction will persist while the user is in that room.
|
||||||
|
Messages having to do with the room will be sent along this transaction. To
|
||||||
|
leave the room, the client must close the transaction.
|
||||||
|
|
||||||
|
### 0201 Chat
|
||||||
|
| Tag | Name | Type | Required |
|
||||||
|
| --: | -------- | ------ | -------- |
|
||||||
|
| 0 | Nickname | String | No |
|
||||||
|
| 1 | Content | String | Yes |
|
||||||
|
|
||||||
|
Chat is sent by the client when it wishes to post a message to the room. It is
|
||||||
|
also relayed by the server to other clients to notify them of the message. It
|
||||||
|
must be sent within a room transaction.
|
||||||
|
|
||||||
|
### 0300 JoinNotify
|
||||||
|
| Tag | Name | Type | Required |
|
||||||
|
| --: | -------- | ------ | -------- |
|
||||||
|
| 0 | Nickname | String | No |
|
||||||
|
|
||||||
|
JoinNotify is sent by the server when another client joins the room. It must be
|
||||||
|
sent within a room transaction.
|
||||||
|
|
||||||
|
### 0301 LeaveNotify
|
||||||
|
| Tag | Name | Type | Required |
|
||||||
|
| --: | -------- | ------ | -------- |
|
||||||
|
| 0 | Nickname | String | No |
|
||||||
|
|
||||||
|
LeaveNotify is sent by the server when another client leaves the room. It must
|
||||||
|
be sent within a room transaction.
|
||||||
164
examples/chat/server/main.go
Normal file
164
examples/chat/server/main.go
Normal file
@@ -0,0 +1,164 @@
|
|||||||
|
package main
|
||||||
|
|
||||||
|
import "os"
|
||||||
|
import "fmt"
|
||||||
|
import "log"
|
||||||
|
import "errors"
|
||||||
|
import "crypto/tls"
|
||||||
|
import "git.tebibyte.media/sashakoshka/hopp"
|
||||||
|
import "git.tebibyte.media/sashakoshka/go-util/sync"
|
||||||
|
import "git.tebibyte.media/sashakoshka/go-util/container"
|
||||||
|
import "git.tebibyte.media/sashakoshka/hopp/examples/chat"
|
||||||
|
|
||||||
|
var clients usync.RWMonitor[ucontainer.Set[*client]]
|
||||||
|
|
||||||
|
func main() {
|
||||||
|
name := os.Args[0]
|
||||||
|
if len(os.Args) != 4 {
|
||||||
|
fmt.Fprintf(os.Stderr, "Usage: %s HOST:PORT CERT KEY\n", name)
|
||||||
|
os.Exit(2)
|
||||||
|
}
|
||||||
|
address := os.Args[1]
|
||||||
|
certPath := os.Args[2]
|
||||||
|
keyPath := os.Args[3]
|
||||||
|
err := host(address, certPath, keyPath)
|
||||||
|
handleErr(1, err)
|
||||||
|
}
|
||||||
|
|
||||||
|
func host(address string, certPath, keyPath string) error {
|
||||||
|
keyPair, err := tls.LoadX509KeyPair(certPath, keyPath)
|
||||||
|
if err != nil { return err }
|
||||||
|
listener, err := hopp.ListenQUIC("quic", address, &tls.Config {
|
||||||
|
InsecureSkipVerify: true,
|
||||||
|
Certificates: []tls.Certificate { keyPair },
|
||||||
|
})
|
||||||
|
clients.Set(ucontainer.NewSet[*client]())
|
||||||
|
if err != nil { return err }
|
||||||
|
log.Printf("(i) hosting on %s", address)
|
||||||
|
for {
|
||||||
|
conn, err := listener.Accept()
|
||||||
|
if err != nil { return err }
|
||||||
|
client := &client {
|
||||||
|
conn: conn,
|
||||||
|
rooms: usync.NewRWMonitor(make(map[string] hopp.Trans)),
|
||||||
|
}
|
||||||
|
go client.run()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
type client struct {
|
||||||
|
conn hopp.Conn
|
||||||
|
nickname hopp.Option[string]
|
||||||
|
rooms usync.RWMonitor[map[string] hopp.Trans]
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *client) run() {
|
||||||
|
log.Printf("-=E %v connected", this.conn.RemoteAddr())
|
||||||
|
defer log.Printf("X=- %v disconnected", this.conn.RemoteAddr())
|
||||||
|
defer this.conn.Close()
|
||||||
|
|
||||||
|
for {
|
||||||
|
log.Println("accepting transaction")
|
||||||
|
trans, err := this.conn.AcceptTrans()
|
||||||
|
log.Println("accepted transaction")
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("XXX %v failed: %v", this.conn.RemoteAddr(), err)
|
||||||
|
}
|
||||||
|
go this.runTrans(trans)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *client) runTrans(trans hopp.Trans) {
|
||||||
|
defer trans.Close()
|
||||||
|
message, err := chat.Receive(trans)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf(
|
||||||
|
"XXX %v transaction failed: %v",
|
||||||
|
this.conn.RemoteAddr(), err)
|
||||||
|
}
|
||||||
|
switch message := message.(type) {
|
||||||
|
case *chat.MessageJoin:
|
||||||
|
err = this.transTalk(trans, message)
|
||||||
|
}
|
||||||
|
if err != nil {
|
||||||
|
var actual *chat.MessageError
|
||||||
|
if !errors.As(err, &actual) {
|
||||||
|
chat.Send(trans, &chat.MessageError {
|
||||||
|
Description: hopp.O(fmt.Sprint(err)),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
log.Printf("XXX %v transaction failed: %v", this.conn.RemoteAddr(), err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *client) transTalk(trans hopp.Trans, initial *chat.MessageJoin) error {
|
||||||
|
room := initial.Room
|
||||||
|
err := this.joinRoom(trans, room)
|
||||||
|
if err != nil { return err }
|
||||||
|
defer this.leaveRoom(trans, room)
|
||||||
|
for {
|
||||||
|
message, err := chat.Receive(trans)
|
||||||
|
if err != nil { return err }
|
||||||
|
switch message := message.(type) {
|
||||||
|
case *chat.MessageChat:
|
||||||
|
err := this.handleMessageChat(trans, room, message)
|
||||||
|
if err != nil { return err }
|
||||||
|
case *chat.MessageError:
|
||||||
|
return message
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *client) handleMessageChat(trans hopp.Trans, room string, message *chat.MessageChat) error {
|
||||||
|
log.Println("(). %s #%s: %s", this.nickname.Default("Anonymous"), room, message.Content)
|
||||||
|
clients, done := clients.RBorrow()
|
||||||
|
defer done()
|
||||||
|
for client := range clients {
|
||||||
|
err := client.relayMessage(room, message)
|
||||||
|
if err != nil {
|
||||||
|
log.Printf("!!! %v", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *client) relayMessage(room string, message *chat.MessageChat) error {
|
||||||
|
rooms, done := this.rooms.RBorrow()
|
||||||
|
defer done()
|
||||||
|
if trans, ok := rooms[room]; ok {
|
||||||
|
err := chat.Send(trans, message)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("could not relay message: %w", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *client) joinRoom(trans hopp.Trans, room string) error {
|
||||||
|
rooms, done := this.rooms.Borrow()
|
||||||
|
defer done()
|
||||||
|
if _, exists := rooms[room]; exists {
|
||||||
|
return fmt.Errorf("already joined %s", room)
|
||||||
|
}
|
||||||
|
rooms[room] = trans
|
||||||
|
log.Printf("--> user %s joined #%s", this.nickname.Default("Anonymous"), room)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *client) leaveRoom(trans hopp.Trans, room string) error {
|
||||||
|
rooms, done := this.rooms.Borrow()
|
||||||
|
defer done()
|
||||||
|
if _, exists := rooms[room]; !exists {
|
||||||
|
return fmt.Errorf("not in %s", room)
|
||||||
|
}
|
||||||
|
delete(rooms, room)
|
||||||
|
log.Printf("<-- user %s left #%s", this.nickname.Default("Anonymous"), room)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func handleErr(code int, err error) {
|
||||||
|
if err != nil {
|
||||||
|
fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err)
|
||||||
|
os.Exit(code)
|
||||||
|
}
|
||||||
|
}
|
||||||
2
go.mod
2
go.mod
@@ -3,7 +3,7 @@ module git.tebibyte.media/sashakoshka/hopp
|
|||||||
go 1.23.0
|
go 1.23.0
|
||||||
|
|
||||||
require (
|
require (
|
||||||
git.tebibyte.media/sashakoshka/go-util v0.9.0
|
git.tebibyte.media/sashakoshka/go-util v0.9.1
|
||||||
github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62
|
github.com/gomarkdown/markdown v0.0.0-20241205020045-f7e15b2f3e62
|
||||||
github.com/quic-go/quic-go v0.48.2
|
github.com/quic-go/quic-go v0.48.2
|
||||||
)
|
)
|
||||||
|
|||||||
4
go.sum
4
go.sum
@@ -1,5 +1,5 @@
|
|||||||
git.tebibyte.media/sashakoshka/go-util v0.9.0 h1:s4u6USI1yRqTFNv52qJlEy1qO9pfF2+U6IklxkSLckY=
|
git.tebibyte.media/sashakoshka/go-util v0.9.1 h1:eGAbLwYhOlh4aq/0w+YnJcxT83yPhXtxnYMzz6K7xGo=
|
||||||
git.tebibyte.media/sashakoshka/go-util v0.9.0/go.mod h1:0Q1t+PePdx6tFYkRuJNcpM1Mru7wE6X+it1kwuOH+6Y=
|
git.tebibyte.media/sashakoshka/go-util v0.9.1/go.mod h1:0Q1t+PePdx6tFYkRuJNcpM1Mru7wE6X+it1kwuOH+6Y=
|
||||||
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
github.com/chzyer/logex v1.1.10/go.mod h1:+Ywpsq7O8HXn0nuIou7OrIPyXbp3wmkHB+jjWRnGsAI=
|
||||||
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
github.com/chzyer/readline v0.0.0-20180603132655-2972be24d48e/go.mod h1:nSuG5e5PlCu98SY8svDHJxuZscDgtXS6KTTbou5AhLI=
|
||||||
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
github.com/chzyer/test v0.0.0-20180213035817-a1ea475d72b1/go.mod h1:Q3SI9o4m/ZMnBNeIyt5eFwwo7qiLfzFZmjNmxjkiQlU=
|
||||||
|
|||||||
356
metadapta.go
356
metadapta.go
@@ -4,10 +4,16 @@ import "io"
|
|||||||
import "fmt"
|
import "fmt"
|
||||||
import "net"
|
import "net"
|
||||||
import "sync"
|
import "sync"
|
||||||
|
import "sync/atomic"
|
||||||
import "git.tebibyte.media/sashakoshka/hopp/tape"
|
import "git.tebibyte.media/sashakoshka/hopp/tape"
|
||||||
import "git.tebibyte.media/sashakoshka/go-util/sync"
|
import "git.tebibyte.media/sashakoshka/go-util/sync"
|
||||||
|
|
||||||
|
// TODO investigate why 30 never reaches the server, causing it to wait for ever
|
||||||
|
// and never close the connection, causing the client to also wait forever
|
||||||
|
|
||||||
|
const closeMethod = 0xFFFF
|
||||||
const int64Max = int64((^uint64(0)) >> 1)
|
const int64Max = int64((^uint64(0)) >> 1)
|
||||||
|
const defaultChunkSize = 0x1000
|
||||||
|
|
||||||
// Party represents a side of a connection.
|
// Party represents a side of a connection.
|
||||||
type Party bool; const (
|
type Party bool; const (
|
||||||
@@ -15,7 +21,16 @@ type Party bool; const (
|
|||||||
ClientSide Party = true
|
ClientSide Party = true
|
||||||
)
|
)
|
||||||
|
|
||||||
|
func (party Party) String() string {
|
||||||
|
if party == ServerSide {
|
||||||
|
return "server"
|
||||||
|
} else {
|
||||||
|
return "client"
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
type a struct {
|
type a struct {
|
||||||
|
sizeLimit int64
|
||||||
underlying net.Conn
|
underlying net.Conn
|
||||||
party Party
|
party Party
|
||||||
transID int64
|
transID int64
|
||||||
@@ -31,6 +46,7 @@ type a struct {
|
|||||||
// oriented transport such as TCP or UNIX domain stream sockets.
|
// oriented transport such as TCP or UNIX domain stream sockets.
|
||||||
func AdaptA(underlying net.Conn, party Party) Conn {
|
func AdaptA(underlying net.Conn, party Party) Conn {
|
||||||
conn := &a {
|
conn := &a {
|
||||||
|
sizeLimit: defaultSizeLimit,
|
||||||
underlying: underlying,
|
underlying: underlying,
|
||||||
party: party,
|
party: party,
|
||||||
transMap: make(map[int64] *transA),
|
transMap: make(map[int64] *transA),
|
||||||
@@ -48,7 +64,7 @@ func AdaptA(underlying net.Conn, party Party) Conn {
|
|||||||
|
|
||||||
func (this *a) Close() error {
|
func (this *a) Close() error {
|
||||||
close(this.done)
|
close(this.done)
|
||||||
return this.underlying.Close()
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *a) LocalAddr() net.Addr {
|
func (this *a) LocalAddr() net.Addr {
|
||||||
@@ -62,30 +78,41 @@ func (this *a) RemoteAddr() net.Addr {
|
|||||||
func (this *a) OpenTrans() (Trans, error) {
|
func (this *a) OpenTrans() (Trans, error) {
|
||||||
this.transLock.Lock()
|
this.transLock.Lock()
|
||||||
defer this.transLock.Unlock()
|
defer this.transLock.Unlock()
|
||||||
|
if this.transID == int64Max {
|
||||||
|
return nil, fmt.Errorf("could not open transaction: %w", ErrIntegerOverflow)
|
||||||
|
}
|
||||||
id := this.transID
|
id := this.transID
|
||||||
this.transID ++
|
|
||||||
trans := &transA {
|
trans := &transA {
|
||||||
parent: this,
|
parent: this,
|
||||||
id: id,
|
id: id,
|
||||||
incoming: usync.NewGate[incomingMessage](),
|
incoming: usync.NewGate[incomingMessage](),
|
||||||
}
|
}
|
||||||
this.transMap[id] = trans
|
this.transMap[id] = trans
|
||||||
if this.transID == int64Max {
|
if this.party == ClientSide {
|
||||||
return nil, fmt.Errorf("could not open transaction: %w", ErrIntegerOverflow)
|
this.transID ++
|
||||||
|
} else {
|
||||||
|
this.transID --
|
||||||
}
|
}
|
||||||
this.transID ++
|
|
||||||
return trans, nil
|
return trans, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *a) AcceptTrans() (Trans, error) {
|
func (this *a) AcceptTrans() (Trans, error) {
|
||||||
|
eof := fmt.Errorf("could not accept transaction: %w", io.EOF)
|
||||||
select {
|
select {
|
||||||
case trans := <- this.transChan:
|
case trans := <- this.transChan:
|
||||||
|
if trans == nil {
|
||||||
|
return nil, eof
|
||||||
|
}
|
||||||
return trans, nil
|
return trans, nil
|
||||||
case <- this.done:
|
case <- this.done:
|
||||||
return nil, fmt.Errorf("could not accept transaction: %w", io.EOF)
|
return nil, eof
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *a) SetSizeLimit(limit int64) {
|
||||||
|
this.sizeLimit = limit
|
||||||
|
}
|
||||||
|
|
||||||
func (this *a) unlistTransactionSafe(id int64) {
|
func (this *a) unlistTransactionSafe(id int64) {
|
||||||
this.transLock.Lock()
|
this.transLock.Lock()
|
||||||
defer this.transLock.Unlock()
|
defer this.transLock.Unlock()
|
||||||
@@ -94,28 +121,33 @@ func (this *a) unlistTransactionSafe(id int64) {
|
|||||||
|
|
||||||
func (this *a) sendMessageSafe(trans int64, method uint16, data []byte) error {
|
func (this *a) sendMessageSafe(trans int64, method uint16, data []byte) error {
|
||||||
this.sendLock.Lock()
|
this.sendLock.Lock()
|
||||||
defer this.sendLock.Lock()
|
defer this.sendLock.Unlock()
|
||||||
return encodeMessageA(this.underlying, trans, method, data)
|
return encodeMessageA(this.underlying, this.sizeLimit, trans, method, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *a) receive() {
|
func (this *a) receive() {
|
||||||
defer func() {
|
defer func() {
|
||||||
this.underlying.Close()
|
this.underlying.Close()
|
||||||
|
close(this.transChan)
|
||||||
this.transLock.Lock()
|
this.transLock.Lock()
|
||||||
defer this.transLock.Lock()
|
defer this.transLock.Unlock()
|
||||||
for _, trans := range this.transMap {
|
for _, trans := range this.transMap {
|
||||||
trans.Close()
|
trans.closeDontUnlist()
|
||||||
}
|
}
|
||||||
clear(this.transMap)
|
clear(this.transMap)
|
||||||
|
this.underlying.Close()
|
||||||
}()
|
}()
|
||||||
|
|
||||||
|
// receive MMBs in a loop and forward them to transactions until shit
|
||||||
|
// starts closing
|
||||||
for {
|
for {
|
||||||
transID, method, payload, err := decodeMessageA(this.underlying)
|
transID, method, chunked, payload, err := decodeMessageA(this.underlying, this.sizeLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
this.err = fmt.Errorf("could not receive message: %w", err)
|
this.err = fmt.Errorf("could not receive message: %w", err)
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
err = this.receiveMultiplex(transID, method, payload)
|
err = this.multiplexMMB(transID, method, chunked, payload)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
this.err = fmt.Errorf("could not receive message: %w", err)
|
this.err = fmt.Errorf("could not receive message: %w", err)
|
||||||
return
|
return
|
||||||
@@ -123,41 +155,81 @@ func (this *a) receive() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *a) receiveMultiplex(transID int64, method uint16, payload []byte) error {
|
func (this *a) multiplexMMB(transID int64, method uint16, chunked bool, payload []byte) error {
|
||||||
if transID == 0 || this.party == partyFromTransID(transID) {
|
if transID == 0 { return ErrMessageMalformed }
|
||||||
return ErrMessageMalformed
|
|
||||||
}
|
|
||||||
|
|
||||||
this.transLock.Lock()
|
|
||||||
defer this.transLock.Unlock()
|
|
||||||
|
|
||||||
trans, ok := this.transMap[transID]
|
trans, err := func() (*transA, error) {
|
||||||
if !ok {
|
this.transLock.Lock()
|
||||||
trans = &transA {
|
defer this.transLock.Unlock()
|
||||||
parent: this,
|
|
||||||
id: transID,
|
trans, ok := this.transMap[transID]
|
||||||
incoming: usync.NewGate[incomingMessage](),
|
if !ok {
|
||||||
|
// check if this is a superfluous close message and just
|
||||||
|
// do nothing if so
|
||||||
|
if method == closeMethod {
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// it is forbidden for the other party to initiate a transaction
|
||||||
|
// with an ID from this party
|
||||||
|
if this.party == partyFromTransID(transID) {
|
||||||
|
return nil, ErrMessageMalformed
|
||||||
|
}
|
||||||
|
trans = &transA {
|
||||||
|
parent: this,
|
||||||
|
id: transID,
|
||||||
|
incoming: usync.NewGate[incomingMessage](),
|
||||||
|
}
|
||||||
|
this.transMap[transID] = trans
|
||||||
|
this.transChan <- trans
|
||||||
}
|
}
|
||||||
this.transChan <- trans
|
return trans, nil
|
||||||
|
}()
|
||||||
|
if err != nil { return err }
|
||||||
|
|
||||||
|
if trans == nil {
|
||||||
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
trans.incoming.Send(incomingMessage {
|
if method == closeMethod {
|
||||||
method: method,
|
return trans.Close()
|
||||||
payload: payload,
|
} else {
|
||||||
})
|
trans.incoming.Send(incomingMessage {
|
||||||
|
method: method,
|
||||||
|
chunked: chunked,
|
||||||
|
payload: payload,
|
||||||
|
})
|
||||||
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// most methods in transA don't need to be goroutine safe except those marked
|
||||||
|
// as such
|
||||||
type transA struct {
|
type transA struct {
|
||||||
parent *a
|
parent *a
|
||||||
id int64
|
id int64
|
||||||
incoming usync.Gate[incomingMessage]
|
incoming usync.Gate[incomingMessage]
|
||||||
|
currentReader io.Reader
|
||||||
|
currentWriter io.Closer
|
||||||
|
writeBuffer []byte
|
||||||
|
closed atomic.Bool
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *transA) Close() error {
|
func (this *transA) Close() error {
|
||||||
this.incoming.Close()
|
// MUST be goroutine safe
|
||||||
|
err := this.closeDontUnlist()
|
||||||
this.parent.unlistTransactionSafe(this.ID())
|
this.parent.unlistTransactionSafe(this.ID())
|
||||||
return nil
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *transA) closeDontUnlist() (err error) {
|
||||||
|
// MUST be goroutine safe
|
||||||
|
this.incoming.Close()
|
||||||
|
if !this.closed.Load() {
|
||||||
|
err = this.Send(closeMethod, nil)
|
||||||
|
}
|
||||||
|
this.closed.Store(true)
|
||||||
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *transA) ID() int64 {
|
func (this *transA) ID() int64 {
|
||||||
@@ -168,51 +240,213 @@ func (this *transA) Send(method uint16, data []byte) error {
|
|||||||
return this.parent.sendMessageSafe(this.id, method, data)
|
return this.parent.sendMessageSafe(this.id, method, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) {
|
||||||
|
// close previous writer if necessary
|
||||||
|
if this.currentWriter != nil {
|
||||||
|
this.currentWriter.Close()
|
||||||
|
this.currentWriter = nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// create new writer
|
||||||
|
writer := &writerA {
|
||||||
|
parent: this,
|
||||||
|
// there is only ever one writer at a time, so they can all
|
||||||
|
// share a buffer
|
||||||
|
buffer: this.writeBuffer[:0],
|
||||||
|
method: method,
|
||||||
|
chunkSize: defaultChunkSize,
|
||||||
|
open: true,
|
||||||
|
}
|
||||||
|
this.currentWriter = writer
|
||||||
|
return writer, nil
|
||||||
|
}
|
||||||
|
|
||||||
func (this *transA) Receive() (method uint16, data []byte, err error) {
|
func (this *transA) Receive() (method uint16, data []byte, err error) {
|
||||||
message, ok := <- this.incoming.Receive()
|
method, reader, err := this.ReceiveReader()
|
||||||
if !ok {
|
if err != nil { return 0, nil, err }
|
||||||
if this.parent.err == nil {
|
data, err = io.ReadAll(reader)
|
||||||
return 0, nil, fmt.Errorf("could not receive message: %w", io.EOF)
|
if err != nil { return 0, nil, err }
|
||||||
} else {
|
return method, data, nil
|
||||||
return 0, nil, this.parent.err
|
}
|
||||||
|
|
||||||
|
func (this *transA) ReceiveReader() (uint16, io.Reader, error) {
|
||||||
|
// if the transaction has been closed, return an io.EOF
|
||||||
|
if this.closed.Load() {
|
||||||
|
return 0, nil, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
// drain previous reader if necessary
|
||||||
|
if this.currentReader != nil {
|
||||||
|
io.Copy(io.Discard, this.currentReader)
|
||||||
|
}
|
||||||
|
|
||||||
|
// create new reader
|
||||||
|
reader := &readerA {
|
||||||
|
parent: this,
|
||||||
|
}
|
||||||
|
method, err := reader.pull()
|
||||||
|
if err != nil { return 0, nil, err}
|
||||||
|
this.currentReader = reader
|
||||||
|
return method, reader, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type readerA struct {
|
||||||
|
parent *transA
|
||||||
|
leftover []byte
|
||||||
|
eof bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// pull pulls the next MMB in this message from the transaction.
|
||||||
|
func (this *readerA) pull() (uint16, error) {
|
||||||
|
// if the previous message ended the chain, return an io.EOF
|
||||||
|
if this.eof {
|
||||||
|
return 0, io.EOF
|
||||||
|
}
|
||||||
|
|
||||||
|
// get an MMB from the transaction we are a part of
|
||||||
|
receive := this.parent.incoming.Receive()
|
||||||
|
if receive != nil {
|
||||||
|
if message, ok := <- receive; ok {
|
||||||
|
if message.method != closeMethod {
|
||||||
|
this.leftover = append(this.leftover, message.payload...)
|
||||||
|
if !message.chunked {
|
||||||
|
this.eof = true
|
||||||
|
}
|
||||||
|
return message.method, nil
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return message.method, message.payload, nil
|
|
||||||
|
// close and return error on failure
|
||||||
|
this.eof = true
|
||||||
|
this.parent.Close()
|
||||||
|
if this.parent.parent.err == nil {
|
||||||
|
return 0, fmt.Errorf("could not receive message: %w", io.EOF)
|
||||||
|
} else {
|
||||||
|
return 0, this.parent.parent.err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *readerA) Read(buffer []byte) (int, error) {
|
||||||
|
if len(this.leftover) == 0 {
|
||||||
|
if this.eof { return 0, io.EOF }
|
||||||
|
this.pull()
|
||||||
|
}
|
||||||
|
|
||||||
|
copied := copy(buffer, this.leftover)
|
||||||
|
this.leftover = this.leftover[copied:]
|
||||||
|
return copied, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type writerA struct {
|
||||||
|
parent *transA
|
||||||
|
buffer []byte
|
||||||
|
method uint16
|
||||||
|
chunkSize int64
|
||||||
|
open bool
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *writerA) Write(data []byte) (n int, err error) {
|
||||||
|
if !this.open { return 0, io.EOF }
|
||||||
|
toSend := data
|
||||||
|
for len(toSend) > 0 {
|
||||||
|
nn, err := this.writeOne(toSend)
|
||||||
|
n += nn
|
||||||
|
toSend = toSend[nn:]
|
||||||
|
if err != nil { return n, err }
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *writerA) Close() error {
|
||||||
|
this.open = false
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *writerA) writeOne(data []byte) (n int, err error) {
|
||||||
|
data = data[:min(len(data), int(this.chunkSize))]
|
||||||
|
|
||||||
|
// if there is more room, append to the buffer and exit
|
||||||
|
if int64(len(this.buffer) + len(data)) <= this.chunkSize {
|
||||||
|
this.buffer = append(this.buffer, data...)
|
||||||
|
n = len(data)
|
||||||
|
// if have a full chunk, flush
|
||||||
|
if int64(len(this.buffer)) == this.chunkSize {
|
||||||
|
err = this.flush()
|
||||||
|
if err != nil { return n, err }
|
||||||
|
}
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// if not, flush and store as much as we can in the buffer
|
||||||
|
err = this.flush()
|
||||||
|
if err != nil { return n, err }
|
||||||
|
this.buffer = append(this.buffer, data...)
|
||||||
|
return n, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *writerA) flush() error {
|
||||||
|
return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer)
|
||||||
}
|
}
|
||||||
|
|
||||||
type incomingMessage struct {
|
type incomingMessage struct {
|
||||||
method uint16
|
method uint16
|
||||||
|
chunked bool
|
||||||
payload []byte
|
payload []byte
|
||||||
}
|
}
|
||||||
|
|
||||||
func encodeMessageA(writer io.Writer, trans int64, method uint16, data []byte) error {
|
func encodeMessageA(
|
||||||
buffer := make([]byte, 12 + len(data))
|
writer io.Writer,
|
||||||
|
sizeLimit int64,
|
||||||
|
trans int64,
|
||||||
|
method uint16,
|
||||||
|
data []byte,
|
||||||
|
) error {
|
||||||
|
if int64(len(data)) > sizeLimit {
|
||||||
|
return ErrPayloadTooLarge
|
||||||
|
}
|
||||||
|
buffer := make([]byte, 18 + len(data))
|
||||||
tape.EncodeI64(buffer[:8], trans)
|
tape.EncodeI64(buffer[:8], trans)
|
||||||
tape.EncodeI16(buffer[8:10], method)
|
tape.EncodeI16(buffer[8:10], method)
|
||||||
length, ok := tape.U16CastSafe(len(data))
|
tape.EncodeI64(buffer[10:18], uint64(len(data)))
|
||||||
if !ok { return ErrPayloadTooLarge }
|
copy(buffer[18:], data)
|
||||||
tape.EncodeI16(buffer[10:12], length)
|
|
||||||
copy(buffer[12:], data)
|
|
||||||
_, err := writer.Write(buffer)
|
_, err := writer.Write(buffer)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func decodeMessageA(reader io.Reader) (int64, uint16, []byte, error) {
|
func decodeMessageA(
|
||||||
headerBuffer := [12]byte { }
|
reader io.Reader,
|
||||||
_, err := io.ReadFull(reader, headerBuffer[:])
|
sizeLimit int64,
|
||||||
if err != nil { return 0, 0, nil, err }
|
) (
|
||||||
transID, err := tape.DecodeI64[int64](headerBuffer[:8])
|
transID int64,
|
||||||
if err != nil { return 0, 0, nil, err }
|
method uint16,
|
||||||
method, err := tape.DecodeI16[uint16](headerBuffer[8:10])
|
chunked bool,
|
||||||
if err != nil { return 0, 0, nil, err }
|
payloadBuffer []byte,
|
||||||
length, err := tape.DecodeI16[uint16](headerBuffer[10:12])
|
err error,
|
||||||
if err != nil { return 0, 0, nil, err }
|
) {
|
||||||
payloadBuffer := make([]byte, int(length))
|
headerBuffer := [18]byte { }
|
||||||
|
_, err = io.ReadFull(reader, headerBuffer[:])
|
||||||
|
if err != nil { return 0, 0, false, nil, err }
|
||||||
|
transID, err = tape.DecodeI64[int64](headerBuffer[:8])
|
||||||
|
if err != nil { return 0, 0, false, nil, err }
|
||||||
|
method, err = tape.DecodeI16[uint16](headerBuffer[8:10])
|
||||||
|
if err != nil { return 0, 0, false, nil, err }
|
||||||
|
size, err := tape.DecodeI64[uint64](headerBuffer[10:18])
|
||||||
|
if err != nil { return 0, 0, false, nil, err }
|
||||||
|
chunked, size = splitCCBSize(size)
|
||||||
|
if size > uint64(sizeLimit) {
|
||||||
|
return 0, 0, false, nil, ErrPayloadTooLarge
|
||||||
|
}
|
||||||
|
payloadBuffer = make([]byte, int(size))
|
||||||
_, err = io.ReadFull(reader, payloadBuffer)
|
_, err = io.ReadFull(reader, payloadBuffer)
|
||||||
if err != nil { return 0, 0, nil, err }
|
if err != nil { return 0, 0, false, nil, err }
|
||||||
return transID, method, payloadBuffer, nil
|
return transID, method, chunked, payloadBuffer, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func partyFromTransID(id int64) Party {
|
func partyFromTransID(id int64) Party {
|
||||||
return id > 0
|
return id > 0
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func splitCCBSize(size uint64) (bool, uint64) {
|
||||||
|
return size >> 63 > 1, size & 0x7FFFFFFFFFFFFFFF
|
||||||
|
}
|
||||||
|
|||||||
@@ -1,19 +1,155 @@
|
|||||||
package hopp
|
package hopp
|
||||||
|
|
||||||
import "io"
|
import "io"
|
||||||
|
import "net"
|
||||||
import "bytes"
|
import "bytes"
|
||||||
import "errors"
|
import "errors"
|
||||||
import "slices"
|
import "slices"
|
||||||
import "testing"
|
import "testing"
|
||||||
|
|
||||||
|
// some of these tests spawn goroutines that can signal a failure.
|
||||||
|
// abide by the documentation for testing.T (https://pkg.go.dev/testing#T):
|
||||||
|
//
|
||||||
|
// A test ends when its Test function returns or calls any of the methods
|
||||||
|
// FailNow, Fatal, Fatalf, SkipNow, Skip, or Skipf. Those methods, as well as
|
||||||
|
// the Parallel method, must be called only from the goroutine running the
|
||||||
|
// Test function.
|
||||||
|
//
|
||||||
|
// The other reporting methods, such as the variations of Log and Error, may
|
||||||
|
// be called simultaneously from multiple goroutines.
|
||||||
|
|
||||||
|
func TestConnA(test *testing.T) {
|
||||||
|
payloads := []string {
|
||||||
|
"hello",
|
||||||
|
"world",
|
||||||
|
"When the impostor is sus!",
|
||||||
|
}
|
||||||
|
|
||||||
|
clientFunc := func(a Conn) {
|
||||||
|
test.Log("CLIENT accepting transaction")
|
||||||
|
trans, err := a.AcceptTrans()
|
||||||
|
if err != nil { test.Fatal("CLIENT", err) }
|
||||||
|
test.Log("CLIENT accepted transaction")
|
||||||
|
test.Cleanup(func() { trans.Close() })
|
||||||
|
for method, payload := range payloads {
|
||||||
|
test.Log("CLIENT waiting...")
|
||||||
|
gotMethod, gotPayloadBytes, err := trans.Receive()
|
||||||
|
if err != nil { test.Fatal("CLIENT", err) }
|
||||||
|
gotPayload := string(gotPayloadBytes)
|
||||||
|
test.Log("CLIENT m:", gotMethod, "p:", gotPayload)
|
||||||
|
if int(gotMethod) != method {
|
||||||
|
test.Errorf("CLIENT method not equal")
|
||||||
|
}
|
||||||
|
if gotPayload != payload {
|
||||||
|
test.Errorf("CLIENT payload not equal")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
test.Log("CLIENT waiting for transaction close...")
|
||||||
|
gotMethod, gotPayload, err := trans.Receive()
|
||||||
|
if !errors.Is(err, io.EOF) {
|
||||||
|
test.Error("CLIENT wrong error:", err)
|
||||||
|
test.Error("CLIENT method:", gotMethod)
|
||||||
|
test.Error("CLIENT payload:", gotPayload)
|
||||||
|
test.Fatal("CLIENT ok byeeeeeeeeeeeee")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
serverFunc := func(a Conn) {
|
||||||
|
trans, err := a.OpenTrans()
|
||||||
|
if err != nil { test.Error("SERVER", err); return }
|
||||||
|
test.Cleanup(func() { trans.Close() })
|
||||||
|
for method, payload := range payloads {
|
||||||
|
test.Log("SERVER m:", method, "p:", payload)
|
||||||
|
err := trans.Send(uint16(method), []byte(payload))
|
||||||
|
if err != nil { test.Error("SERVER", err); return }
|
||||||
|
}
|
||||||
|
test.Log("SERVER closing connection")
|
||||||
|
}
|
||||||
|
|
||||||
|
clientServerEnvironment(test, clientFunc, serverFunc)
|
||||||
|
}
|
||||||
|
|
||||||
|
func TestTransOpenCloseA(test *testing.T) {
|
||||||
|
// currently:
|
||||||
|
//
|
||||||
|
// | data sent | data recvd | close sent | close recvd
|
||||||
|
// 10 | X | X | X | server hangs
|
||||||
|
// 20 | X | X | X | client hangs
|
||||||
|
// 30 | X | | X |
|
||||||
|
//
|
||||||
|
// when a close message is recvd, it tries to push to the trans and
|
||||||
|
// hangs on trans.incoming.Send, which hangs on sending the value to the
|
||||||
|
// underlying channel. why is this?
|
||||||
|
//
|
||||||
|
// check if we are really getting values from the channel when pulling
|
||||||
|
// from the trans channel when we are expecting a close.
|
||||||
|
|
||||||
|
clientFunc := func(conn Conn) {
|
||||||
|
// 10
|
||||||
|
trans, err := conn.OpenTrans()
|
||||||
|
if err != nil { test.Error("CLIENT", err); return }
|
||||||
|
test.Log("CLIENT sending 10")
|
||||||
|
trans.Send(10, []byte("hi"))
|
||||||
|
trans.Close()
|
||||||
|
|
||||||
|
// 20
|
||||||
|
test.Log("CLIENT awaiting 20")
|
||||||
|
trans, err = conn.AcceptTrans()
|
||||||
|
if err != nil { test.Error("CLIENT", err); return }
|
||||||
|
test.Cleanup(func() { trans.Close() })
|
||||||
|
gotMethod, gotPayload, err := trans.Receive()
|
||||||
|
if err != nil { test.Error("CLIENT", err); return }
|
||||||
|
test.Logf("CLIENT m: %d p: %s", gotMethod, gotPayload)
|
||||||
|
if gotMethod != 20 { test.Error("CLIENT wrong method")}
|
||||||
|
|
||||||
|
// 30
|
||||||
|
trans, err = conn.OpenTrans()
|
||||||
|
if err != nil { test.Error("CLIENT", err); return }
|
||||||
|
test.Log("CLIENT sending 30")
|
||||||
|
trans.Send(30, []byte("good"))
|
||||||
|
trans.Close()
|
||||||
|
}
|
||||||
|
|
||||||
|
serverFunc := func(conn Conn) {
|
||||||
|
// 10
|
||||||
|
test.Log("SERVER awaiting 10")
|
||||||
|
trans, err := conn.AcceptTrans()
|
||||||
|
if err != nil { test.Error("SERVER", err); return }
|
||||||
|
test.Cleanup(func() { trans.Close() })
|
||||||
|
gotMethod, gotPayload, err := trans.Receive()
|
||||||
|
if err != nil { test.Error("SERVER", err); return }
|
||||||
|
test.Logf("SERVER m: %d p: %s", gotMethod, gotPayload)
|
||||||
|
if gotMethod != 10 { test.Error("SERVER wrong method")}
|
||||||
|
|
||||||
|
// 20
|
||||||
|
trans, err = conn.OpenTrans()
|
||||||
|
if err != nil { test.Error("SERVER", err); return }
|
||||||
|
test.Log("SERVER sending 20")
|
||||||
|
trans.Send(20, []byte("hi how r u"))
|
||||||
|
trans.Close()
|
||||||
|
|
||||||
|
// 30
|
||||||
|
test.Log("SERVER awaiting 30")
|
||||||
|
trans, err = conn.AcceptTrans()
|
||||||
|
if err != nil { test.Error("SERVER", err); return }
|
||||||
|
test.Cleanup(func() { trans.Close() })
|
||||||
|
gotMethod, gotPayload, err = trans.Receive()
|
||||||
|
if err != nil { test.Error("SERVER", err); return }
|
||||||
|
test.Logf("SERVER m: %d p: %s", gotMethod, gotPayload)
|
||||||
|
if gotMethod != 30 { test.Error("SERVER wrong method")}
|
||||||
|
}
|
||||||
|
|
||||||
|
clientServerEnvironment(test, clientFunc, serverFunc)
|
||||||
|
}
|
||||||
|
|
||||||
func TestEncodeMessageA(test *testing.T) {
|
func TestEncodeMessageA(test *testing.T) {
|
||||||
buffer := new(bytes.Buffer)
|
buffer := new(bytes.Buffer)
|
||||||
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
|
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
|
||||||
err := encodeMessageA(buffer, 0x5800FEABC3104F04, 0x6B12, payload)
|
err := encodeMessageA(buffer, defaultSizeLimit, 0x5800FEABC3104F04, 0x6B12, payload)
|
||||||
correct := []byte {
|
correct := []byte {
|
||||||
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
|
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
|
||||||
0x6B, 0x12,
|
0x6B, 0x12,
|
||||||
0x00, 0x06,
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
|
||||||
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
|
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
|
||||||
}
|
}
|
||||||
if err != nil {
|
if err != nil {
|
||||||
@@ -27,19 +163,19 @@ func TestEncodeMessageA(test *testing.T) {
|
|||||||
func TestEncodeMessageAErr(test *testing.T) {
|
func TestEncodeMessageAErr(test *testing.T) {
|
||||||
buffer := new(bytes.Buffer)
|
buffer := new(bytes.Buffer)
|
||||||
payload := make([]byte, 0x10000)
|
payload := make([]byte, 0x10000)
|
||||||
err := encodeMessageA(buffer, 0x5800FEABC3104F04, 0x6B12, payload)
|
err := encodeMessageA(buffer, 0x20, 0x5800FEABC3104F04, 0x6B12, payload)
|
||||||
if !errors.Is(err, ErrPayloadTooLarge) {
|
if !errors.Is(err, ErrPayloadTooLarge) {
|
||||||
test.Fatalf("wrong error: %v", err)
|
test.Fatalf("wrong error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDecodeMessageA(test *testing.T) {
|
func TestDecodeMessageA(test *testing.T) {
|
||||||
transID, method, payload, err := decodeMessageA(bytes.NewReader([]byte {
|
transID, method, _, payload, err := decodeMessageA(bytes.NewReader([]byte {
|
||||||
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
|
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
|
||||||
0x6B, 0x12,
|
0x6B, 0x12,
|
||||||
0x00, 0x06,
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x06,
|
||||||
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
|
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
|
||||||
}))
|
}), defaultSizeLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
test.Fatal(err)
|
test.Fatal(err)
|
||||||
}
|
}
|
||||||
@@ -56,13 +192,76 @@ func TestDecodeMessageA(test *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestDecodeMessageAErr(test *testing.T) {
|
func TestDecodeMessageAErr(test *testing.T) {
|
||||||
_, _, _, err := decodeMessageA(bytes.NewReader([]byte {
|
_, _, _, _, err := decodeMessageA(bytes.NewReader([]byte {
|
||||||
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
|
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
|
||||||
0x6B, 0x12,
|
0x6B, 0x12,
|
||||||
0x01, 0x06,
|
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01, 0x06,
|
||||||
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
|
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
|
||||||
}))
|
}), defaultSizeLimit)
|
||||||
if !errors.Is(err, io.ErrUnexpectedEOF) {
|
if !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||||
test.Fatalf("wrong error: %v", err)
|
test.Fatalf("wrong error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func TestEncodeDecodeMessageA(test *testing.T) {
|
||||||
|
correctTransID := int64(2)
|
||||||
|
correctMethod := uint16(30)
|
||||||
|
correctPayload := []byte("good")
|
||||||
|
buffer := bytes.Buffer { }
|
||||||
|
err := encodeMessageA(&buffer, defaultSizeLimit, correctTransID, correctMethod, correctPayload)
|
||||||
|
if err != nil { test.Fatal(err) }
|
||||||
|
transID, method, chunked, payload, err := decodeMessageA(&buffer, defaultSizeLimit)
|
||||||
|
if got, correct := transID, int64(2); got != correct {
|
||||||
|
test.Fatalf("not equal: %v %v", got, correct)
|
||||||
|
}
|
||||||
|
if got, correct := method, uint16(30); got != correct {
|
||||||
|
test.Fatalf("not equal: %v %v", got, correct)
|
||||||
|
}
|
||||||
|
if chunked {
|
||||||
|
test.Fatalf("message should not be chunked")
|
||||||
|
}
|
||||||
|
if got, correct := payload, correctPayload; !slices.Equal(got, correct) {
|
||||||
|
test.Fatalf("not equal: %v %v", got, correct)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func clientServerEnvironment(test *testing.T, clientFunc func(conn Conn), serverFunc func(conn Conn)) {
|
||||||
|
network := "tcp"
|
||||||
|
addr := "localhost:7959"
|
||||||
|
|
||||||
|
// server
|
||||||
|
listener, err := net.Listen(network, addr)
|
||||||
|
if err != nil { test.Fatal(err) }
|
||||||
|
test.Cleanup(func() { listener.Close() })
|
||||||
|
go func() {
|
||||||
|
test.Log("SERVER listening")
|
||||||
|
conn, err := listener.Accept()
|
||||||
|
if err != nil { test.Error("SERVER", err); return }
|
||||||
|
defer conn.Close()
|
||||||
|
test.Cleanup(func() { conn.Close() })
|
||||||
|
a := AdaptA(conn, ServerSide)
|
||||||
|
test.Cleanup(func() { a.Close() })
|
||||||
|
|
||||||
|
serverFunc(a)
|
||||||
|
test.Log("SERVER closing")
|
||||||
|
}()
|
||||||
|
|
||||||
|
// client
|
||||||
|
test.Log("CLIENT dialing")
|
||||||
|
conn, err := net.Dial(network, addr)
|
||||||
|
if err != nil { test.Fatal("CLIENT", err) }
|
||||||
|
test.Log("CLIENT dialed")
|
||||||
|
a := AdaptA(conn, ClientSide)
|
||||||
|
test.Cleanup(func() { a.Close() })
|
||||||
|
|
||||||
|
clientFunc(a)
|
||||||
|
|
||||||
|
test.Log("CLIENT waiting for connection close...")
|
||||||
|
trans, err := a.AcceptTrans()
|
||||||
|
if !errors.Is(err, io.EOF) {
|
||||||
|
test.Error("CLIENT wrong error:", err)
|
||||||
|
test.Fatal("CLIENT trans:", trans)
|
||||||
|
}
|
||||||
|
test.Log("CLIENT DONE")
|
||||||
|
conn.Close()
|
||||||
|
}
|
||||||
|
|||||||
153
metadaptb.go
153
metadaptb.go
@@ -2,19 +2,23 @@ package hopp
|
|||||||
|
|
||||||
import "io"
|
import "io"
|
||||||
import "net"
|
import "net"
|
||||||
|
import "bytes"
|
||||||
|
import "errors"
|
||||||
import "context"
|
import "context"
|
||||||
import "git.tebibyte.media/sashakoshka/hopp/tape"
|
import "git.tebibyte.media/sashakoshka/hopp/tape"
|
||||||
|
|
||||||
// B implements METADAPT-B over a multiplexed stream-oriented transport such as
|
// B implements METADAPT-B over a multiplexed stream-oriented transport such as
|
||||||
// QUIC.
|
// QUIC.
|
||||||
type b struct {
|
type b struct {
|
||||||
|
sizeLimit int64
|
||||||
underlying MultiConn
|
underlying MultiConn
|
||||||
}
|
}
|
||||||
|
|
||||||
// AdaptB returns a connection implementing METADAPT-B over a singular stream-
|
// AdaptB returns a connection implementing METADAPT-B over a multiplexed
|
||||||
// oriented transport such as TCP or UNIX domain stream sockets.
|
// stream-oriented transport such as QUIC.
|
||||||
func AdaptB(underlying MultiConn) Conn {
|
func AdaptB(underlying MultiConn) Conn {
|
||||||
return &b {
|
return &b {
|
||||||
|
sizeLimit: defaultSizeLimit,
|
||||||
underlying: underlying,
|
underlying: underlying,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -34,33 +38,105 @@ func (this *b) RemoteAddr() net.Addr {
|
|||||||
func (this *b) OpenTrans() (Trans, error) {
|
func (this *b) OpenTrans() (Trans, error) {
|
||||||
stream, err := this.underlying.OpenStream()
|
stream, err := this.underlying.OpenStream()
|
||||||
if err != nil { return nil, err }
|
if err != nil { return nil, err }
|
||||||
return transB { underlying: stream }, nil
|
return this.newTrans(stream), nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *b) AcceptTrans() (Trans, error) {
|
func (this *b) AcceptTrans() (Trans, error) {
|
||||||
stream, err := this.underlying.AcceptStream(context.Background())
|
stream, err := this.underlying.AcceptStream(context.Background())
|
||||||
if err != nil { return nil, err }
|
if err != nil { return nil, err }
|
||||||
return transB { underlying: stream }, nil
|
return this.newTrans(stream), nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *b) SetSizeLimit(limit int64) {
|
||||||
|
this.sizeLimit = limit
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *b) newTrans(underlying Stream) *transB {
|
||||||
|
return &transB {
|
||||||
|
sizeLimit: this.sizeLimit,
|
||||||
|
underlying: underlying,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
type transB struct {
|
type transB struct {
|
||||||
underlying Stream
|
sizeLimit int64
|
||||||
|
underlying Stream
|
||||||
|
currentData io.Reader
|
||||||
|
currentWriter *writerB
|
||||||
}
|
}
|
||||||
|
|
||||||
func (trans transB) Close() error {
|
func (this *transB) Close() error {
|
||||||
return trans.underlying.Close()
|
return this.underlying.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (trans transB) ID() int64 {
|
func (this *transB) ID() int64 {
|
||||||
return trans.underlying.ID()
|
return this.underlying.ID()
|
||||||
}
|
}
|
||||||
|
|
||||||
func (trans transB) Send(method uint16, data []byte) error {
|
func (this *transB) Send(method uint16, data []byte) error {
|
||||||
return encodeMessageB(trans.underlying, method, data)
|
return encodeMessageB(this.underlying, this.sizeLimit, method, data)
|
||||||
}
|
}
|
||||||
|
|
||||||
func (trans transB) Receive() (uint16, []byte, error) {
|
func (this *transB) SendWriter(method uint16) (io.WriteCloser, error) {
|
||||||
return decodeMessageB(trans.underlying)
|
if this.currentWriter != nil {
|
||||||
|
this.currentWriter.Close()
|
||||||
|
}
|
||||||
|
// TODO: come up with a fix that allows us to pipe data through the
|
||||||
|
// writer. as of now, it just reads whatever is written into a buffer
|
||||||
|
// and sends the message on close. we should probably introduce chunked
|
||||||
|
// encoding to METADAPT-B to fix this. the implementation would be
|
||||||
|
// simpler than on METADAPT-A, but most of the code could just be
|
||||||
|
// copied over.
|
||||||
|
writer := &writerB {
|
||||||
|
parent: this,
|
||||||
|
method: method,
|
||||||
|
}
|
||||||
|
this.currentWriter = writer
|
||||||
|
return writer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *transB) Receive() (uint16, []byte, error) {
|
||||||
|
// get a reader for the next message
|
||||||
|
method, size, data, err := this.receiveReader()
|
||||||
|
if err != nil { return 0, nil, err }
|
||||||
|
// read the entire thing
|
||||||
|
payloadBuffer := make([]byte, int(size))
|
||||||
|
_, err = io.ReadFull(data, payloadBuffer)
|
||||||
|
if err != nil { return 0, nil, err }
|
||||||
|
// we have used up the reader by now so we can forget it exists
|
||||||
|
this.currentData = nil
|
||||||
|
return method, payloadBuffer, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *transB) ReceiveReader() (uint16, io.Reader, error) {
|
||||||
|
method, _, data, err := this.receiveReader()
|
||||||
|
return method, data, err
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *transB) receiveReader() (uint16, int64, io.Reader, error) {
|
||||||
|
// decode the message
|
||||||
|
method, size, data, err := decodeMessageB(this.underlying, this.sizeLimit)
|
||||||
|
if err != nil { return 0, 0, nil, err }
|
||||||
|
// discard current reader if there is one
|
||||||
|
if this.currentData == nil {
|
||||||
|
io.Copy(io.Discard, this.currentData)
|
||||||
|
}
|
||||||
|
this.currentData = data
|
||||||
|
return method, size, data, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
type writerB struct {
|
||||||
|
parent *transB
|
||||||
|
buffer bytes.Buffer
|
||||||
|
method uint16
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *writerB) Write(data []byte) (int, error) {
|
||||||
|
return this.buffer.Write(data)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (this *writerB) Close() error {
|
||||||
|
return this.parent.Send(this.method, this.buffer.Bytes())
|
||||||
}
|
}
|
||||||
|
|
||||||
// MultiConn represens a multiplexed stream-oriented transport for use in
|
// MultiConn represens a multiplexed stream-oriented transport for use in
|
||||||
@@ -84,27 +160,42 @@ type Stream interface {
|
|||||||
ID() int64
|
ID() int64
|
||||||
}
|
}
|
||||||
|
|
||||||
func encodeMessageB(writer io.Writer, method uint16, data []byte) error {
|
func encodeMessageB(writer io.Writer, sizeLimit int64, method uint16, data []byte) error {
|
||||||
buffer := make([]byte, 4 + len(data))
|
if int64(len(data)) > sizeLimit {
|
||||||
|
return ErrPayloadTooLarge
|
||||||
|
}
|
||||||
|
buffer := make([]byte, 10 + len(data))
|
||||||
tape.EncodeI16(buffer[:2], method)
|
tape.EncodeI16(buffer[:2], method)
|
||||||
length, ok := tape.U16CastSafe(len(data))
|
tape.EncodeI64(buffer[2:10], uint64(len(data)))
|
||||||
if !ok { return ErrPayloadTooLarge }
|
copy(buffer[10:], data)
|
||||||
tape.EncodeI16(buffer[2:4], length)
|
|
||||||
copy(buffer[4:], data)
|
|
||||||
_, err := writer.Write(buffer)
|
_, err := writer.Write(buffer)
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
func decodeMessageB(reader io.Reader) (uint16, []byte, error) {
|
func decodeMessageB(
|
||||||
headerBuffer := [4]byte { }
|
reader io.Reader,
|
||||||
_, err := io.ReadFull(reader, headerBuffer[:])
|
sizeLimit int64,
|
||||||
if err != nil { return 0, nil, err }
|
) (
|
||||||
method, err := tape.DecodeI16[uint16](headerBuffer[:2])
|
method uint16,
|
||||||
if err != nil { return 0, nil, err }
|
size int64,
|
||||||
length, err := tape.DecodeI16[uint16](headerBuffer[2:4])
|
data io.Reader,
|
||||||
if err != nil { return 0, nil, err }
|
err error,
|
||||||
payloadBuffer := make([]byte, int(length))
|
) {
|
||||||
_, err = io.ReadFull(reader, payloadBuffer)
|
headerBuffer := [10]byte { }
|
||||||
if err != nil { return 0, nil, err }
|
_, err = io.ReadFull(reader, headerBuffer[:])
|
||||||
return method, payloadBuffer, nil
|
if err != nil {
|
||||||
|
if errors.Is(err, io.EOF) { return 0, 0, nil, io.ErrUnexpectedEOF }
|
||||||
|
return 0, 0, nil, err
|
||||||
|
}
|
||||||
|
method, err = tape.DecodeI16[uint16](headerBuffer[:2])
|
||||||
|
if err != nil { return 0, 0, nil, err }
|
||||||
|
length, err := tape.DecodeI64[uint64](headerBuffer[2:10])
|
||||||
|
if err != nil { return 0, 0, nil, err }
|
||||||
|
if length > uint64(sizeLimit) {
|
||||||
|
return 0, 0, nil, ErrPayloadTooLarge
|
||||||
|
}
|
||||||
|
return method, int64(length), &io.LimitedReader {
|
||||||
|
R: reader,
|
||||||
|
N: int64(length),
|
||||||
|
}, nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -9,9 +9,9 @@ import "testing"
|
|||||||
func TestEncodeMessageB(test *testing.T) {
|
func TestEncodeMessageB(test *testing.T) {
|
||||||
buffer := new(bytes.Buffer)
|
buffer := new(bytes.Buffer)
|
||||||
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
|
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
|
||||||
err := encodeMessageB(buffer, 0x6B12, payload)
|
err := encodeMessageB(buffer, defaultSizeLimit, 0x6B12, payload)
|
||||||
correct := []byte {
|
correct := []byte {
|
||||||
0x6B, 0x12,
|
0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
0x00, 0x06,
|
0x00, 0x06,
|
||||||
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
|
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
|
||||||
}
|
}
|
||||||
@@ -26,24 +26,25 @@ func TestEncodeMessageB(test *testing.T) {
|
|||||||
func TestEncodeMessageBErr(test *testing.T) {
|
func TestEncodeMessageBErr(test *testing.T) {
|
||||||
buffer := new(bytes.Buffer)
|
buffer := new(bytes.Buffer)
|
||||||
payload := make([]byte, 0x10000)
|
payload := make([]byte, 0x10000)
|
||||||
err := encodeMessageB(buffer, 0x6B12, payload)
|
err := encodeMessageB(buffer, 255, 0x6B12, payload)
|
||||||
if !errors.Is(err, ErrPayloadTooLarge) {
|
if !errors.Is(err, ErrPayloadTooLarge) {
|
||||||
test.Fatalf("wrong error: %v", err)
|
test.Fatalf("wrong error: %v", err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
func TestDecodeMessageB(test *testing.T) {
|
func TestDecodeMessageB(test *testing.T) {
|
||||||
method, payload, err := decodeMessageB(bytes.NewReader([]byte {
|
method, _, data, err := decodeMessageB(bytes.NewReader([]byte {
|
||||||
0x6B, 0x12,
|
0x6B, 0x12, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
|
||||||
0x00, 0x06,
|
0x00, 0x06,
|
||||||
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
|
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
|
||||||
}))
|
}), defaultSizeLimit)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
test.Fatal(err)
|
test.Fatal(err)
|
||||||
}
|
}
|
||||||
if got, correct := method, uint16(0x6B12); got != correct {
|
if got, correct := method, uint16(0x6B12); got != correct {
|
||||||
test.Fatalf("not equal: %v %v", got, correct)
|
test.Fatalf("not equal: %v %v", got, correct)
|
||||||
}
|
}
|
||||||
|
payload, _ := io.ReadAll(data)
|
||||||
correctPayload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
|
correctPayload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
|
||||||
if got, correct := payload, correctPayload; !slices.Equal(got, correct) {
|
if got, correct := payload, correctPayload; !slices.Equal(got, correct) {
|
||||||
test.Fatalf("not equal: %v %v", got, correct)
|
test.Fatalf("not equal: %v %v", got, correct)
|
||||||
@@ -51,11 +52,9 @@ func TestDecodeMessageB(test *testing.T) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func TestDecodeMessageBErr(test *testing.T) {
|
func TestDecodeMessageBErr(test *testing.T) {
|
||||||
_, _, err := decodeMessageB(bytes.NewReader([]byte {
|
_, _, _, err := decodeMessageB(bytes.NewReader([]byte {
|
||||||
0x6B, 0x12,
|
0x6B, 0x12, 0x00, 0x00, 0x00, 0x00,
|
||||||
0x01, 0x06,
|
}), defaultSizeLimit)
|
||||||
0x00, 0x01, 0x02, 0x03, 0x04, 0x05,
|
|
||||||
}))
|
|
||||||
if !errors.Is(err, io.ErrUnexpectedEOF) {
|
if !errors.Is(err, io.ErrUnexpectedEOF) {
|
||||||
test.Fatalf("wrong error: %v", err)
|
test.Fatalf("wrong error: %v", err)
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user