42 Commits

Author SHA1 Message Date
874ae2e011 Update Trans docs with new concurrency rules 2025-11-19 20:56:16 -05:00
c5073e5f20 METADAPT-A writing is goroutine safe 2025-11-19 20:55:57 -05:00
cdda4f932d Remove stale comment 2025-11-19 20:52:33 -05:00
8c564e4755 Use a buffer pool for METADAPT-A 2025-11-19 20:50:11 -05:00
9651fed635 Use bytes.Buffer instead of a slice for METADAPT-A writer 2025-11-19 20:36:10 -05:00
d6dc6d6e78 examples/chat: Re-generate 2025-11-19 19:45:16 -05:00
52b4c44db3 examples: Add go:generate directives to generate protcol.go files 2025-11-19 19:43:42 -05:00
11e972c696 examples/ping: Client accepts pong 2025-11-19 19:34:02 -05:00
1cf9d47cae Add a diagram of a METADAPT-A MMB 2025-11-19 17:13:38 -05:00
dfbb087333 examples: Regenerate protocol files 2025-11-19 17:08:05 -05:00
647619a7f6 generate: It is no longer possible to make impossible type asserts with Receive 2025-11-19 16:25:31 -05:00
3b5a498aa5 Do not send extraneous chained MMBs 2025-11-19 15:16:44 -05:00
7d189df741 Test two consecutive METADAPT-A writers 2025-11-19 14:57:46 -05:00
5341563668 Test that METADAPT-A can tx/rx different messages consecutively 2025-11-19 14:42:29 -05:00
d2187cb953 examples/ping: Regenerate protocol.go 2025-11-19 13:14:25 -05:00
0ac34b2f22 generate: Don't create a new decoder for a possibly nil reader 2025-11-19 13:13:40 -05:00
3136dcbfdf internal/testutil: Add ConnRecorder which records net.Conn writes 2025-11-19 13:10:26 -05:00
ad930144cf generate: Test send, receive functions 2025-11-17 16:50:43 -05:00
5e965def7c internal/mock: Add mock transaction implementation 2025-11-17 16:49:09 -05:00
8add67c5de Test using readers and writers with a METADAPT-A connection 2025-11-16 16:01:06 -05:00
5f503021bf examples: More examples 2025-11-16 16:00:31 -05:00
0727609067 METADAPT-A message writer returns an error when the transaction closes 2025-11-10 00:24:20 -05:00
968e145cda examples/ping/client: Fix client example 2025-11-08 19:50:22 -05:00
bb14ec20a7 examples/ping: Add a ping example 2025-10-30 00:02:13 -04:00
a00e9d3183 Remove extra error wrapping 2025-10-30 00:01:55 -04:00
4f4443069d generate: Use the new snake in a test to make it able to pass 2025-10-29 15:24:49 -04:00
10d84c2184 generate: Add support for recursive snake in test infrastructure 2025-10-29 15:19:27 -04:00
67881a455a internal/testutil/snake: Test recursiveness 2025-10-29 15:12:03 -04:00
fb374c5cd5 internal/testutil/snake: Make a more advanced (recursive) snake package 2025-10-29 15:06:23 -04:00
1b43b92687 Remove outdated comment 2025-10-27 22:51:52 -04:00
932e076113 examples/chat: Various fixes 2025-10-27 22:50:54 -04:00
5217f65cb8 generate: Surpress warning about this print statement 2025-10-27 22:39:47 -04:00
26b8174f92 Fix METADAPT-A not ever closing the connection properly 2025-10-27 22:32:47 -04:00
3daa66c4bc METADAPT-A tests compile 2025-10-27 18:45:26 -04:00
c5154b3d85 examples/chat: Regenerate protocol.go 2025-10-20 21:22:10 -04:00
c2ce95021c generate: Fix generated Send, Receive functions 2025-10-20 21:19:50 -04:00
d4ccdb282e Fix CCB and flushing in METADAPT-A 2025-10-20 21:19:21 -04:00
2e4c693174 examples/chat: Use new listen/dial API 2025-10-20 18:00:05 -04:00
c9480ba016 Translate "tls" network to "tcp" 2025-10-20 17:56:42 -04:00
09b2259a8c Simplify how dialing and listening works
Also add support for bare TCP
2025-10-20 16:44:07 -04:00
da01a0d119 internal/connshark: Add utility to log activity over a net.Conn 2025-10-19 17:48:13 -04:00
c326a2b6b9 examples/chat/server: Close connection goroutine on error 2025-10-19 17:47:31 -04:00
28 changed files with 1885 additions and 218 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.4 KiB

View File

@@ -34,8 +34,8 @@ type Conn interface {
}
// Trans is a HOPP transaction. Methods of this interface are not safe for
// concurrent use with the exception of the Close and ID methods. The
// recommended use case is one goroutine per transaction.
// concurrent use with the exception of the Write, SendWriter, Close, and ID
// methods. The recommended use case is one goroutine per transaction.
type Trans interface {
// Close closes the transaction. Any blocked operations will be
// unblocked and return errors. This method is safe for concurrent use.
@@ -45,13 +45,13 @@ type Trans interface {
// unique within the connection. This method is safe for concurrent use.
ID() int64
// Send sends a message. This method is not safe for concurrent use.
// Send sends a message. This method is safe for concurrent use.
Send(method uint16, data []byte) error
// SendWriter sends data written to an [io.Writer]. The writer must be
// closed after use. Closing the writer flushes any data that hasn't
// been written yet. Any writer previously opened through this function
// will be discarded. This method is not safe for concurrent use, and
// neither is its result.
// will be discarded. This method is safe for concurrent use, but its
// result isn't.
SendWriter(method uint16) (io.WriteCloser, error)
// Receive receives a message. This method is not safe for concurrent
// use.

View File

@@ -132,6 +132,8 @@ METADAPT-B is used over QUIC for communication over networks such as the
Internet.
### METADAPT-A
![Diagram of a METADAPT-A MMB.](../assets/metadapt-a-mmb-diagram.png)
METADAPT-A requires a transport which offers a single full-duplex data stream
that persists for the duration of the connection. All transactions are
multiplexed onto this single stream. Each MMB contains a 12-octet long header,

62
dial.go
View File

@@ -5,53 +5,59 @@ import "errors"
import "context"
import "crypto/tls"
// Dial opens a connection to a server. The network must be one of "quic",
// "quic4", (IPv4-only) "quic6" (IPv6-only), or "unix". For now, "quic4" and
// "quic6" don't do anything as the quic-go package doesn't seem to support this
// behavior.
func Dial(ctx context.Context, network, address string) (Conn, error) {
return (Dialer { }).Dial(ctx, network, address)
}
// Dialer allows for further configuration of the dialing process.
type Dialer struct {
TLSConfig *tls.Config
}
// Dial opens a connection to a server. The network must be one of:
//
// - "quic"
// - "quic4" (IPv4-only)
// - "quic6" (IPv6-only)
// - "tls"
// - "tls4" (IPv4-only)
// - "tls6" (IPv6-only)
// - "tcp"
// - "tcp4" (IPv4-only)
// - "tcp6" (IPv6-only)
// - "unix"
//
// For now, QUIC is unsupported.
func (diale Dialer) Dial(ctx context.Context, network, address string) (Conn, error) {
func Dial(ctx context.Context, network, address string, tlsConf *tls.Config) (Conn, error) {
switch network {
case "quic", "quic4", "quic6": return diale.dialQUIC(ctx, network, address)
case "tcp", "tcp4", "tcp6": return diale.dialTLS(ctx, network, address)
case "unix": return diale.dialUnix(ctx, network, address)
case "quic", "quic4", "quic6": return DialQUIC(ctx, network, address, tlsConf)
case "tls", "tls4", "tls6": return DialTLS(ctx, network, address, tlsConf)
case "tcp", "tcp4", "tcp6":
addr, err := net.ResolveTCPAddr(network, address)
if err != nil { return nil, err }
return DialTCP(ctx, network, nil, addr)
case "unix":
addr, err := net.ResolveUnixAddr(network, address)
if err != nil { return nil, err }
return DialUnix(ctx, network, addr)
default: return nil, ErrUnknownNetwork
}
}
func (diale Dialer) dialQUIC(ctx context.Context, network, address string) (Conn, error) {
// DialQUIC opens a connection to a server over QUIC.
func DialQUIC(ctx context.Context, network, address string, tlsConf *tls.Config) (Conn, error) {
return nil, errors.New("quic is not yet implemented")
}
func (diale Dialer) dialTLS(ctx context.Context, network, address string) (Conn, error) {
conn, err := tls.Dial(network, address, diale.TLSConfig)
// DialTLS opens a connection to a server over TLS.
func DialTLS(ctx context.Context, network, address string, tlsConf *tls.Config) (Conn, error) {
network, err := tlsNetworkToTCPNetwork(network)
if err != nil { return nil, err }
conn, err := tls.Dial(network, address, tlsConf)
if err != nil { return nil, err }
return AdaptA(conn, ClientSide), nil
}
func (diale Dialer) dialUnix(ctx context.Context, network, address string) (Conn, error) {
if network != "unix" { return nil, ErrUnknownNetwork }
addr, err := net.ResolveUnixAddr(network, address)
// DialTCP opens a connection to a server over TCP.
func DialTCP(ctx context.Context, network string, laddr, raddr *net.TCPAddr) (Conn, error) {
conn, err := net.DialTCP(network, laddr, raddr)
if err != nil { return nil, err }
return AdaptA(conn, ClientSide), nil
}
// DialUnix opens a connection to a server over a Unix domain socket.
func DialUnix(ctx context.Context, network string, addr *net.UnixAddr) (Conn, error) {
conn, err := net.DialUnix(network, nil, addr)
if err != nil { return nil, err }
return AdaptA(conn, ClientSide), nil
@@ -69,7 +75,6 @@ func tlsConfig(conf *tls.Config) *tls.Config {
return conf
}
func quicNetworkToUDPNetwork(network string) (string, error) {
switch network {
case "quic4": return "udp4", nil
@@ -78,3 +83,12 @@ func quicNetworkToUDPNetwork(network string) (string, error) {
default: return "", ErrUnknownNetwork
}
}
func tlsNetworkToTCPNetwork(network string) (string, error) {
switch network {
case "tls4": return "tcp4", nil
case "tls6": return "tcp6", nil
case "tls": return "tcp", nil
default: return "", ErrUnknownNetwork
}
}

86
examples/1proc/main.go Normal file
View File

@@ -0,0 +1,86 @@
package main
import "io"
import "log"
import "time"
import "errors"
import "context"
import "git.tebibyte.media/sashakoshka/hopp"
var network = "tcp"
var addr = "localhost:7959"
func main() {
go func() {
defer log.Println("SERVER closing")
listener, err := hopp.Listen(network, addr, nil)
if err != nil { log.Println("SERVER", err); return }
log.Println("SERVER listening")
conn, err := listener.Accept()
if err != nil { log.Println("SERVER", err); return }
defer conn.Close()
trans, err := conn.AcceptTrans()
if err != nil { log.Println("SERVER", err); return }
defer trans.Close()
for {
method, data, err := trans.Receive()
if err != nil { log.Println("SERVER", err); return }
log.Println("SERVER got", method, data)
log.Println("SERVER send", method, data)
err = trans.Send(1, data[:])
if err != nil { log.Println("SERVER", err); return }
}
}()
time.Sleep(time.Second * 2)
func() {
log.Println("CLIENT dialing")
conn, err := hopp.Dial(context.Background(), network, addr, nil)
if err != nil { log.Fatalln("CLIENT", err) }
log.Println("CLIENT dialed")
trans, err := conn.OpenTrans()
if err != nil {
log.Println("CLIENT", err)
return
}
go func() {
for {
method, data, err := trans.Receive()
if err != nil {
if !errors.Is(err, io.EOF) {
log.Printf("CLIENT failed to receive message: %v", err)
}
return
}
log.Println("CLIENT got", method, data)
}
}()
data := [1]byte { }
for {
log.Println("CLIENT send", 1, data)
err := trans.Send(1, data[:])
if err != nil {
log.Println("CLIENT", err)
return
}
data[0] ++
time.Sleep(time.Second)
}
log.Println("CLIENT waiting for connection close...")
trans, err = conn.AcceptTrans()
if !errors.Is(err, io.EOF) {
log.Println("CLIENT wrong error:", err)
log.Fatalln("CLIENT trans:", trans)
}
log.Println("CLIENT DONE")
conn.Close()
}()
}

View File

@@ -1,9 +1,11 @@
package main
import "os"
import "io"
import "fmt"
import "time"
import "bufio"
import "errors"
import "context"
import "crypto/tls"
import "git.tebibyte.media/sashakoshka/hopp"
@@ -22,6 +24,7 @@ func main() {
}
trans, err := join(address, room, nickname)
handleErr(1, err)
fmt.Fprintf(os.Stdout, "(i) connected to %s/%s\n", address, room)
go func() {
reader := bufio.NewReader(os.Stdin)
for {
@@ -32,28 +35,31 @@ func main() {
}()
for {
message, _, err := chat.Receive(trans)
handleErr(1, err)
if err != nil {
if !errors.Is(err, io.EOF) {
handleErr(1, err)
}
break
}
switch message := message.(type) {
case *chat.MessageChat:
case chat.MessageChat:
fmt.Fprintf(os.Stdout, "%s: %s\n", message.Nickname, message.Content)
case *chat.MessageJoinNotify:
case chat.MessageJoinNotify:
fmt.Fprintf(os.Stdout, "(i) %s joined the room\n", message.Nickname)
case *chat.MessageLeaveNotify:
case chat.MessageLeaveNotify:
fmt.Fprintf(os.Stdout, "(i) %s left the room\n", message.Nickname)
}
}
fmt.Fprintf(os.Stdout, "(i) disconnected\n")
}
func join(address string, room string, nickname string) (hopp.Trans, error) {
ctx, done := context.WithTimeout(context.Background(), 16 * time.Second)
defer done()
dialer := hopp.Dialer {
TLSConfig: &tls.Config {
// don't actually do this in real life
InsecureSkipVerify: true,
},
}
conn, err := dialer.Dial(ctx, "tcp", address)
conn, err := hopp.Dial(ctx, "tls", address, &tls.Config {
// don't actually do this in real life
InsecureSkipVerify: true,
})
if err != nil { return nil, err }
transRoom, err := conn.OpenTrans()

View File

@@ -1,6 +1,6 @@
// Package chat implements a simple chat protocol over HOPP. To re-generate the
// source files, run this command from within the root directory of the
// repository:
//
// go run ./cmd/hopp-generate examples/chat/protocol.pdl -o examples/chat/protocol/protocol.go
// Package chat demonstrates a simple chat protocol.
package chat
// To use this in your own project, replace "go run ../../cmd/hopp-generate"
// with just "hopp-generate".
//go:generate go run ../../cmd/hopp-generate protocol.pdl -o protocol.go

View File

@@ -5,6 +5,7 @@ package chat
// Please edit that file instead, and re-compile it to this location.
// HOPP, TAPE, METADAPT, PDL/0 (c) 2025 holanet.xyz
import "fmt"
import "git.tebibyte.media/sashakoshka/hopp"
import "git.tebibyte.media/sashakoshka/hopp/tape"
@@ -26,7 +27,9 @@ func Send(trans hopp.Trans, message Message) (n int, err error) {
if err != nil { return n, err }
defer writer.Close()
encoder := tape.NewEncoder(writer)
return message.Encode(encoder)
n, err = message.Encode(encoder)
if err != nil { return n, err }
return n, encoder.Flush()
}
// canAssign determines if data from the given source tag can be assigned to
@@ -54,6 +57,25 @@ func boolInt(input bool) int {
// ensure ucontainer is always imported
var _ hopp.Option[int]
// ReceivedMessage is a sealed interface representing the value of a
// message in this package. To determine what kind of message it is,
// use a type switch like this:
//
// switch message := message.(type) {
// case MessageError:
// doSomething()
// case MessageSuccess:
// doSomething()
// case MessageJoin:
// doSomething()
//
// ...
//
// }
type ReceivedMessage interface {
isReceivedMessage()
}
// Error is sent by a party when the other party has done something erroneous. The
// valid error codes are:
//
@@ -81,29 +103,29 @@ func(this *MessageError) Encode(encoder *tape.Encoder) (n int, err error) {
nn, err = encoder.WriteUintN(2, tag_1.CN() + 1)
n += nn; if err != nil { return n, err }
{
nn, err = encoder.WriteUint16(0x0000)
n += nn; if err != nil { return n, err }
tag_2 := tape.LI.WithCN(1)
nn, err = encoder.WriteUint8(uint8(tag_2))
n += nn; if err != nil { return n, err }
nn, err = encoder.WriteUint16(uint16((*this).Code))
n += nn; if err != nil { return n, err }
if value, ok := (*this).Description.Value(); ok {
nn, err = encoder.WriteUint16(0x0001)
n += nn; if err != nil { return n, err }
tag_3 := tape.StringTag(string(value))
nn, err = encoder.WriteUint8(uint8(tag_3))
tag_2 := tape.StringTag(string(value))
nn, err = encoder.WriteUint8(uint8(tag_2))
n += nn; if err != nil { return n, err }
if len(value) > tape.MaxStructureLength {
return n, tape.ErrTooLong
}
if tag_3.Is(tape.LBA) {
nn, err = encoder.WriteUintN(uint64(len(value)), tag_3.CN())
if tag_2.Is(tape.LBA) {
nn, err = encoder.WriteUintN(uint64(len(value)), tag_2.CN())
n += nn; if err != nil { return n, err }
}
nn, err = encoder.Write([]byte(value))
n += nn; if err != nil { return n, err }
}
nn, err = encoder.WriteUint16(0x0000)
n += nn; if err != nil { return n, err }
tag_3 := tape.LI.WithCN(1)
nn, err = encoder.WriteUint8(uint8(tag_3))
n += nn; if err != nil { return n, err }
nn, err = encoder.WriteUint16(uint16((*this).Code))
n += nn; if err != nil { return n, err }
}
return n, nil
}
@@ -121,6 +143,7 @@ func(this *MessageError) Decode(decoder *tape.Decoder) (n int, err error) {
n += nn; if err != nil { return n, err }
return n, nil
}
func (this MessageError) isReceivedMessage() { }
// Success is sent by a party when it has successfully completed a task given to it
// by the other party. The sending party must immediately close the transaction
@@ -159,6 +182,7 @@ func(this *MessageSuccess) Decode(decoder *tape.Decoder) (n int, err error) {
n += nn; if err != nil { return n, err }
return n, nil
}
func (this MessageSuccess) isReceivedMessage() { }
// Join is sent by the client when it wishes to join a room. It must begin a new
// transaction, and that transaction will persist while the user is in that room.
@@ -228,6 +252,7 @@ func(this *MessageJoin) Decode(decoder *tape.Decoder) (n int, err error) {
n += nn; if err != nil { return n, err }
return n, nil
}
func (this MessageJoin) isReceivedMessage() { }
// Chat is sent by the client when it wishes to post a message to the room. It is
// also relayed by the server to other clients to notify them of the message. It
@@ -251,34 +276,34 @@ func(this *MessageChat) Encode(encoder *tape.Encoder) (n int, err error) {
nn, err = encoder.WriteUintN(2, tag_8.CN() + 1)
n += nn; if err != nil { return n, err }
{
nn, err = encoder.WriteUint16(0x0001)
n += nn; if err != nil { return n, err }
tag_9 := tape.StringTag(string((*this).Nickname))
nn, err = encoder.WriteUint8(uint8(tag_9))
n += nn; if err != nil { return n, err }
if len((*this).Nickname) > tape.MaxStructureLength {
return n, tape.ErrTooLong
}
if tag_9.Is(tape.LBA) {
nn, err = encoder.WriteUintN(uint64(len((*this).Nickname)), tag_9.CN())
n += nn; if err != nil { return n, err }
}
nn, err = encoder.Write([]byte((*this).Nickname))
n += nn; if err != nil { return n, err }
nn, err = encoder.WriteUint16(0x0000)
n += nn; if err != nil { return n, err }
tag_10 := tape.StringTag(string((*this).Content))
nn, err = encoder.WriteUint8(uint8(tag_10))
tag_9 := tape.StringTag(string((*this).Content))
nn, err = encoder.WriteUint8(uint8(tag_9))
n += nn; if err != nil { return n, err }
if len((*this).Content) > tape.MaxStructureLength {
return n, tape.ErrTooLong
}
if tag_10.Is(tape.LBA) {
nn, err = encoder.WriteUintN(uint64(len((*this).Content)), tag_10.CN())
if tag_9.Is(tape.LBA) {
nn, err = encoder.WriteUintN(uint64(len((*this).Content)), tag_9.CN())
n += nn; if err != nil { return n, err }
}
nn, err = encoder.Write([]byte((*this).Content))
n += nn; if err != nil { return n, err }
nn, err = encoder.WriteUint16(0x0001)
n += nn; if err != nil { return n, err }
tag_10 := tape.StringTag(string((*this).Nickname))
nn, err = encoder.WriteUint8(uint8(tag_10))
n += nn; if err != nil { return n, err }
if len((*this).Nickname) > tape.MaxStructureLength {
return n, tape.ErrTooLong
}
if tag_10.Is(tape.LBA) {
nn, err = encoder.WriteUintN(uint64(len((*this).Nickname)), tag_10.CN())
n += nn; if err != nil { return n, err }
}
nn, err = encoder.Write([]byte((*this).Nickname))
n += nn; if err != nil { return n, err }
}
return n, nil
}
@@ -296,6 +321,7 @@ func(this *MessageChat) Decode(decoder *tape.Decoder) (n int, err error) {
n += nn; if err != nil { return n, err }
return n, nil
}
func (this MessageChat) isReceivedMessage() { }
// JoinNotify is sent by the server when another client joins the room. It must be
// sent within a room transaction.
@@ -348,6 +374,7 @@ func(this *MessageJoinNotify) Decode(decoder *tape.Decoder) (n int, err error) {
n += nn; if err != nil { return n, err }
return n, nil
}
func (this MessageJoinNotify) isReceivedMessage() { }
// LeaveNotify is sent by the server when another client leaves the room. It must
// be sent within a room transaction.
@@ -400,6 +427,7 @@ func(this *MessageLeaveNotify) Decode(decoder *tape.Decoder) (n int, err error)
n += nn; if err != nil { return n, err }
return n, nil
}
func (this MessageLeaveNotify) isReceivedMessage() { }
func decodeBranch_1d505103df99c95e6bed0800d0ea881a_MessageError(this *MessageError, decoder *tape.Decoder, tag tape.Tag) (n int, err error) {
var nn int
@@ -690,41 +718,41 @@ func decodeBranch_68c536511e6d598462efc482144438e9_MessageLeaveNotify(this *Mess
// Receive decodes a message from a transaction and returns it as a value.
// Use a type switch to determine what type of message it is.
func Receive(trans hopp.Trans) (message any, n int, err error) {
func Receive(trans hopp.Trans) (message ReceivedMessage, n int, err error) {
method, reader, err := trans.ReceiveReader()
decoder := tape.NewDecoder(reader)
if err != nil { return nil, n, err }
decoder := tape.NewDecoder(reader)
switch method {
case 0001:
var message MessageSuccess
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0200:
var message MessageJoin
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0300:
var message MessageChat
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0400:
var message MessageJoinNotify
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0401:
var message MessageLeaveNotify
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0000:
case 0x0000:
var message MessageError
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0x0001:
var message MessageSuccess
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0x0200:
var message MessageJoin
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0x0300:
var message MessageChat
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0x0400:
var message MessageJoinNotify
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0x0401:
var message MessageLeaveNotify
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
}
return nil, n, hopp.ErrUnknownMethod
return nil, n, fmt.Errorf("%w: M%04X", hopp.ErrUnknownMethod, method)
}

View File

@@ -1,6 +1,7 @@
package main
import "os"
import "io"
import "fmt"
import "log"
import "errors"
@@ -28,7 +29,7 @@ func main() {
func host(address string, certPath, keyPath string) error {
keyPair, err := tls.LoadX509KeyPair(certPath, keyPath)
if err != nil { return err }
listener, err := hopp.ListenTLS("tcp", address, &tls.Config {
listener, err := hopp.Listen("tls", address, &tls.Config {
InsecureSkipVerify: true,
Certificates: []tls.Certificate { keyPair },
})
@@ -48,7 +49,7 @@ func host(address string, certPath, keyPath string) error {
type client struct {
conn hopp.Conn
nickname hopp.Option[string]
nickname string
rooms usync.RWMonitor[map[string] hopp.Trans]
}
@@ -58,12 +59,12 @@ func (this *client) run() {
defer this.conn.Close()
for {
log.Println("accepting transaction")
trans, err := this.conn.AcceptTrans()
log.Println("accepted transaction")
if err != nil {
log.Printf("XXX %v failed: %v", this.conn.RemoteAddr(), err)
continue
if !errors.Is(err, io.EOF) {
log.Printf("XXX %v failed: %v", this.conn.RemoteAddr(), err)
}
return
}
go this.runTrans(trans)
}
@@ -97,21 +98,28 @@ func (this *client) transTalk(trans hopp.Trans, initial *chat.MessageJoin) error
err := this.joinRoom(trans, room)
if err != nil { return err }
defer this.leaveRoom(trans, room)
_, err = chat.Send(trans, &chat.MessageChat {
Content: "(i) joined " + room,
Nickname: "SYSTEM",
})
if err != nil { return err }
for {
message, _, err := chat.Receive(trans)
if err != nil { return err }
switch message := message.(type) {
case *chat.MessageChat:
case chat.MessageChat:
err := this.handleMessageChat(trans, room, message)
if err != nil { return err }
case *chat.MessageError:
return message
case chat.MessageError:
return &message
}
}
}
func (this *client) handleMessageChat(trans hopp.Trans, room string, message *chat.MessageChat) error {
log.Println("(). %s #%s: %s", this.nickname, room, message.Content)
func (this *client) handleMessageChat(trans hopp.Trans, room string, message chat.MessageChat) error {
log.Printf("(). %s #%s: %s", this.nickname, room, message.Content)
clients, done := clients.RBorrow()
defer done()
for client := range clients {
@@ -123,11 +131,11 @@ func (this *client) handleMessageChat(trans hopp.Trans, room string, message *ch
return nil
}
func (this *client) relayMessage(room string, message *chat.MessageChat) error {
func (this *client) relayMessage(room string, message chat.MessageChat) error {
rooms, done := this.rooms.RBorrow()
defer done()
if trans, ok := rooms[room]; ok {
_, err := chat.Send(trans, message)
_, err := chat.Send(trans, &message)
if err != nil {
return fmt.Errorf("could not relay message: %w", err)
}

View File

@@ -0,0 +1,84 @@
package main
import "io"
import "os"
import "log"
import "fmt"
import "time"
import "context"
import "git.tebibyte.media/sashakoshka/hopp"
// import "git.tebibyte.media/sashakoshka/hopp/examples/ping"
func main() {
name := os.Args[0]
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s HOST:PORT\n", name)
os.Exit(2)
}
address := os.Args[1]
conn, err := dial(address)
handleErr(1, err)
trans, err := conn.OpenTrans()
handleErr(1, err)
go func() {
defer fmt.Fprintf(os.Stdout, "(i) disconnected\n")
for {
// message, _, err := ping.Receive(trans)
// if err != nil {
// if !errors.Is(err, io.EOF) {
// handleErr(1, err)
// }
// break
// }
// switch message := message.(type) {
// case *ping.MessagePong:
// log.Printf("--> pong (%d) from %v", message, address)
// }
method, reader, err := trans.ReceiveReader()
if err != nil {
log.Printf("CLIENT recv: %v", err)
return
}
data, err := io.ReadAll(reader)
if err != nil {
log.Printf("CLIENT recv: %v", err)
return
}
log.Println("CLIENT got", method, data)
}
}()
// message := ping.MessagePing(0)
// for {
// log.Printf("<-- ping (%d)", message)
// _, err := ping.Send(trans, &message)
// handleErr(1, err)
// message ++
// time.Sleep(time.Second)
// }
data := [1]byte { }
for {
log.Println("CLIENT send", 1, data)
err := trans.Send(1, data[:])
handleErr(1, err)
data[0] ++
time.Sleep(time.Second)
}
}
func dial(address string) (hopp.Conn, error) {
ctx, done := context.WithTimeout(context.Background(), 16 * time.Second)
defer done()
conn, err := hopp.Dial(ctx, "tcp", address, nil)
if err != nil { return nil, err }
return conn, nil
}
func handleErr(code int, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err)
os.Exit(code)
}
}

View File

@@ -0,0 +1,97 @@
package main
import "io"
import "os"
import "fmt"
import "log"
import "errors"
import "git.tebibyte.media/sashakoshka/hopp"
// import "git.tebibyte.media/sashakoshka/hopp/examples/ping"
var network = "tcp"
var addr = "localhost:7959"
func main() {
name := os.Args[0]
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s HOST:PORT\n", name)
os.Exit(2)
}
address := os.Args[1]
err := listen(address)
handleErr(1, err)
}
func listen(addr string) error {
defer log.Println("(i) closing")
listener, err := hopp.Listen(network, addr, nil)
if err != nil { return err }
log.Printf("(i) hosting on %s", addr)
for {
conn, err := listener.Accept()
if err != nil { return err }
go run(conn)
}
}
func run(conn hopp.Conn) {
log.Printf("-=E %v connected", conn.RemoteAddr())
defer log.Printf("X=- %v disconnected", conn.RemoteAddr())
defer conn.Close()
for {
trans, err := conn.AcceptTrans()
if err != nil {
if !errors.Is(err, io.EOF) {
log.Printf("XXX %v failed: %v", conn.RemoteAddr(), err)
}
return
}
go runTrans(conn, trans)
}
}
func runTrans(conn hopp.Conn, trans hopp.Trans) {
defer trans.Close()
for {
// message, _, err := ping.Receive(trans)
// if err != nil {
// if !errors.Is(err, io.EOF) {
// log.Printf("XXX failed to receive message: %v", err)
// }
// return
// }
// switch message := message.(type) {
// case *ping.MessagePing:
// log.Printf("--> ping (%d) from %v", message, conn.RemoteAddr())
// response := ping.MessagePong(*message)
// _, err := ping.Send(trans, &response)
// if err != nil {
// log.Printf("XXX failed to send message: %v", err)
// return
// }
// }
method, reader, err := trans.ReceiveReader()
if err != nil { log.Println("SERVER", err); return }
data, err := io.ReadAll(reader)
if err != nil { log.Println("SERVER", err); return }
log.Println("SERVER got", method, data)
log.Println("SERVER send", method, data)
func (){
writer, err := trans.SendWriter(1)
if err != nil { log.Println("SERVER", err); return }
defer writer.Close()
_, err = writer.Write(data[:])
if err != nil { log.Println("SERVER", err); return }
}()
}
}
func handleErr(code int, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err)
os.Exit(code)
}
}

View File

@@ -0,0 +1,65 @@
package main
import "os"
import "io"
import "fmt"
import "log"
import "time"
import "errors"
import "context"
import "git.tebibyte.media/sashakoshka/hopp"
import "git.tebibyte.media/sashakoshka/hopp/examples/ping"
func main() {
name := os.Args[0]
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s HOST:PORT\n", name)
os.Exit(2)
}
address := os.Args[1]
conn, err := dial(address)
handleErr(1, err)
trans, err := conn.OpenTrans()
handleErr(1, err)
go func() {
message := ping.MessagePing(0)
for _ = range time.Tick(time.Second) {
log.Printf("<-- ping (%d)", message)
_, err := ping.Send(trans, &message)
handleErr(1, err)
message ++
}
}()
defer fmt.Fprintf(os.Stdout, "(i) disconnected\n")
for {
message, _, err := ping.Receive(trans)
if err != nil {
if !errors.Is(err, io.EOF) {
handleErr(1, err)
}
break
}
switch message := message.(type) {
case ping.MessagePong:
log.Printf("--> pong (%d) from %v", message, address)
}
}
}
func dial(address string) (hopp.Conn, error) {
ctx, done := context.WithTimeout(context.Background(), 16 * time.Second)
defer done()
conn, err := hopp.Dial(ctx, "tcp", address, nil)
if err != nil { return nil, err }
return conn, nil
}
func handleErr(code int, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err)
os.Exit(code)
}
}

6
examples/ping/doc.go Normal file
View File

@@ -0,0 +1,6 @@
// Example ping demonstrates a simple ping/pong protocol.
package ping
// To use this in your own project, replace "go run ../../cmd/hopp-generate"
// with just "hopp-generate".
//go:generate go run ../../cmd/hopp-generate protocol.pdl -o protocol.go

159
examples/ping/protocol.go Normal file
View File

@@ -0,0 +1,159 @@
package ping
// Code generated by the Holanet PDL compiler. DO NOT EDIT.
// The source file is located at <path>
// Please edit that file instead, and re-compile it to this location.
// HOPP, TAPE, METADAPT, PDL/0 (c) 2025 holanet.xyz
import "fmt"
import "git.tebibyte.media/sashakoshka/hopp"
import "git.tebibyte.media/sashakoshka/hopp/tape"
// Table is a KTV table with an undefined schema.
type Table = map[uint16] any
// Message is any message that can be sent along this protocol.
type Message interface {
tape.Encodable
tape.Decodable
// Method returns the method code of the message.
Method() uint16
}
// Send encodes a message and sends it along a transaction.
func Send(trans hopp.Trans, message Message) (n int, err error) {
writer, err := trans.SendWriter(message.Method())
if err != nil { return n, err }
defer writer.Close()
encoder := tape.NewEncoder(writer)
n, err = message.Encode(encoder)
if err != nil { return n, err }
return n, encoder.Flush()
}
// canAssign determines if data from the given source tag can be assigned to
// a Go type represented by destination. It is designed to receive destination
// values from [generate.Generator.generateCanAssign]. The eventual Go type and
// the destination tag must come from the same (or hash-equivalent) PDL type.
func canAssign(destination, source tape.Tag) bool {
if destination.Is(source) { return true }
if (destination.Is(tape.SBA) || destination.Is(tape.LBA)) &&
(source.Is(tape.SBA) || source.Is(tape.LBA)) {
return true
}
return false
}
// boolInt converts a bool to an integer.
func boolInt(input bool) int {
if input {
return 1
} else {
return 0
}
}
// ensure ucontainer is always imported
var _ hopp.Option[int]
// ReceivedMessage is a sealed interface representing the value of a
// message in this package. To determine what kind of message it is,
// use a type switch like this:
//
// switch message := message.(type) {
// case MessagePing:
// doSomething()
// case MessagePong:
// doSomething()
// }
type ReceivedMessage interface {
isReceivedMessage()
}
// Ping is sent by the client to the server. It may contain any number. This
// number will be returned to the client via a [Pong] message.
type MessagePing int32
// Method returns the message's method number.
func(this *MessagePing) Method() uint16 { return 0x0000 }
// Encode encodes this message's tag and value.
func(this *MessagePing) Encode(encoder *tape.Encoder) (n int, err error) {
tag_1 := tape.LSI.WithCN(3)
nn, err := encoder.WriteTag(tag_1)
n += nn; if err != nil { return n, err }
nn, err = encoder.WriteInt32(int32((*this)))
n += nn; if err != nil { return n, err }
return n, nil
}
// Decode decodes this message's tag and value.
func(this *MessagePing) Decode(decoder *tape.Decoder) (n int, err error) {
tag, nn, err := decoder.ReadTag()
n += nn; if err != nil { return n, err }
if !(canAssign(tape.LSI, tag)) {
nn, err = tape.Skim(decoder, tag)
n += nn; if err != nil { return n, err }
return n, nil
}
destination_2, nn, err := decoder.ReadInt32()
n += nn; if err != nil { return n, err }
*this = MessagePing(destination_2)
return n, nil
}
func (this MessagePing) isReceivedMessage() { }
// Pong is sent by the server to the client in response to a [Ping] message, It
// will contain the same number as that message.
type MessagePong int32
// Method returns the message's method number.
func(this *MessagePong) Method() uint16 { return 0x0001 }
// Encode encodes this message's tag and value.
func(this *MessagePong) Encode(encoder *tape.Encoder) (n int, err error) {
tag_3 := tape.LSI.WithCN(3)
nn, err := encoder.WriteTag(tag_3)
n += nn; if err != nil { return n, err }
nn, err = encoder.WriteInt32(int32((*this)))
n += nn; if err != nil { return n, err }
return n, nil
}
// Decode decodes this message's tag and value.
func(this *MessagePong) Decode(decoder *tape.Decoder) (n int, err error) {
tag, nn, err := decoder.ReadTag()
n += nn; if err != nil { return n, err }
if !(canAssign(tape.LSI, tag)) {
nn, err = tape.Skim(decoder, tag)
n += nn; if err != nil { return n, err }
return n, nil
}
destination_4, nn, err := decoder.ReadInt32()
n += nn; if err != nil { return n, err }
*this = MessagePong(destination_4)
return n, nil
}
func (this MessagePong) isReceivedMessage() { }
// Receive decodes a message from a transaction and returns it as a value.
// Use a type switch to determine what type of message it is.
func Receive(trans hopp.Trans) (message ReceivedMessage, n int, err error) {
method, reader, err := trans.ReceiveReader()
if err != nil { return nil, n, err }
decoder := tape.NewDecoder(reader)
switch method {
case 0x0000:
var message MessagePing
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0x0001:
var message MessagePong
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
}
return nil, n, fmt.Errorf("%w: M%04X", hopp.ErrUnknownMethod, method)
}

View File

@@ -0,0 +1,7 @@
// Ping is sent by the client to the server. It may contain any number. This
// number will be returned to the client via a [Pong] message.
M0000 Ping I32
// Pong is sent by the server to the client in response to a [Ping] message, It
// will contain the same number as that message.
M0001 Pong I32

View File

@@ -0,0 +1,77 @@
package main
import "os"
import "io"
import "fmt"
import "log"
import "errors"
import "git.tebibyte.media/sashakoshka/hopp"
import "git.tebibyte.media/sashakoshka/hopp/examples/ping"
func main() {
name := os.Args[0]
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s HOST:PORT\n", name)
os.Exit(2)
}
address := os.Args[1]
err := listen(address)
handleErr(1, err)
}
func listen(address string) error {
listener, err := hopp.Listen("tcp", address, nil)
if err != nil { return err }
log.Printf("(i) hosting on %s", address)
for {
conn, err := listener.Accept()
if err != nil { return err }
go run(conn)
}
}
func run(conn hopp.Conn) {
log.Printf("-=E %v connected", conn.RemoteAddr())
defer log.Printf("X=- %v disconnected", conn.RemoteAddr())
defer conn.Close()
for {
trans, err := conn.AcceptTrans()
if err != nil {
if !errors.Is(err, io.EOF) {
log.Printf("XXX %v failed: %v", conn.RemoteAddr(), err)
}
return
}
go runTrans(conn, trans)
}
}
func runTrans(conn hopp.Conn, trans hopp.Trans) {
for {
message, _, err := ping.Receive(trans)
if err != nil {
if !errors.Is(err, io.EOF) {
log.Printf("XXX failed to receive message: %v", err)
}
return
}
switch message := message.(type) {
case ping.MessagePing:
log.Printf("--> ping (%d) from %v", message, conn.RemoteAddr())
response := ping.MessagePong(message)
_, err := ping.Send(trans, &response)
if err != nil {
log.Printf("XXX failed to send message: %v", err)
return
}
}
}
}
func handleErr(code int, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err)
os.Exit(code)
}
}

View File

@@ -11,6 +11,7 @@ import "git.tebibyte.media/sashakoshka/hopp/tape"
const imports =
`
import "fmt"
import "git.tebibyte.media/sashakoshka/hopp"
import "git.tebibyte.media/sashakoshka/hopp/tape"
`
@@ -41,7 +42,9 @@ func Send(trans hopp.Trans, message Message) (n int, err error) {
if err != nil { return n, err }
defer writer.Close()
encoder := tape.NewEncoder(writer)
return message.Encode(encoder)
n, err = message.Encode(encoder)
if err != nil { return n, err }
return n, encoder.Flush()
}
// canAssign determines if data from the given source tag can be assigned to
@@ -111,6 +114,8 @@ func (this *Generator) Generate(protocol *Protocol) (n int, err error) {
n += nn; if err != nil { return n, err }
// type definitions
nn, err = this.generateMessageValueInterface()
n += nn; if err != nil { return n, err }
for _, name := range slices.Sorted(maps.Keys(protocol.Types)) {
nn, err := this.generateTypedef(name, protocol.Types[name])
n += nn; if err != nil { return n, err }
@@ -137,6 +142,42 @@ func (this *Generator) Generate(protocol *Protocol) (n int, err error) {
return n, nil
}
func (this *Generator) generateMessageValueInterface() (n int, err error) {
keys := slices.Sorted(maps.Keys(this.protocol.Messages))
nn, err := this.iprint(
"\n// ReceivedMessage is a sealed interface representing the value of a\n" +
"// message in this package. To determine what kind of message it is,\n" +
"// use a type switch like this:\n" +
"// \n" +
"// switch message := message.(type) {\n")
n += nn; if err != nil { return n, err }
for index, method := range keys {
if index > 2 {
nn, err := this.iprint("// \n// ...\n// \n")
n += nn; if err != nil { return n, err }
break
}
nn, err := this.iprintf("// case %s:\n",
this.resolveMessageName(this.protocol.Messages[method].Name))
n += nn; if err != nil { return n, err }
nn, err = this.iprintf("// doSomething()\n")
n += nn; if err != nil { return n, err }
}
nn, err = this.iprintf("// }\n")
n += nn; if err != nil { return n, err }
nn, err = this.iprintf("type ReceivedMessage interface {\n")
n += nn; if err != nil { return n, err }
this.push()
nn, err = this.iprintf("isReceivedMessage()\n")
n += nn; if err != nil { return n, err }
this.pop()
nn, err = this.iprintf("}\n")
n += nn; if err != nil { return n, err }
return n, nil
}
func (this *Generator) generateTypedef(name string, typedef Typedef) (n int, err error) {
typ := typedef.Type
@@ -324,6 +365,10 @@ func (this *Generator) generateMessage(method uint16, message Message) (n int, e
this.pop()
nn, err = this.iprintf("}\n")
n += nn; if err != nil { return n, err }
// isReceivedMessage method
nn, err = this.iprintf("func (this %s) isReceivedMessage() { }\n", this.resolveMessageName(message.Name))
n += nn; if err != nil { return n, err }
return n, nil
}
@@ -1204,19 +1249,19 @@ func (this *Generator) generateReceive() (n int, err error) {
"// Use a type switch to determine what type of message it is.\n")
n += nn; if err != nil { return n, err }
nn, err = this.iprintf(
"func Receive(trans hopp.Trans) (message any, n int, err error) {\n")
"func Receive(trans hopp.Trans) (message ReceivedMessage, n int, err error) {\n")
n += nn; if err != nil { return n, err }
this.push()
nn, err = this.iprintf("method, reader, err := trans.ReceiveReader()\n")
n += nn; if err != nil { return n, err }
nn, err = this.iprintf("decoder := tape.NewDecoder(reader)\n")
n += nn; if err != nil { return n, err }
nn, err = this.iprintf("if err != nil { return nil, n, err }\n")
n += nn; if err != nil { return n, err }
nn, err = this.iprintf("decoder := tape.NewDecoder(reader)\n")
n += nn; if err != nil { return n, err }
nn, err = this.iprintf("switch method {\n")
n += nn; if err != nil { return n, err }
for method, message := range this.protocol.Messages {
nn, err = this.iprintf("case %04X:\n", method)
nn, err = this.iprintf("case 0x%04X:\n", method)
n += nn; if err != nil { return n, err }
this.push()
nn, err = this.iprintf(
@@ -1234,7 +1279,9 @@ func (this *Generator) generateReceive() (n int, err error) {
}
nn, err = this.iprint("}\n")
n += nn; if err != nil { return n, err }
nn, err = this.iprintf("return nil, n, hopp.ErrUnknownMethod\n")
// fuck off go vet
str := `return nil, n, fmt.Errorf("%w: M%04X", hopp.ErrUnknownMethod, method)`
nn, err = this.iprintln(str)
n += nn; if err != nil { return n, err }
this.pop()
nn, err = this.iprint("}\n")

View File

@@ -7,8 +7,10 @@ import "testing"
// generator is equal to something specific
var exampleProtocol = defaultProtocol()
var pingProtocol = defaultProtocol()
func init() {
// example protocol
exampleProtocol.Messages[0x0000] = Message {
Name: "Connect",
Type: TypeTableDefined {
@@ -121,6 +123,16 @@ func init() {
},
},
}
// ping protocol
pingProtocol.Messages[0x0000] = Message {
Name: "Ping",
Type: TypeInt { Bits: 32, Signed: true },
}
pingProtocol.Messages[0x0001] = Message {
Name: "Pong",
Type: TypeInt { Bits: 32, Signed: true },
}
}
func TestGenerateRunEncodeDecode(test *testing.T) {
@@ -276,31 +288,32 @@ func TestGenerateRunEncodeDecode(test *testing.T) {
}
testEncodeDecode(
&messageDynamic,
tu.S(0xE0, 14).AddVar(
[]byte { 0x00, 0x00, 0x20, 0x23 },
[]byte { 0x00, 0x01, 0x21, 0x32, 0x47 },
[]byte { 0x00, 0x02, 0x23, 0x87, 0x32, 0x45, 0x23 },
[]byte { 0x00, 0x03, 0x27, 0x32, 0x84, 0x02, 0x90, 0x34, 0x09, 0x82, 0x34 },
[]byte { 0x00, 0x04, 0x40, 0x23 },
[]byte { 0x00, 0x05, 0x41, 0x32, 0x47 },
[]byte { 0x00, 0x06, 0x43, 0x57, 0x32, 0x45, 0x23 },
[]byte { 0x00, 0x07, 0x47, 0x32, 0x84, 0x02, 0x90, 0x34, 0x09, 0x82, 0x34 },
[]byte { 0x00, 0x08, 0x63, 0x45, 0x12, 0x63, 0xCE },
[]byte { 0x00, 0x09, 0x67, 0x40, 0x74, 0x4E, 0x3D, 0x6F, 0xCD, 0x17, 0x75 },
[]byte { 0x00, 0x0A, 0x87, 'f', 'o', 'x', ' ', 'b', 'e', 'd' },
[]byte { 0x00, 0x0B, 0xC0, 0x04, 0x41,
snake.O().AddL(0xE0, 14).AddS(
snake.L(0x00, 0x00, 0x20, 0x23),
snake.L(0x00, 0x01, 0x21, 0x32, 0x47),
snake.L(0x00, 0x02, 0x23, 0x87, 0x32, 0x45, 0x23),
snake.L(0x00, 0x03, 0x27, 0x32, 0x84, 0x02, 0x90, 0x34, 0x09, 0x82, 0x34),
snake.L(0x00, 0x04, 0x40, 0x23),
snake.L(0x00, 0x05, 0x41, 0x32, 0x47),
snake.L(0x00, 0x06, 0x43, 0x57, 0x32, 0x45, 0x23),
snake.L(0x00, 0x07, 0x47, 0x32, 0x84, 0x02, 0x90, 0x34, 0x09, 0x82, 0x34),
snake.L(0x00, 0x08, 0x63, 0x45, 0x12, 0x63, 0xCE),
snake.L(0x00, 0x09, 0x67, 0x40, 0x74, 0x4E, 0x3D, 0x6F, 0xCD, 0x17, 0x75),
snake.L(0x00, 0x0A, 0x87, 'f', 'o', 'x', ' ', 'b', 'e', 'd'),
snake.L(0x00, 0x0B, 0xC0, 0x04, 0x41,
0x00, 0x07,
0x00, 0x06,
0x00, 0x05,
0x00, 0x04 },
[]byte { 0x00, 0x0C, 0xE0, 0x02,
0x00, 0x04),
snake.L(0x00, 0x0C, 0xE0, 0x02,
0x00, 0x01, 0x40, 0x08,
0x00, 0x02, 0x67, 0x40, 0x11, 0x99, 0x99, 0x99, 0x99, 0x99, 0x9A },
[]byte { 0x00, 0x0D, 0xE0, 0x03, // ERR
0x00, 0x01, 0x63, 0x43, 0xF4, 0xC0, 0x00,
0x00, 0x02, 0x82, 'h', 'i',
0x00, 0x03, 0x21, 0x39, 0x92 },
))
0x00, 0x02, 0x67, 0x40, 0x11, 0x99, 0x99, 0x99, 0x99, 0x99, 0x9A),
snake.O(snake.L(0x00, 0x0D, 0xE0, 0x03),
snake.S(
snake.L(0x00, 0x01, 0x63, 0x43, 0xF4, 0xC0, 0x00),
snake.L(0x00, 0x02, 0x82, 'h', 'i'),
snake.L(0x00, 0x03, 0x21, 0x39, 0x92)),
)))
`)
}
@@ -484,3 +497,119 @@ func TestGenerateRunDecodeWrongType(test *testing.T) {
}
`)
}
func TestGenerateRunSendReceive(test *testing.T) {
testGenerateRun(test, &pingProtocol, "send-receive", `
// imports
import "git.tebibyte.media/sashakoshka/hopp/internal/mock"
`, `
log.Println("Send"); {
message := MessagePing(77)
trans := mock.Trans { }
_, err := Send(&trans, &message)
if err != nil { log.Fatal(err) }
gotMethod, gotPayload := trans.Methods[0], trans.Messages[0]
log.Printf("method M%04X", gotMethod)
log.Println("payload", tu.HexBytes(gotPayload))
if gotMethod != 0x0000 {
log.Fatalln("wrong method")
}
if ok, n := snake.L(0x43, 0x00, 0x00, 0x00, 0x4D).Check(gotPayload); !ok {
log.Fatalln("not equal at:", n)
}
}
log.Println("Receive"); {
trans := mock.Trans {
Methods: []uint16 { 1 },
Messages: [][]byte { []byte { 0x43, 0x00, 0x00, 0x00, 0x4E } },
}
gotMessage, n, err := Receive(&trans)
if err != nil { log.Fatal(err) }
log.Println("message", gotMessage)
log.Println("n", n)
casted, ok := gotMessage.(MessagePong)
if !ok { log.Fatalln("expected MessagePong") }
if casted != 78 { log.Fatalln("wrong message value") }
if n != 5 { log.Fatalln("wrong n value") }
}
`)
}
func TestGenerateRunConn(test *testing.T) {
testGenerateRun(test, &pingProtocol, "send-receive", `
// imports
import "sync"
import "context"
import "git.tebibyte.media/sashakoshka/hopp"
`, `
group := sync.WaitGroup { }
group.Add(2)
// server
listener, err := hopp.Listen("tcp", "localhost:43957", nil)
if err != nil { log.Fatalln("SERVER listen:", err) }
go func() {
defer listener.Close()
defer group.Done()
conn, err := listener.Accept()
if err != nil { log.Fatalln("SERVER accept:", err) }
trans, err := conn.AcceptTrans()
if err != nil { log.Fatalln("SERVER accept trans:", err) }
message, n, err := Receive(trans)
if err != nil { log.Fatalln("SERVER receive:", err) }
log.Println("SERVER got message", message)
log.Println("SERVER got n", n)
casted, ok := message.(MessagePing)
if !ok { log.Fatalln("SERVER expected MessagePong") }
if casted != 77 { log.Fatalln("SERVER wrong message value") }
if n != 5 { log.Fatalln("SERVER wrong n value") }
message, n, err = Receive(trans)
if err != nil { log.Fatalln("SERVER receive:", err) }
log.Println("SERVER got message", message)
log.Println("SERVER got n", n)
casted, ok = message.(MessagePing)
if !ok { log.Fatalln("SERVER expected MessagePong") }
if casted != 78 { log.Fatalln("SERVER wrong message value") }
if n != 5 { log.Fatalln("SERVER wrong n value") }
}()
// client
go func() {
defer group.Done()
log.Println("CLIENT dialing")
conn, err := hopp.Dial(
context.Background(),
"tcp", "localhost:43957",
nil)
if err != nil { log.Fatalln("CLIENT dial:", err) }
defer conn.Close()
log.Println("CLIENT connected")
log.Println("CLIENT opening trans")
trans, err := conn.OpenTrans()
if err != nil { log.Fatalln("CLIENT open trans:", err) }
message := MessagePing(77)
log.Println("CLIENT sending message")
n, err := Send(trans, &message)
if err != nil { log.Fatalln("CLIENT send:", err) }
log.Println("CLIENT sent n", n)
if n != 5 { log.Fatalln("CLIENT wrong n value") }
message = MessagePing(78)
log.Println("CLIENT sending message")
n, err = Send(trans, &message)
if err != nil { log.Fatalln("CLIENT send:", err) }
log.Println("CLIENT sent n", n)
if n != 5 { log.Fatalln("CLIENT wrong n value") }
}()
group.Wait()
`)
}

View File

@@ -37,6 +37,7 @@ func testGenerateRun(test *testing.T, protocol *Protocol, title, imports, testCa
import "reflect"
import "git.tebibyte.media/sashakoshka/hopp/tape"
import tu "git.tebibyte.media/sashakoshka/hopp/internal/testutil"
import "git.tebibyte.media/sashakoshka/hopp/internal/testutil/snake"
` + imports
setup := `log.Println("*** BEGIN TEST CASE OUTPUT ***")`
teardown := `log.Println("--- END TEST CASE OUTPUT ---")`
@@ -61,8 +62,9 @@ func testGenerateRun(test *testing.T, protocol *Protocol, title, imports, testCa
func testDecode(correct Message, data any) {
var flat []byte
switch data := data.(type) {
case []byte: flat = data
case tu.Snake: flat = data.Flatten()
case []byte: flat = data
case tu.Snake: flat = data.Flatten()
case snake.Snake: flat = data.Flatten()
}
message := reflect.New(reflect.ValueOf(correct).Elem().Type()).Interface().(Message)
log.Println("before: ", message)
@@ -79,9 +81,7 @@ func testGenerateRun(test *testing.T, protocol *Protocol, title, imports, testCa
}
}
// TODO: possibly combine the two above functions into this one,
// also take a data parameter here (snake)
func testEncodeDecode(message Message, data tu.Snake) {buffer := bytes.Buffer { }
func testEncodeDecode(message Message, data any) {buffer := bytes.Buffer { }
log.Println("encoding:")
encoder := tape.NewEncoder(&buffer)
n, err := message.Encode(encoder)
@@ -93,13 +93,30 @@ func testGenerateRun(test *testing.T, protocol *Protocol, title, imports, testCa
if n != len(got) {
log.Fatalf("n incorrect: %d != %d\n", n, len(got))
}
if ok, n := data.Check(got); !ok {
log.Fatalln("not equal at", n)
var flat []byte
switch data := data.(type) {
case []byte:
flat = data
if ok, n := snake.Check(snake.L(data...), got); !ok {
log.Fatalln("not equal at", n)
}
case tu.Snake:
flat = data.Flatten()
if ok, n := data.Check(got); !ok {
log.Fatalln("not equal at", n)
}
case snake.Node:
flat = data.Flatten()
if ok, n := snake.Check(data, got); !ok {
log.Fatalln("not equal at", n)
}
default:
panic("AUSIAUGH AAAUUGUHGHGHH OUHGHGJDSGK")
}
log.Println("decoding:")
destination := reflect.New(reflect.ValueOf(message).Elem().Type()).Interface().(Message)
flat := data.Flatten()
log.Println("before: ", tu.Describe(destination))
decoder := tape.NewDecoder(bytes.NewBuffer(flat))
n, err = destination.Decode(decoder)

View File

@@ -0,0 +1,56 @@
package connshark
import "os"
import "io"
import "fmt"
import "net"
import "log"
import "sync"
import "math/rand"
import tu "git.tebibyte.media/sashakoshka/hopp/internal/testutil"
type insert struct {
net.Conn
output io.WriteCloser
lock sync.Mutex
}
func LogDebugFile(underlying net.Conn) net.Conn {
file, err := os.Create(fmt.Sprintf("connshark-%08X.log", rand.Uint32()))
if err != nil {
log.Println("XXX COULD NOT OPEN DEBUG FILE! reason: ", err)
return underlying
}
return Log(underlying, file)
}
func Log(underlying net.Conn, output io.WriteCloser) net.Conn {
return &insert {
Conn: underlying,
output: output,
}
}
func (this *insert) Read(buffer []byte) (n int, err error) {
if n > 0 {
this.lock.Lock()
defer this.lock.Unlock()
fmt.Fprintf(this.output, "TX: %s\n", tu.HexBytes(buffer[:n]))
}
return n, err
}
func (this *insert) Write(buffer []byte) (n int, err error) {
n, err = this.Conn.Write(buffer)
if n > 0 {
this.lock.Lock()
defer this.lock.Unlock()
fmt.Fprintf(this.output, "RX: %s\n", tu.HexBytes(buffer[:n]))
}
return n, err
}
func (this *insert) Close() error {
this.output.Close()
return this.Conn.Close()
}

View File

@@ -0,0 +1,64 @@
package mock
import "io"
import "time"
import "bytes"
import "git.tebibyte.media/sashakoshka/hopp"
var _ hopp.Trans = new(Trans)
// Trans is a mock transaction implementation.
type Trans struct {
// These arrays must be the same length. You can load this up
// with messages to read, or deposit messages and retrieve them
// here later.
Methods []uint16
Messages [][]byte
}
func (this *Trans) Close() error { return nil }
func (this *Trans) ID() int64 { return 56 }
func (this *Trans) Send(method uint16, data []byte) error {
this.Methods = append(this.Methods, method)
this.Messages = append(this.Messages, data)
return nil
}
func (this *Trans) SendWriter(method uint16) (io.WriteCloser, error) {
return &transWriter {
Buffer: new(bytes.Buffer),
method: method,
parent: this,
}, nil
}
func (this *Trans) Receive() (method uint16, data []byte, err error) {
if len(this.Methods) == 0 {
return 0, nil, io.EOF
}
method = this.Methods[0]
data = this.Messages[0]
this.Methods = this.Methods[1:]
this.Messages = this.Messages[1:]
return method, data, nil
}
func (this *Trans) ReceiveReader() (method uint16, reader io.Reader, err error) {
method, data, err := this.Receive()
if err != nil { return 0, nil, err }
return method, bytes.NewBuffer(data), nil
}
func (this *Trans) SetDeadline(time.Time) error { return nil }
type transWriter struct {
*bytes.Buffer
method uint16
parent *Trans
}
func (this *transWriter) Close() error {
return this.parent.Send(this.method, this.Bytes())
}

View File

@@ -0,0 +1,45 @@
package testutil
import "net"
import "fmt"
import "strings"
var _ net.Conn = new(ConnRecorder)
// ConnRecorder records write/flush actions performed on a net.Conn.
type ConnRecorder struct {
net.Conn
// A []byte means data was written, and untyped nil
// means data was flushed.
Log []any
}
func RecordConn(underlying net.Conn) *ConnRecorder {
return &ConnRecorder {
Conn: underlying,
}
}
func (this *ConnRecorder) Write(data []byte) (n int, err error) {
this.Log = append(this.Log, data)
return len(data), nil
}
func (this *ConnRecorder) Flush() error {
this.Log = append(this.Log, nil)
return nil
}
func (this *ConnRecorder) Dump() string {
builder := strings.Builder { }
for index, item := range this.Log {
fmt.Fprintf(&builder, "%06d ", index)
switch item := item.(type) {
case nil:
fmt.Fprintln(&builder, "FLUSH")
case []byte:
fmt.Fprintln(&builder, HexBytes(item))
}
}
return builder.String()
}

View File

@@ -0,0 +1,44 @@
package testutil
import "net"
import "testing"
func TestConnRecorder(test *testing.T) {
// server
listener, err := net.Listen("tcp", "localhost:9999")
if err != nil { test.Fatal(err) }
defer listener.Close()
go func() {
conn, err := listener.Accept()
defer conn.Close()
if err != nil { test.Fatal(err) }
buf := [16]byte { }
for {
_, err := conn.Read(buf[:])
if err != nil { break }
}
}()
// client
conn, err := net.Dial("tcp", "localhost:9999")
if err != nil { test.Fatal(err) }
defer conn.Close()
recorder := RecordConn(conn)
_, err = recorder.Write([]byte("hello"))
if err != nil { test.Fatal(err) }
_, err = recorder.Write([]byte("world!"))
if err != nil { test.Fatal(err) }
err = recorder.Flush()
if err != nil { test.Fatal(err) }
test.Log("GOT:\n" + recorder.Dump())
if len(recorder.Log) != 3 { test.Fatal("wrong length") }
if string(recorder.Log[0].([]byte)) != "hello" {
test.Fatal("not equal")
}
if string(recorder.Log[1].([]byte)) != "world!" {
test.Fatal("not equal")
}
}

View File

@@ -0,0 +1,214 @@
// Package snake lets you compare blocks of data where the ordering of certain
// parts may be swapped every which way. It is designed for comparing the
// encoding of maps where the ordering of individual elements is inconsistent.
package snake
import "fmt"
import "strings"
import tu "git.tebibyte.media/sashakoshka/hopp/internal/testutil"
var _ Node = Order { }
var _ Node = Snake { }
var _ Node = Leaf { }
// Check checks the data against the specified node. If the data doesn't satisfy
// the node, or the comparison succeded but didn't consume all the data, this
// function returns false, and the index of the byte where the inequality is.
func Check(node Node, data []byte) (ok bool, n int) {
ok, n = node.Check(data)
if !ok {
return false, n
}
if n != len(data) {
return false, n
}
return true, n
}
// O returns a new order given a vararg node slice.
func O(nodes ...Node) Order {
return Order(nodes)
}
// S returns a new snake given a vararg node slice.
func S(nodes ...Node) Snake {
return Snake(nodes)
}
// L returns a new leaf given a vararg byte slice.
func L(data ...byte) Leaf {
return Leaf([]byte(data))
}
// Order is satisfied when the data satisfies each of its nodes in the order
// that they are specified in the slice.
type Order []Node
// Check determines if the data satisfies the Order.
func (this Order) Check(data []byte) (ok bool, n int) {
left := data
for _, node := range this {
ok, nn := node.Check(left)
n += nn; if !ok { return false, n }
left = left[nn:]
}
return true, n
}
// Flatten returns the Order flattened to a byte array. The result of this
// function always satisfies the Order.
func (this Order) Flatten() []byte {
flat := []byte { }
for _, node := range this {
flat = append(flat, node.Flatten()...)
}
return flat
}
func (this Order) String() string {
out := strings.Builder { }
for index, node := range this {
if index > 0 {
fmt.Fprint(&out, " :")
}
fmt.Fprintf(&out, " %v", node)
}
return out.String()
}
// Add returns a new order with the given nodes appended to it.
func (this Order) Add(nodes ...Node) Order {
newOrder := make(Order, len(this) + len(nodes))
copy(newOrder, this)
copy(newOrder[len(this):], Order(nodes))
return newOrder
}
// AddO returns a new order with the given order appended to it.
func (this Order) AddO(nodes ...Node) Order {
return this.Add(O(nodes...))
}
// AddS returns a new order with the given snake appended to it.
func (this Order) AddS(nodes ...Node) Order {
return this.Add(S(nodes...))
}
// AddL returns a new order with the given leaf appended to it.
func (this Order) AddL(data ...byte) Order {
return this.Add(L(data...))
}
// Snake is satisfied when the data satisfies each of its nodes in no particular
// order.
type Snake []Node
// Check determines if the data satisfies the snake.
func (this Snake) Check(data []byte) (ok bool, n int) {
fmt.Println("CHECKING SNAKE")
left := data
nodes := map[int] Node { }
for key, node := range this {
nodes[key] = node
}
for len(nodes) > 0 {
found := false
for key, node := range nodes {
fmt.Println(left, key, node)
ok, nn := node.Check(left)
fmt.Println(ok, nn)
if !ok { continue }
n += nn
left = data[n:]
delete(nodes, key)
found = true
break
}
if !found { return false, n }
}
return true, n
}
// Flatten returns the snake flattened to a byte array. The result of this
// function always satisfies the snake.
func (this Snake) Flatten() []byte {
flat := []byte { }
for _, node := range this {
flat = append(flat, node.Flatten()...)
}
return flat
}
func (this Snake) String() string {
out := strings.Builder { }
out.WriteString("[")
for index, node := range this {
if index > 0 {
fmt.Fprint(&out, " /")
}
fmt.Fprintf(&out, " %v", node)
}
out.WriteString(" ]")
return out.String()
}
// Add returns a new snake with the given nodes appended to it.
func (this Snake) Add(nodes ...Node) Snake {
newSnake := make(Snake, len(this) + len(nodes))
copy(newSnake, this)
copy(newSnake[len(this):], Snake(nodes))
return newSnake
}
// AddO returns a new snake with the given order appended to it.
func (this Snake) AddO(nodes ...Node) Snake {
return this.Add(O(nodes...))
}
// AddS returns a new snake with the given snake appended to it.
func (this Snake) AddS(nodes ...Node) Snake {
return this.Add(S(nodes...))
}
// AddL returns a new snake with the given leaf appended to it.
func (this Snake) AddL(data ... byte) Snake {
return this.Add(L(data...))
}
// Leaf is satisfied when the data matches it exactly.
type Leaf []byte
// Check determines if the data is equal to the leaf.
func (this Leaf) Check(data []byte) (ok bool, n int) {
if len(data) < len(this) {
return false, len(data)
}
for index, byt := range this {
if byt != data[index] {
return false, index
}
}
return true, len(this)
}
// This one's easy.
func (this Leaf) Flatten() []byte {
return []byte(this)
}
func (this Leaf) String() string {
return tu.HexBytes([]byte(this))
}
// Node represents a snake node.
type Node interface {
// Check determines if the data satisfies the node. If satisfied, the function
// will return true, and the index at which it stopped. If not, the
// function will return false, and the index of the first byte that
// didn't match. As long as the start of the data satisfies the node,
// whatever comes after it doesn't matter.
Check(data []byte) (ok bool, n int)
// Flatten returns the node flattened to a byte array. The result of
// this function always satisfies the node.
Flatten() []byte
}

View File

@@ -0,0 +1,89 @@
package snake
import "testing"
func TestSnakeA(test *testing.T) {
snake := O().AddL(1, 6).AddS(
L(1),
L(2),
L(3),
L(4),
L(5),
).AddL(9)
test.Log(snake)
ok, n := Check(snake, []byte { 1, 6, 1, 2, 3, 4, 5, 9 })
if !ok { test.Fatal("false negative:", n) }
ok, n = Check(snake, []byte { 1, 6, 5, 4, 3, 2, 1, 9 })
if !ok { test.Fatal("false negative:", n) }
ok, n = Check(snake, []byte { 1, 6, 3, 1, 4, 2, 5, 9 })
if !ok { test.Fatal("false negative:", n) }
ok, n = Check(snake, []byte { 1, 6, 9 })
if ok { test.Fatal("false positive:", n) }
ok, n = Check(snake, []byte { 1, 6, 1, 2, 3, 4, 5, 6, 9 })
if ok { test.Fatal("false positive:", n) }
ok, n = Check(snake, []byte { 1, 6, 0, 2, 3, 4, 5, 6, 9 })
if ok { test.Fatal("false positive:", n) }
ok, n = Check(snake, []byte { 1, 6, 7, 1, 4, 2, 5, 9 })
if ok { test.Fatal("false positive:", n) }
ok, n = Check(snake, []byte { 1, 6, 7, 3, 1, 4, 2, 5, 9 })
if ok { test.Fatal("false positive:", n) }
ok, n = Check(snake, []byte { 1, 6, 7, 3, 1, 4, 2, 5, 9 })
if ok { test.Fatal("false positive:", n) }
ok, n = Check(snake, []byte { 1, 6, 1, 2, 3, 4, 5, 9, 10})
if ok { test.Fatal("false positive:", n) }
}
func TestSnakeB(test *testing.T) {
snake := O().AddO(L(1), L(6)).AddS(
L(1),
L(2),
).AddL(9).AddS(
L(3, 2),
L(0),
L(1, 1, 2, 3),
)
test.Log(snake)
ok, n := Check(snake, []byte { 1, 6, 1, 2, 9, 3, 2, 0, 1, 1, 2, 3})
if !ok { test.Fatal("false negative:", n) }
ok, n = Check(snake, []byte { 1, 6, 2, 1, 9, 0, 1, 1, 2, 3, 3, 2})
if !ok { test.Fatal("false negative:", n) }
ok, n = Check(snake, []byte { 1, 6, 9 })
if ok { test.Fatal("false positive:", n) }
ok, n = Check(snake, []byte { 1, 6, 1, 2, 9 })
if ok { test.Fatal("false positive:", n) }
ok, n = Check(snake, []byte { 1, 6, 9, 3, 2, 0, 1, 1, 2, 3})
if ok { test.Fatal("false positive:", n) }
ok, n = Check(snake, []byte { 1, 6, 2, 9, 0, 1, 1, 2, 3, 3, 2})
if ok { test.Fatal("false positive:", n) }
ok, n = Check(snake, []byte { 1, 6, 1, 2, 9, 3, 2, 1, 1, 2, 3})
if ok { test.Fatal("false positive:", n) }
}
func TestSnakeC(test *testing.T) {
snake := S(
L(1, 2, 3),
S(L(6), L(7), L(8)),
)
test.Log(snake)
ok, n := Check(snake, []byte { 1, 2, 3, 6, 7, 8 })
if !ok { test.Fatal("false negative:", n) }
ok, n = Check(snake, []byte { 6, 7, 8, 1, 2, 3 })
if !ok { test.Fatal("false negative:", n) }
ok, n = Check(snake, []byte { 7, 8, 6, 1, 2, 3 })
if !ok { test.Fatal("false negative:", n) }
ok, n = Check(snake, []byte { 1, 2, 3, 8, 6, 7 })
if !ok { test.Fatal("false negative:", n) }
ok, n = Check(snake, []byte { 2, 1, 3, 6, 7, 8 })
if ok { test.Fatal("false positive:", n) }
ok, n = Check(snake, []byte { 6, 1, 2, 3, 7, 8 })
if ok { test.Fatal("false positive:", n) }
}

View File

@@ -20,6 +20,9 @@ type Listener interface {
// - "quic"
// - "quic4" (IPv4-only)
// - "quic6" (IPv6-only)
// - "tls"
// - "tls4" (IPv4-only)
// - "tls6" (IPv6-only)
// - "tcp"
// - "tcp4" (IPv4-only)
// - "tcp6" (IPv6-only)
@@ -29,9 +32,17 @@ type Listener interface {
func Listen(network, address string, tlsConf *tls.Config) (Listener, error) {
switch network {
case "quic", "quic4", "quic6": return ListenQUIC(network, address, tlsConf)
case "tcp", "tcp4", "tcp6": return ListenTLS(network, address, tlsConf)
case "unix": return ListenUnix(network, address)
default: return nil, ErrUnknownNetwork
case "tls", "tls4", "tls6": return ListenTLS(network, address, tlsConf)
case "tcp", "tcp4", "tcp6":
addr, err := net.ResolveTCPAddr(network, address)
if err != nil { return nil, err }
return ListenTCP(network, addr)
case "unix":
addr, err := net.ResolveUnixAddr(network, address)
if err != nil { return nil, err }
return ListenUnix(network, addr)
default:
return nil, ErrUnknownNetwork
}
}
@@ -46,7 +57,8 @@ func ListenQUIC(network, address string, tlsConf *tls.Config) (Listener, error)
// ListenTLS listens for incoming HOPP connections using a TLS socket as a
// transport. The network must be "tcp".
func ListenTLS(network, address string, tlsConf *tls.Config) (Listener, error) {
if network != "tcp" { return nil, ErrUnknownNetwork }
network, err := tlsNetworkToTCPNetwork(network)
if err != nil { return nil, err }
listener, err := tls.Listen(network, address, tlsConf)
if err != nil { return nil, err }
return &netListenerWrapper {
@@ -54,12 +66,19 @@ func ListenTLS(network, address string, tlsConf *tls.Config) (Listener, error) {
}, nil
}
// ListenTCP listens for incoming HOPP connections using a TCP socket as a
// transport. The network must be "tcp".
func ListenTCP(network string, laddr *net.TCPAddr) (Listener, error) {
listener, err := net.ListenTCP(network, laddr)
if err != nil { return nil, err }
return &netListenerWrapper {
underlying: listener,
}, nil
}
// ListenUnix listens for incoming HOPP connections using a Unix domain socket
// as a transport. The network must be "unix".
func ListenUnix(network, address string) (Listener, error) {
if network != "unix" { return nil, ErrUnknownNetwork }
addr, err := net.ResolveUnixAddr(network, address)
if err != nil { return nil, err }
func ListenUnix(network string, addr *net.UnixAddr) (Listener, error) {
listener, err := net.ListenUnix(network, addr)
if err != nil { return nil, err }
return &netListenerWrapper {

View File

@@ -6,6 +6,8 @@ import "fmt"
import "net"
import "sync"
import "time"
import "bytes"
import "context"
import "sync/atomic"
import "git.tebibyte.media/sashakoshka/go-util/sync"
@@ -16,6 +18,12 @@ const closeMethod = 0xFFFF
const int64Max = int64((^uint64(0)) >> 1)
const defaultChunkSize = 0x1000
var bufferPool = sync.Pool {
New: func() any {
return &bytes.Buffer { }
},
}
// Party represents a side of a connection.
type Party bool; const (
ServerSide Party = false
@@ -39,20 +47,23 @@ type a struct {
sendLock sync.Mutex
transMap map[int64] *transA
transChan chan *transA
done chan struct { }
ctx context.Context
done func()
err error
}
// AdaptA returns a connection implementing METADAPT-A over a singular stream-
// oriented transport such as TCP or UNIX domain stream sockets.
func AdaptA(underlying net.Conn, party Party) Conn {
ctx, done := context.WithCancel(context.Background())
conn := &a {
sizeLimit: defaultSizeLimit,
underlying: underlying,
party: party,
transMap: make(map[int64] *transA),
transChan: make(chan *transA),
done: make(chan struct { }),
ctx: ctx,
done: done,
}
if party == ClientSide {
conn.transID = 1
@@ -60,11 +71,15 @@ func AdaptA(underlying net.Conn, party Party) Conn {
conn.transID = -1
}
go conn.receive()
go func() {
<- ctx.Done()
underlying.Close()
}()
return conn
}
func (this *a) Close() error {
close(this.done)
this.done()
return nil
}
@@ -105,7 +120,7 @@ func (this *a) AcceptTrans() (Trans, error) {
return nil, eof
}
return trans, nil
case <- this.done:
case <- this.ctx.Done():
return nil, eof
}
}
@@ -124,10 +139,10 @@ func (this *a) unlistTransactionSafe(id int64) {
delete(this.transMap, id)
}
func (this *a) sendMessageSafe(trans int64, method uint16, data []byte) error {
func (this *a) sendMessageSafe(trans int64, method uint16, ccb uint64, data []byte) error {
this.sendLock.Lock()
defer this.sendLock.Unlock()
return encodeMessageA(this.underlying, this.sizeLimit, trans, method, data)
return encodeMessageA(this.underlying, this.sizeLimit, trans, method, 0, data)
}
func (this *a) receive() {
@@ -214,12 +229,12 @@ type transA struct {
parent *a
id int64
incoming usync.Gate[incomingMessage]
currentReader io.Reader
currentWriter io.Closer
writeBuffer []byte
closed atomic.Bool
closeErr error
currentReader io.Reader
currentWriter usync.Monitor[io.Closer]
deadline *time.Timer
deadlineLock sync.Mutex
}
@@ -251,27 +266,28 @@ func (this *transA) ID() int64 {
}
func (this *transA) Send(method uint16, data []byte) error {
return this.parent.sendMessageSafe(this.id, method, data)
return this.parent.sendMessageSafe(this.id, method, 0, data)
}
func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) {
currentWriter, done := this.currentWriter.BorrowReturn()
defer done(&currentWriter)
// close previous writer if necessary
if this.currentWriter != nil {
this.currentWriter.Close()
this.currentWriter = nil
if currentWriter != nil {
currentWriter.Close()
currentWriter = nil
}
// create new writer
writer := &writerA {
parent: this,
// there is only ever one writer at a time, so they can all
// share a buffer
buffer: this.writeBuffer[:0],
buffer: bufferPool.Get().(*bytes.Buffer),
method: method,
chunkSize: defaultChunkSize,
open: true,
}
this.currentWriter = writer
currentWriter = writer
return writer, nil
}
@@ -382,7 +398,7 @@ func (this *readerA) pull() (uint16, error) {
// close and return error on failure
this.eof = true
this.parent.Close()
return 0, fmt.Errorf("could not receive message: %w", this.parent.bestErr())
return 0, this.parent.bestErr()
}
func (this *readerA) Read(buffer []byte) (int, error) {
@@ -398,14 +414,14 @@ func (this *readerA) Read(buffer []byte) (int, error) {
type writerA struct {
parent *transA
buffer []byte
buffer *bytes.Buffer
method uint16
chunkSize int64
open bool
}
func (this *writerA) Write(data []byte) (n int, err error) {
if !this.open { return 0, io.EOF }
if !this.open || this.parent.closed.Load() { return 0, io.EOF }
toSend := data
for len(toSend) > 0 {
nn, err := this.writeOne(toSend)
@@ -417,7 +433,18 @@ func (this *writerA) Write(data []byte) (n int, err error) {
}
func (this *writerA) Close() error {
this.open = false
if this.buffer != nil {
// flush if needed
if this.buffer.Len() > 0 {
this.flush(0)
}
this.open = false
// reset the buffer and put it back in the pool
this.buffer.Reset()
bufferPool.Put(this.buffer)
this.buffer = nil
}
return nil
}
@@ -425,26 +452,31 @@ func (this *writerA) writeOne(data []byte) (n int, err error) {
data = data[:min(len(data), int(this.chunkSize))]
// if there is more room, append to the buffer and exit
if int64(len(this.buffer) + len(data)) <= this.chunkSize {
this.buffer = append(this.buffer, data...)
if int64(this.buffer.Len() + len(data)) <= this.chunkSize {
this.buffer.Write(data)
n = len(data)
// if have a full chunk, flush
if int64(len(this.buffer)) == this.chunkSize {
err = this.flush()
if int64(this.buffer.Len()) == this.chunkSize {
err = this.flush(1)
if err != nil { return n, err }
}
return n, nil
}
// if not, flush and store as much as we can in the buffer
err = this.flush()
err = this.flush(1)
if err != nil { return n, err }
this.buffer = append(this.buffer, data...)
n = int(min(int64(len(data)), this.chunkSize))
this.buffer.Write(data[:n])
return n, nil
}
func (this *writerA) flush() error {
return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer)
func (this *writerA) flush(ccb uint64) error {
err := this.parent.parent.sendMessageSafe(
this.parent.id, this.method, ccb,
this.buffer.Bytes())
this.buffer.Reset()
return err
}
type incomingMessage struct {
@@ -458,15 +490,19 @@ func encodeMessageA(
sizeLimit int64,
trans int64,
method uint16,
ccb uint64,
data []byte,
) error {
if int64(len(data)) > sizeLimit {
return ErrPayloadTooLarge
}
buffer := make([]byte, 18 + len(data))
// transaction ID field
encodeI64(buffer[:8], trans)
// method field
encodeI16(buffer[8:10], method)
encodeI64(buffer[10:18], uint64(len(data)))
// payload size field
encodeI64(buffer[10:18], uint64(len(data)) & 0x7FFFFFFFFFFFFFFF | ccb << 63)
copy(buffer[18:], data)
_, err := writer.Write(buffer)
return err
@@ -485,6 +521,7 @@ func decodeMessageA(
headerBuffer := [18]byte { }
_, err = io.ReadFull(reader, headerBuffer[:])
if err != nil { return 0, 0, false, nil, err }
transID, err = decodeI64[int64](headerBuffer[:8])
if err != nil { return 0, 0, false, nil, err }
method, err = decodeI16[uint16](headerBuffer[8:10])

View File

@@ -2,10 +2,13 @@ package hopp
import "io"
import "net"
import "sync"
import "bytes"
import "errors"
import "slices"
import "testing"
import "context"
import tu "git.tebibyte.media/sashakoshka/hopp/internal/testutil"
// some of these tests spawn goroutines that can signal a failure.
// abide by the documentation for testing.T (https://pkg.go.dev/testing#T):
@@ -52,6 +55,7 @@ func TestConnA(test *testing.T) {
test.Error("CLIENT payload:", gotPayload)
test.Fatal("CLIENT ok byeeeeeeeeeeeee")
}
test.Log("CLIENT transaction has closed")
}
serverFunc := func(a Conn) {
@@ -70,20 +74,6 @@ func TestConnA(test *testing.T) {
}
func TestTransOpenCloseA(test *testing.T) {
// currently:
//
// | data sent | data recvd | close sent | close recvd
// 10 | X | X | X | server hangs
// 20 | X | X | X | client hangs
// 30 | X | | X |
//
// when a close message is recvd, it tries to push to the trans and
// hangs on trans.incoming.Send, which hangs on sending the value to the
// underlying channel. why is this?
//
// check if we are really getting values from the channel when pulling
// from the trans channel when we are expecting a close.
clientFunc := func(conn Conn) {
// 10
trans, err := conn.OpenTrans()
@@ -142,10 +132,71 @@ func TestTransOpenCloseA(test *testing.T) {
clientServerEnvironment(test, clientFunc, serverFunc)
}
func TestReadWriteA(test *testing.T) {
payloads := []string {
"hello",
"world",
"When the impostor is sus!",
}
clientFunc := func(a Conn) {
test.Log("CLIENT accepting transaction")
trans, err := a.AcceptTrans()
if err != nil { test.Fatal("CLIENT", err) }
test.Log("CLIENT accepted transaction")
test.Cleanup(func() { trans.Close() })
for method, payload := range payloads {
test.Log("CLIENT waiting...")
gotMethod, gotReader, err := trans.ReceiveReader()
if err != nil { test.Fatal("CLIENT", err) }
gotPayloadBytes, err := io.ReadAll(gotReader)
if err != nil { test.Fatal("CLIENT", err) }
gotPayload := string(gotPayloadBytes)
test.Log("CLIENT m:", gotMethod, "p:", tu.HexBytes(gotPayloadBytes))
if int(gotMethod) != method {
test.Error("CLIENT method not equal, expected", method)
}
if gotPayload != payload {
test.Error(
"CLIENT payload not equal, expected",
tu.HexBytes([]byte(payload)))
}
}
test.Log("CLIENT waiting for transaction close...")
gotMethod, gotPayload, err := trans.Receive()
if !errors.Is(err, io.EOF) {
test.Error("CLIENT wrong error:", err)
test.Error("CLIENT method:", gotMethod)
test.Error("CLIENT payload:", tu.HexBytes(gotPayload))
test.Fatal("CLIENT (expected io.EOF and no message)")
}
test.Log("CLIENT transaction has closed")
}
serverFunc := func(a Conn) {
defer test.Log("SERVER closing connection")
trans, err := a.OpenTrans()
if err != nil { test.Error("SERVER", err); return }
test.Cleanup(func() { trans.Close() })
for method, payload := range payloads {
test.Log("SERVER m:", method, "p:", tu.HexBytes([]byte(payload)))
func() {
writer, err := trans.SendWriter(uint16(method))
if err != nil { test.Error("SERVER", err); return }
defer writer.Close()
_, err = writer.Write([]byte(payload))
if err != nil { test.Error("SERVER", err); return }
}()
}
}
clientServerEnvironment(test, clientFunc, serverFunc)
}
func TestEncodeMessageA(test *testing.T) {
buffer := new(bytes.Buffer)
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
err := encodeMessageA(buffer, defaultSizeLimit, 0x5800FEABC3104F04, 0x6B12, payload)
err := encodeMessageA(buffer, defaultSizeLimit, 0x5800FEABC3104F04, 0x6B12, 0, payload)
correct := []byte {
0x58, 0x00, 0xFE, 0xAB, 0xC3, 0x10, 0x4F, 0x04,
0x6B, 0x12,
@@ -163,7 +214,7 @@ func TestEncodeMessageA(test *testing.T) {
func TestEncodeMessageAErr(test *testing.T) {
buffer := new(bytes.Buffer)
payload := make([]byte, 0x10000)
err := encodeMessageA(buffer, 0x20, 0x5800FEABC3104F04, 0x6B12, payload)
err := encodeMessageA(buffer, 0x20, 0x5800FEABC3104F04, 0x6B12, 0, payload)
if !errors.Is(err, ErrPayloadTooLarge) {
test.Fatalf("wrong error: %v", err)
}
@@ -208,7 +259,7 @@ func TestEncodeDecodeMessageA(test *testing.T) {
correctMethod := uint16(30)
correctPayload := []byte("good")
buffer := bytes.Buffer { }
err := encodeMessageA(&buffer, defaultSizeLimit, correctTransID, correctMethod, correctPayload)
err := encodeMessageA(&buffer, defaultSizeLimit, correctTransID, correctMethod, 0, correctPayload)
if err != nil { test.Fatal(err) }
transID, method, chunked, payload, err := decodeMessageA(&buffer, defaultSizeLimit)
if got, correct := transID, int64(2); got != correct {
@@ -225,39 +276,255 @@ func TestEncodeDecodeMessageA(test *testing.T) {
}
}
func TestConsecutiveSend(test *testing.T) {
packets := [][]byte {
[]byte {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x07 },
[]byte {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x08 },
}
payloads := [][]byte {
[]byte { 0x43, 0x00, 0x00, 0x00, 0x07 },
[]byte { 0x43, 0x00, 0x00, 0x00, 0x08 },
}
var group sync.WaitGroup
group.Add(2)
// server
listener, err := net.Listen("tcp", "localhost:9999")
if err != nil { test.Fatal("SERVER", err) }
go func() {
defer group.Done()
defer listener.Close()
conn, err := listener.Accept()
if err != nil { test.Fatal("SERVER", err) }
defer conn.Close()
buf := [16]byte { }
for {
_, err := conn.Read(buf[:])
if err != nil { break }
}
}()
// client
go func() {
defer group.Done()
conn, err := net.Dial("tcp", "localhost:9999")
if err != nil { test.Fatal("CLIENT", err) }
defer conn.Close()
recorder := tu.RecordConn(conn)
a := AdaptA(recorder, ClientSide)
trans, err := a.OpenTrans()
if err != nil { test.Fatal("CLIENT", err) }
for _, payload := range payloads {
err := trans.Send(0x0000, payload)
if err != nil { test.Fatal("CLIENT", err) }
}
test.Log("CLIENT recorded output:\n" + recorder.Dump())
if len(recorder.Log) != 2 { test.Fatal("wrong length") }
if !slices.Equal(recorder.Log[0].([]byte), packets[0]) {
test.Fatal("not equal")
}
if !slices.Equal(recorder.Log[1].([]byte), packets[1]) {
test.Fatal("not equal")
}
}()
group.Wait()
}
func TestConsecutiveWrite(test *testing.T) {
packets := [][]byte {
[]byte {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x07 },
[]byte {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x08 },
}
payloads := [][]byte {
[]byte { 0x43, 0x00, 0x00, 0x00, 0x07 },
[]byte { 0x43, 0x00, 0x00, 0x00, 0x08 },
}
var group sync.WaitGroup
group.Add(2)
// server
listener, err := net.Listen("tcp", "localhost:9999")
if err != nil { test.Fatal("SERVER", err) }
go func() {
defer group.Done()
defer listener.Close()
conn, err := listener.Accept()
if err != nil { test.Fatal("SERVER", err) }
defer conn.Close()
buf := [16]byte { }
for {
_, err := conn.Read(buf[:])
if err != nil { break }
}
}()
// client
go func() {
defer group.Done()
conn, err := net.Dial("tcp", "localhost:9999")
if err != nil { test.Fatal("CLIENT", err) }
defer conn.Close()
recorder := tu.RecordConn(conn)
a := AdaptA(recorder, ClientSide)
trans, err := a.OpenTrans()
if err != nil { test.Fatal("CLIENT", err) }
for _, payload := range payloads {
func() {
writer, err := trans.SendWriter(0x0000)
if err != nil { test.Fatal("CLIENT", err) }
_, err = writer.Write(payload)
if err != nil { test.Fatal("CLIENT", err) }
writer.Close()
}()
}
test.Log("CLIENT recorded output:\n" + recorder.Dump())
if len(recorder.Log) != 2 { test.Fatal("wrong length") }
if !slices.Equal(recorder.Log[0].([]byte), packets[0]) {
test.Fatal("not equal")
}
if !slices.Equal(recorder.Log[1].([]byte), packets[1]) {
test.Fatal("not equal")
}
}()
group.Wait()
}
func TestConsecutiveReceive(test *testing.T) {
stream := []byte {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x07,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x08,
}
payloads := [][]byte {
[]byte { 0x43, 0x00, 0x00, 0x00, 0x07 },
[]byte { 0x43, 0x00, 0x00, 0x00, 0x08 },
}
var group sync.WaitGroup
group.Add(2)
// server
listener, err := net.Listen("tcp", "localhost:9999")
if err != nil { test.Fatal("SERVER", err) }
go func() {
defer group.Done()
defer listener.Close()
conn, err := listener.Accept()
if err != nil { test.Fatal("SERVER", err) }
defer conn.Close()
a := AdaptA(conn, ServerSide)
trans, err := a.AcceptTrans()
if err != nil { test.Fatal("SERVER", err) }
index := 0
for {
method, data, err := trans.Receive()
if err != nil {
if !errors.Is(err, io.EOF) {
test.Fatal("SERVER", err)
}
break
}
test.Logf("SERVER GOT: M%04X %s", method, tu.HexBytes(data))
if index >= len(payloads) {
test.Fatalf(
"SERVER we weren't supposed to receive %d messages",
index + 1)
}
if method != 0 {
test.Fatal("SERVER", "method not equal")
}
if !slices.Equal(data, payloads[index]) {
test.Fatal("SERVER", "data not equal")
}
index ++
}
if index != len(payloads) {
test.Fatalf(
"SERVER we weren't supposed to receive %d messages",
index + 1)
}
}()
// client
go func() {
defer group.Done()
conn, err := net.Dial("tcp", "localhost:9999")
if err != nil { test.Fatal("CLIENT", err) }
defer conn.Close()
_, err = conn.Write(stream)
if err != nil { test.Fatal("CLIENT", err) }
}()
group.Wait()
}
func clientServerEnvironment(test *testing.T, clientFunc func(conn Conn), serverFunc func(conn Conn)) {
network := "tcp"
addr := "localhost:7959"
// server
listener, err := net.Listen(network, addr)
if err != nil { test.Fatal(err) }
listener, err := Listen(network, addr, nil)
test.Cleanup(func() { listener.Close() })
go func() {
test.Log("SERVER listening")
conn, err := listener.Accept()
if err != nil { test.Error("SERVER", err); return }
defer conn.Close()
test.Cleanup(func() { conn.Close() })
a := AdaptA(conn, ServerSide)
test.Cleanup(func() { a.Close() })
serverFunc(a)
serverFunc(conn)
test.Log("SERVER closing")
}()
// client
test.Log("CLIENT dialing")
conn, err := net.Dial(network, addr)
conn, err := Dial(context.Background(), network, addr, nil)
if err != nil { test.Fatal("CLIENT", err) }
test.Cleanup(func() { conn.Close() })
test.Log("CLIENT dialed")
a := AdaptA(conn, ClientSide)
test.Cleanup(func() { a.Close() })
clientFunc(a)
clientFunc(conn)
test.Log("CLIENT waiting for connection close...")
trans, err := a.AcceptTrans()
trans, err := conn.AcceptTrans()
if !errors.Is(err, io.EOF) {
test.Error("CLIENT wrong error:", err)
test.Fatal("CLIENT trans:", trans)