21 Commits

Author SHA1 Message Date
874ae2e011 Update Trans docs with new concurrency rules 2025-11-19 20:56:16 -05:00
c5073e5f20 METADAPT-A writing is goroutine safe 2025-11-19 20:55:57 -05:00
cdda4f932d Remove stale comment 2025-11-19 20:52:33 -05:00
8c564e4755 Use a buffer pool for METADAPT-A 2025-11-19 20:50:11 -05:00
9651fed635 Use bytes.Buffer instead of a slice for METADAPT-A writer 2025-11-19 20:36:10 -05:00
d6dc6d6e78 examples/chat: Re-generate 2025-11-19 19:45:16 -05:00
52b4c44db3 examples: Add go:generate directives to generate protcol.go files 2025-11-19 19:43:42 -05:00
11e972c696 examples/ping: Client accepts pong 2025-11-19 19:34:02 -05:00
1cf9d47cae Add a diagram of a METADAPT-A MMB 2025-11-19 17:13:38 -05:00
dfbb087333 examples: Regenerate protocol files 2025-11-19 17:08:05 -05:00
647619a7f6 generate: It is no longer possible to make impossible type asserts with Receive 2025-11-19 16:25:31 -05:00
3b5a498aa5 Do not send extraneous chained MMBs 2025-11-19 15:16:44 -05:00
7d189df741 Test two consecutive METADAPT-A writers 2025-11-19 14:57:46 -05:00
5341563668 Test that METADAPT-A can tx/rx different messages consecutively 2025-11-19 14:42:29 -05:00
d2187cb953 examples/ping: Regenerate protocol.go 2025-11-19 13:14:25 -05:00
0ac34b2f22 generate: Don't create a new decoder for a possibly nil reader 2025-11-19 13:13:40 -05:00
3136dcbfdf internal/testutil: Add ConnRecorder which records net.Conn writes 2025-11-19 13:10:26 -05:00
ad930144cf generate: Test send, receive functions 2025-11-17 16:50:43 -05:00
5e965def7c internal/mock: Add mock transaction implementation 2025-11-17 16:49:09 -05:00
8add67c5de Test using readers and writers with a METADAPT-A connection 2025-11-16 16:01:06 -05:00
5f503021bf examples: More examples 2025-11-16 16:00:31 -05:00
21 changed files with 1025 additions and 79 deletions

Binary file not shown.

After

Width:  |  Height:  |  Size: 2.4 KiB

View File

@@ -34,8 +34,8 @@ type Conn interface {
} }
// Trans is a HOPP transaction. Methods of this interface are not safe for // Trans is a HOPP transaction. Methods of this interface are not safe for
// concurrent use with the exception of the Close and ID methods. The // concurrent use with the exception of the Write, SendWriter, Close, and ID
// recommended use case is one goroutine per transaction. // methods. The recommended use case is one goroutine per transaction.
type Trans interface { type Trans interface {
// Close closes the transaction. Any blocked operations will be // Close closes the transaction. Any blocked operations will be
// unblocked and return errors. This method is safe for concurrent use. // unblocked and return errors. This method is safe for concurrent use.
@@ -45,13 +45,13 @@ type Trans interface {
// unique within the connection. This method is safe for concurrent use. // unique within the connection. This method is safe for concurrent use.
ID() int64 ID() int64
// Send sends a message. This method is not safe for concurrent use. // Send sends a message. This method is safe for concurrent use.
Send(method uint16, data []byte) error Send(method uint16, data []byte) error
// SendWriter sends data written to an [io.Writer]. The writer must be // SendWriter sends data written to an [io.Writer]. The writer must be
// closed after use. Closing the writer flushes any data that hasn't // closed after use. Closing the writer flushes any data that hasn't
// been written yet. Any writer previously opened through this function // been written yet. Any writer previously opened through this function
// will be discarded. This method is not safe for concurrent use, and // will be discarded. This method is safe for concurrent use, but its
// neither is its result. // result isn't.
SendWriter(method uint16) (io.WriteCloser, error) SendWriter(method uint16) (io.WriteCloser, error)
// Receive receives a message. This method is not safe for concurrent // Receive receives a message. This method is not safe for concurrent
// use. // use.

View File

@@ -132,6 +132,8 @@ METADAPT-B is used over QUIC for communication over networks such as the
Internet. Internet.
### METADAPT-A ### METADAPT-A
![Diagram of a METADAPT-A MMB.](../assets/metadapt-a-mmb-diagram.png)
METADAPT-A requires a transport which offers a single full-duplex data stream METADAPT-A requires a transport which offers a single full-duplex data stream
that persists for the duration of the connection. All transactions are that persists for the duration of the connection. All transactions are
multiplexed onto this single stream. Each MMB contains a 12-octet long header, multiplexed onto this single stream. Each MMB contains a 12-octet long header,

86
examples/1proc/main.go Normal file
View File

@@ -0,0 +1,86 @@
package main
import "io"
import "log"
import "time"
import "errors"
import "context"
import "git.tebibyte.media/sashakoshka/hopp"
var network = "tcp"
var addr = "localhost:7959"
func main() {
go func() {
defer log.Println("SERVER closing")
listener, err := hopp.Listen(network, addr, nil)
if err != nil { log.Println("SERVER", err); return }
log.Println("SERVER listening")
conn, err := listener.Accept()
if err != nil { log.Println("SERVER", err); return }
defer conn.Close()
trans, err := conn.AcceptTrans()
if err != nil { log.Println("SERVER", err); return }
defer trans.Close()
for {
method, data, err := trans.Receive()
if err != nil { log.Println("SERVER", err); return }
log.Println("SERVER got", method, data)
log.Println("SERVER send", method, data)
err = trans.Send(1, data[:])
if err != nil { log.Println("SERVER", err); return }
}
}()
time.Sleep(time.Second * 2)
func() {
log.Println("CLIENT dialing")
conn, err := hopp.Dial(context.Background(), network, addr, nil)
if err != nil { log.Fatalln("CLIENT", err) }
log.Println("CLIENT dialed")
trans, err := conn.OpenTrans()
if err != nil {
log.Println("CLIENT", err)
return
}
go func() {
for {
method, data, err := trans.Receive()
if err != nil {
if !errors.Is(err, io.EOF) {
log.Printf("CLIENT failed to receive message: %v", err)
}
return
}
log.Println("CLIENT got", method, data)
}
}()
data := [1]byte { }
for {
log.Println("CLIENT send", 1, data)
err := trans.Send(1, data[:])
if err != nil {
log.Println("CLIENT", err)
return
}
data[0] ++
time.Sleep(time.Second)
}
log.Println("CLIENT waiting for connection close...")
trans, err = conn.AcceptTrans()
if !errors.Is(err, io.EOF) {
log.Println("CLIENT wrong error:", err)
log.Fatalln("CLIENT trans:", trans)
}
log.Println("CLIENT DONE")
conn.Close()
}()
}

View File

@@ -42,11 +42,11 @@ func main() {
break break
} }
switch message := message.(type) { switch message := message.(type) {
case *chat.MessageChat: case chat.MessageChat:
fmt.Fprintf(os.Stdout, "%s: %s\n", message.Nickname, message.Content) fmt.Fprintf(os.Stdout, "%s: %s\n", message.Nickname, message.Content)
case *chat.MessageJoinNotify: case chat.MessageJoinNotify:
fmt.Fprintf(os.Stdout, "(i) %s joined the room\n", message.Nickname) fmt.Fprintf(os.Stdout, "(i) %s joined the room\n", message.Nickname)
case *chat.MessageLeaveNotify: case chat.MessageLeaveNotify:
fmt.Fprintf(os.Stdout, "(i) %s left the room\n", message.Nickname) fmt.Fprintf(os.Stdout, "(i) %s left the room\n", message.Nickname)
} }
} }

View File

@@ -1,6 +1,6 @@
// Package chat implements a simple chat protocol over HOPP. To re-generate the // Package chat demonstrates a simple chat protocol.
// source files, run this command from within the root directory of the
// repository:
//
// go run ./cmd/hopp-generate examples/chat/protocol.pdl -o examples/chat/protocol/protocol.go
package chat package chat
// To use this in your own project, replace "go run ../../cmd/hopp-generate"
// with just "hopp-generate".
//go:generate go run ../../cmd/hopp-generate protocol.pdl -o protocol.go

View File

@@ -57,6 +57,25 @@ func boolInt(input bool) int {
// ensure ucontainer is always imported // ensure ucontainer is always imported
var _ hopp.Option[int] var _ hopp.Option[int]
// ReceivedMessage is a sealed interface representing the value of a
// message in this package. To determine what kind of message it is,
// use a type switch like this:
//
// switch message := message.(type) {
// case MessageError:
// doSomething()
// case MessageSuccess:
// doSomething()
// case MessageJoin:
// doSomething()
//
// ...
//
// }
type ReceivedMessage interface {
isReceivedMessage()
}
// Error is sent by a party when the other party has done something erroneous. The // Error is sent by a party when the other party has done something erroneous. The
// valid error codes are: // valid error codes are:
// //
@@ -124,6 +143,7 @@ func(this *MessageError) Decode(decoder *tape.Decoder) (n int, err error) {
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
return n, nil return n, nil
} }
func (this MessageError) isReceivedMessage() { }
// Success is sent by a party when it has successfully completed a task given to it // Success is sent by a party when it has successfully completed a task given to it
// by the other party. The sending party must immediately close the transaction // by the other party. The sending party must immediately close the transaction
@@ -162,6 +182,7 @@ func(this *MessageSuccess) Decode(decoder *tape.Decoder) (n int, err error) {
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
return n, nil return n, nil
} }
func (this MessageSuccess) isReceivedMessage() { }
// Join is sent by the client when it wishes to join a room. It must begin a new // Join is sent by the client when it wishes to join a room. It must begin a new
// transaction, and that transaction will persist while the user is in that room. // transaction, and that transaction will persist while the user is in that room.
@@ -231,6 +252,7 @@ func(this *MessageJoin) Decode(decoder *tape.Decoder) (n int, err error) {
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
return n, nil return n, nil
} }
func (this MessageJoin) isReceivedMessage() { }
// Chat is sent by the client when it wishes to post a message to the room. It is // Chat is sent by the client when it wishes to post a message to the room. It is
// also relayed by the server to other clients to notify them of the message. It // also relayed by the server to other clients to notify them of the message. It
@@ -299,6 +321,7 @@ func(this *MessageChat) Decode(decoder *tape.Decoder) (n int, err error) {
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
return n, nil return n, nil
} }
func (this MessageChat) isReceivedMessage() { }
// JoinNotify is sent by the server when another client joins the room. It must be // JoinNotify is sent by the server when another client joins the room. It must be
// sent within a room transaction. // sent within a room transaction.
@@ -351,6 +374,7 @@ func(this *MessageJoinNotify) Decode(decoder *tape.Decoder) (n int, err error) {
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
return n, nil return n, nil
} }
func (this MessageJoinNotify) isReceivedMessage() { }
// LeaveNotify is sent by the server when another client leaves the room. It must // LeaveNotify is sent by the server when another client leaves the room. It must
// be sent within a room transaction. // be sent within a room transaction.
@@ -403,6 +427,7 @@ func(this *MessageLeaveNotify) Decode(decoder *tape.Decoder) (n int, err error)
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
return n, nil return n, nil
} }
func (this MessageLeaveNotify) isReceivedMessage() { }
func decodeBranch_1d505103df99c95e6bed0800d0ea881a_MessageError(this *MessageError, decoder *tape.Decoder, tag tape.Tag) (n int, err error) { func decodeBranch_1d505103df99c95e6bed0800d0ea881a_MessageError(this *MessageError, decoder *tape.Decoder, tag tape.Tag) (n int, err error) {
var nn int var nn int
@@ -693,16 +718,11 @@ func decodeBranch_68c536511e6d598462efc482144438e9_MessageLeaveNotify(this *Mess
// Receive decodes a message from a transaction and returns it as a value. // Receive decodes a message from a transaction and returns it as a value.
// Use a type switch to determine what type of message it is. // Use a type switch to determine what type of message it is.
func Receive(trans hopp.Trans) (message any, n int, err error) { func Receive(trans hopp.Trans) (message ReceivedMessage, n int, err error) {
method, reader, err := trans.ReceiveReader() method, reader, err := trans.ReceiveReader()
decoder := tape.NewDecoder(reader)
if err != nil { return nil, n, err } if err != nil { return nil, n, err }
decoder := tape.NewDecoder(reader)
switch method { switch method {
case 0x0401:
var message MessageLeaveNotify
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
case 0x0000: case 0x0000:
var message MessageError var message MessageError
nn, err := message.Decode(decoder) nn, err := message.Decode(decoder)
@@ -728,6 +748,11 @@ func Receive(trans hopp.Trans) (message any, n int, err error) {
nn, err := message.Decode(decoder) nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err } n += nn; if err != nil { return nil, n, err }
return message, n, nil return message, n, nil
case 0x0401:
var message MessageLeaveNotify
nn, err := message.Decode(decoder)
n += nn; if err != nil { return nil, n, err }
return message, n, nil
} }
return nil, n, fmt.Errorf("%w: M%04X", hopp.ErrUnknownMethod, method) return nil, n, fmt.Errorf("%w: M%04X", hopp.ErrUnknownMethod, method)
} }

View File

@@ -98,20 +98,27 @@ func (this *client) transTalk(trans hopp.Trans, initial *chat.MessageJoin) error
err := this.joinRoom(trans, room) err := this.joinRoom(trans, room)
if err != nil { return err } if err != nil { return err }
defer this.leaveRoom(trans, room) defer this.leaveRoom(trans, room)
_, err = chat.Send(trans, &chat.MessageChat {
Content: "(i) joined " + room,
Nickname: "SYSTEM",
})
if err != nil { return err }
for { for {
message, _, err := chat.Receive(trans) message, _, err := chat.Receive(trans)
if err != nil { return err } if err != nil { return err }
switch message := message.(type) { switch message := message.(type) {
case *chat.MessageChat: case chat.MessageChat:
err := this.handleMessageChat(trans, room, message) err := this.handleMessageChat(trans, room, message)
if err != nil { return err } if err != nil { return err }
case *chat.MessageError: case chat.MessageError:
return message return &message
} }
} }
} }
func (this *client) handleMessageChat(trans hopp.Trans, room string, message *chat.MessageChat) error { func (this *client) handleMessageChat(trans hopp.Trans, room string, message chat.MessageChat) error {
log.Printf("(). %s #%s: %s", this.nickname, room, message.Content) log.Printf("(). %s #%s: %s", this.nickname, room, message.Content)
clients, done := clients.RBorrow() clients, done := clients.RBorrow()
defer done() defer done()
@@ -124,11 +131,11 @@ func (this *client) handleMessageChat(trans hopp.Trans, room string, message *ch
return nil return nil
} }
func (this *client) relayMessage(room string, message *chat.MessageChat) error { func (this *client) relayMessage(room string, message chat.MessageChat) error {
rooms, done := this.rooms.RBorrow() rooms, done := this.rooms.RBorrow()
defer done() defer done()
if trans, ok := rooms[room]; ok { if trans, ok := rooms[room]; ok {
_, err := chat.Send(trans, message) _, err := chat.Send(trans, &message)
if err != nil { if err != nil {
return fmt.Errorf("could not relay message: %w", err) return fmt.Errorf("could not relay message: %w", err)
} }

View File

@@ -0,0 +1,84 @@
package main
import "io"
import "os"
import "log"
import "fmt"
import "time"
import "context"
import "git.tebibyte.media/sashakoshka/hopp"
// import "git.tebibyte.media/sashakoshka/hopp/examples/ping"
func main() {
name := os.Args[0]
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s HOST:PORT\n", name)
os.Exit(2)
}
address := os.Args[1]
conn, err := dial(address)
handleErr(1, err)
trans, err := conn.OpenTrans()
handleErr(1, err)
go func() {
defer fmt.Fprintf(os.Stdout, "(i) disconnected\n")
for {
// message, _, err := ping.Receive(trans)
// if err != nil {
// if !errors.Is(err, io.EOF) {
// handleErr(1, err)
// }
// break
// }
// switch message := message.(type) {
// case *ping.MessagePong:
// log.Printf("--> pong (%d) from %v", message, address)
// }
method, reader, err := trans.ReceiveReader()
if err != nil {
log.Printf("CLIENT recv: %v", err)
return
}
data, err := io.ReadAll(reader)
if err != nil {
log.Printf("CLIENT recv: %v", err)
return
}
log.Println("CLIENT got", method, data)
}
}()
// message := ping.MessagePing(0)
// for {
// log.Printf("<-- ping (%d)", message)
// _, err := ping.Send(trans, &message)
// handleErr(1, err)
// message ++
// time.Sleep(time.Second)
// }
data := [1]byte { }
for {
log.Println("CLIENT send", 1, data)
err := trans.Send(1, data[:])
handleErr(1, err)
data[0] ++
time.Sleep(time.Second)
}
}
func dial(address string) (hopp.Conn, error) {
ctx, done := context.WithTimeout(context.Background(), 16 * time.Second)
defer done()
conn, err := hopp.Dial(ctx, "tcp", address, nil)
if err != nil { return nil, err }
return conn, nil
}
func handleErr(code int, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err)
os.Exit(code)
}
}

View File

@@ -0,0 +1,97 @@
package main
import "io"
import "os"
import "fmt"
import "log"
import "errors"
import "git.tebibyte.media/sashakoshka/hopp"
// import "git.tebibyte.media/sashakoshka/hopp/examples/ping"
var network = "tcp"
var addr = "localhost:7959"
func main() {
name := os.Args[0]
if len(os.Args) != 2 {
fmt.Fprintf(os.Stderr, "Usage: %s HOST:PORT\n", name)
os.Exit(2)
}
address := os.Args[1]
err := listen(address)
handleErr(1, err)
}
func listen(addr string) error {
defer log.Println("(i) closing")
listener, err := hopp.Listen(network, addr, nil)
if err != nil { return err }
log.Printf("(i) hosting on %s", addr)
for {
conn, err := listener.Accept()
if err != nil { return err }
go run(conn)
}
}
func run(conn hopp.Conn) {
log.Printf("-=E %v connected", conn.RemoteAddr())
defer log.Printf("X=- %v disconnected", conn.RemoteAddr())
defer conn.Close()
for {
trans, err := conn.AcceptTrans()
if err != nil {
if !errors.Is(err, io.EOF) {
log.Printf("XXX %v failed: %v", conn.RemoteAddr(), err)
}
return
}
go runTrans(conn, trans)
}
}
func runTrans(conn hopp.Conn, trans hopp.Trans) {
defer trans.Close()
for {
// message, _, err := ping.Receive(trans)
// if err != nil {
// if !errors.Is(err, io.EOF) {
// log.Printf("XXX failed to receive message: %v", err)
// }
// return
// }
// switch message := message.(type) {
// case *ping.MessagePing:
// log.Printf("--> ping (%d) from %v", message, conn.RemoteAddr())
// response := ping.MessagePong(*message)
// _, err := ping.Send(trans, &response)
// if err != nil {
// log.Printf("XXX failed to send message: %v", err)
// return
// }
// }
method, reader, err := trans.ReceiveReader()
if err != nil { log.Println("SERVER", err); return }
data, err := io.ReadAll(reader)
if err != nil { log.Println("SERVER", err); return }
log.Println("SERVER got", method, data)
log.Println("SERVER send", method, data)
func (){
writer, err := trans.SendWriter(1)
if err != nil { log.Println("SERVER", err); return }
defer writer.Close()
_, err = writer.Write(data[:])
if err != nil { log.Println("SERVER", err); return }
}()
}
}
func handleErr(code int, err error) {
if err != nil {
fmt.Fprintf(os.Stderr, "%s: %v\n", os.Args[0], err)
os.Exit(code)
}
}

View File

@@ -4,10 +4,9 @@ import "os"
import "io" import "io"
import "fmt" import "fmt"
import "log" import "log"
import "net"
import "time" import "time"
import "errors" import "errors"
// import "context" import "context"
import "git.tebibyte.media/sashakoshka/hopp" import "git.tebibyte.media/sashakoshka/hopp"
import "git.tebibyte.media/sashakoshka/hopp/examples/ping" import "git.tebibyte.media/sashakoshka/hopp/examples/ping"
@@ -25,6 +24,15 @@ func main() {
handleErr(1, err) handleErr(1, err)
go func() { go func() {
message := ping.MessagePing(0)
for _ = range time.Tick(time.Second) {
log.Printf("<-- ping (%d)", message)
_, err := ping.Send(trans, &message)
handleErr(1, err)
message ++
}
}()
defer fmt.Fprintf(os.Stdout, "(i) disconnected\n") defer fmt.Fprintf(os.Stdout, "(i) disconnected\n")
for { for {
message, _, err := ping.Receive(trans) message, _, err := ping.Receive(trans)
@@ -35,31 +43,17 @@ func main() {
break break
} }
switch message := message.(type) { switch message := message.(type) {
case *ping.MessagePong: case ping.MessagePong:
log.Printf("--> pong (%d) from %v", message, address) log.Printf("--> pong (%d) from %v", message, address)
} }
} }
}()
message := ping.MessagePing(0)
for {
log.Printf("<-- ping (%d)", message)
_, err := ping.Send(trans, &message)
handleErr(1, err)
message ++
time.Sleep(time.Second)
}
} }
func dial(address string) (hopp.Conn, error) { func dial(address string) (hopp.Conn, error) {
// ctx, done := context.WithTimeout(context.Background(), 16 * time.Second) ctx, done := context.WithTimeout(context.Background(), 16 * time.Second)
// defer done() defer done()
// conn, err := hopp.Dial(ctx, "tcp", address, nil) conn, err := hopp.Dial(ctx, "tcp", address, nil)
// if err != nil { return nil, err }
// return conn, nil
underlying, err := net.Dial("tcp", address)
if err != nil { return nil, err } if err != nil { return nil, err }
conn := hopp.AdaptA(underlying, hopp.ServerSide)
return conn, nil return conn, nil
} }

6
examples/ping/doc.go Normal file
View File

@@ -0,0 +1,6 @@
// Example ping demonstrates a simple ping/pong protocol.
package ping
// To use this in your own project, replace "go run ../../cmd/hopp-generate"
// with just "hopp-generate".
//go:generate go run ../../cmd/hopp-generate protocol.pdl -o protocol.go

View File

@@ -57,6 +57,20 @@ func boolInt(input bool) int {
// ensure ucontainer is always imported // ensure ucontainer is always imported
var _ hopp.Option[int] var _ hopp.Option[int]
// ReceivedMessage is a sealed interface representing the value of a
// message in this package. To determine what kind of message it is,
// use a type switch like this:
//
// switch message := message.(type) {
// case MessagePing:
// doSomething()
// case MessagePong:
// doSomething()
// }
type ReceivedMessage interface {
isReceivedMessage()
}
// Ping is sent by the client to the server. It may contain any number. This // Ping is sent by the client to the server. It may contain any number. This
// number will be returned to the client via a [Pong] message. // number will be returned to the client via a [Pong] message.
type MessagePing int32 type MessagePing int32
@@ -88,6 +102,7 @@ func(this *MessagePing) Decode(decoder *tape.Decoder) (n int, err error) {
*this = MessagePing(destination_2) *this = MessagePing(destination_2)
return n, nil return n, nil
} }
func (this MessagePing) isReceivedMessage() { }
// Pong is sent by the server to the client in response to a [Ping] message, It // Pong is sent by the server to the client in response to a [Ping] message, It
// will contain the same number as that message. // will contain the same number as that message.
@@ -120,13 +135,14 @@ func(this *MessagePong) Decode(decoder *tape.Decoder) (n int, err error) {
*this = MessagePong(destination_4) *this = MessagePong(destination_4)
return n, nil return n, nil
} }
func (this MessagePong) isReceivedMessage() { }
// Receive decodes a message from a transaction and returns it as a value. // Receive decodes a message from a transaction and returns it as a value.
// Use a type switch to determine what type of message it is. // Use a type switch to determine what type of message it is.
func Receive(trans hopp.Trans) (message any, n int, err error) { func Receive(trans hopp.Trans) (message ReceivedMessage, n int, err error) {
method, reader, err := trans.ReceiveReader() method, reader, err := trans.ReceiveReader()
decoder := tape.NewDecoder(reader)
if err != nil { return nil, n, err } if err != nil { return nil, n, err }
decoder := tape.NewDecoder(reader)
switch method { switch method {
case 0x0000: case 0x0000:
var message MessagePing var message MessagePing

View File

@@ -57,9 +57,9 @@ func runTrans(conn hopp.Conn, trans hopp.Trans) {
return return
} }
switch message := message.(type) { switch message := message.(type) {
case *ping.MessagePing: case ping.MessagePing:
log.Printf("--> ping (%d) from %v", message, conn.RemoteAddr()) log.Printf("--> ping (%d) from %v", message, conn.RemoteAddr())
response := ping.MessagePong(*message) response := ping.MessagePong(message)
_, err := ping.Send(trans, &response) _, err := ping.Send(trans, &response)
if err != nil { if err != nil {
log.Printf("XXX failed to send message: %v", err) log.Printf("XXX failed to send message: %v", err)

View File

@@ -114,6 +114,8 @@ func (this *Generator) Generate(protocol *Protocol) (n int, err error) {
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
// type definitions // type definitions
nn, err = this.generateMessageValueInterface()
n += nn; if err != nil { return n, err }
for _, name := range slices.Sorted(maps.Keys(protocol.Types)) { for _, name := range slices.Sorted(maps.Keys(protocol.Types)) {
nn, err := this.generateTypedef(name, protocol.Types[name]) nn, err := this.generateTypedef(name, protocol.Types[name])
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
@@ -140,6 +142,42 @@ func (this *Generator) Generate(protocol *Protocol) (n int, err error) {
return n, nil return n, nil
} }
func (this *Generator) generateMessageValueInterface() (n int, err error) {
keys := slices.Sorted(maps.Keys(this.protocol.Messages))
nn, err := this.iprint(
"\n// ReceivedMessage is a sealed interface representing the value of a\n" +
"// message in this package. To determine what kind of message it is,\n" +
"// use a type switch like this:\n" +
"// \n" +
"// switch message := message.(type) {\n")
n += nn; if err != nil { return n, err }
for index, method := range keys {
if index > 2 {
nn, err := this.iprint("// \n// ...\n// \n")
n += nn; if err != nil { return n, err }
break
}
nn, err := this.iprintf("// case %s:\n",
this.resolveMessageName(this.protocol.Messages[method].Name))
n += nn; if err != nil { return n, err }
nn, err = this.iprintf("// doSomething()\n")
n += nn; if err != nil { return n, err }
}
nn, err = this.iprintf("// }\n")
n += nn; if err != nil { return n, err }
nn, err = this.iprintf("type ReceivedMessage interface {\n")
n += nn; if err != nil { return n, err }
this.push()
nn, err = this.iprintf("isReceivedMessage()\n")
n += nn; if err != nil { return n, err }
this.pop()
nn, err = this.iprintf("}\n")
n += nn; if err != nil { return n, err }
return n, nil
}
func (this *Generator) generateTypedef(name string, typedef Typedef) (n int, err error) { func (this *Generator) generateTypedef(name string, typedef Typedef) (n int, err error) {
typ := typedef.Type typ := typedef.Type
@@ -328,6 +366,10 @@ func (this *Generator) generateMessage(method uint16, message Message) (n int, e
nn, err = this.iprintf("}\n") nn, err = this.iprintf("}\n")
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
// isReceivedMessage method
nn, err = this.iprintf("func (this %s) isReceivedMessage() { }\n", this.resolveMessageName(message.Name))
n += nn; if err != nil { return n, err }
return n, nil return n, nil
} }
@@ -1207,15 +1249,15 @@ func (this *Generator) generateReceive() (n int, err error) {
"// Use a type switch to determine what type of message it is.\n") "// Use a type switch to determine what type of message it is.\n")
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
nn, err = this.iprintf( nn, err = this.iprintf(
"func Receive(trans hopp.Trans) (message any, n int, err error) {\n") "func Receive(trans hopp.Trans) (message ReceivedMessage, n int, err error) {\n")
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
this.push() this.push()
nn, err = this.iprintf("method, reader, err := trans.ReceiveReader()\n") nn, err = this.iprintf("method, reader, err := trans.ReceiveReader()\n")
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
nn, err = this.iprintf("decoder := tape.NewDecoder(reader)\n")
n += nn; if err != nil { return n, err }
nn, err = this.iprintf("if err != nil { return nil, n, err }\n") nn, err = this.iprintf("if err != nil { return nil, n, err }\n")
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
nn, err = this.iprintf("decoder := tape.NewDecoder(reader)\n")
n += nn; if err != nil { return n, err }
nn, err = this.iprintf("switch method {\n") nn, err = this.iprintf("switch method {\n")
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
for method, message := range this.protocol.Messages { for method, message := range this.protocol.Messages {

View File

@@ -7,8 +7,10 @@ import "testing"
// generator is equal to something specific // generator is equal to something specific
var exampleProtocol = defaultProtocol() var exampleProtocol = defaultProtocol()
var pingProtocol = defaultProtocol()
func init() { func init() {
// example protocol
exampleProtocol.Messages[0x0000] = Message { exampleProtocol.Messages[0x0000] = Message {
Name: "Connect", Name: "Connect",
Type: TypeTableDefined { Type: TypeTableDefined {
@@ -121,6 +123,16 @@ func init() {
}, },
}, },
} }
// ping protocol
pingProtocol.Messages[0x0000] = Message {
Name: "Ping",
Type: TypeInt { Bits: 32, Signed: true },
}
pingProtocol.Messages[0x0001] = Message {
Name: "Pong",
Type: TypeInt { Bits: 32, Signed: true },
}
} }
func TestGenerateRunEncodeDecode(test *testing.T) { func TestGenerateRunEncodeDecode(test *testing.T) {
@@ -485,3 +497,119 @@ func TestGenerateRunDecodeWrongType(test *testing.T) {
} }
`) `)
} }
func TestGenerateRunSendReceive(test *testing.T) {
testGenerateRun(test, &pingProtocol, "send-receive", `
// imports
import "git.tebibyte.media/sashakoshka/hopp/internal/mock"
`, `
log.Println("Send"); {
message := MessagePing(77)
trans := mock.Trans { }
_, err := Send(&trans, &message)
if err != nil { log.Fatal(err) }
gotMethod, gotPayload := trans.Methods[0], trans.Messages[0]
log.Printf("method M%04X", gotMethod)
log.Println("payload", tu.HexBytes(gotPayload))
if gotMethod != 0x0000 {
log.Fatalln("wrong method")
}
if ok, n := snake.L(0x43, 0x00, 0x00, 0x00, 0x4D).Check(gotPayload); !ok {
log.Fatalln("not equal at:", n)
}
}
log.Println("Receive"); {
trans := mock.Trans {
Methods: []uint16 { 1 },
Messages: [][]byte { []byte { 0x43, 0x00, 0x00, 0x00, 0x4E } },
}
gotMessage, n, err := Receive(&trans)
if err != nil { log.Fatal(err) }
log.Println("message", gotMessage)
log.Println("n", n)
casted, ok := gotMessage.(MessagePong)
if !ok { log.Fatalln("expected MessagePong") }
if casted != 78 { log.Fatalln("wrong message value") }
if n != 5 { log.Fatalln("wrong n value") }
}
`)
}
func TestGenerateRunConn(test *testing.T) {
testGenerateRun(test, &pingProtocol, "send-receive", `
// imports
import "sync"
import "context"
import "git.tebibyte.media/sashakoshka/hopp"
`, `
group := sync.WaitGroup { }
group.Add(2)
// server
listener, err := hopp.Listen("tcp", "localhost:43957", nil)
if err != nil { log.Fatalln("SERVER listen:", err) }
go func() {
defer listener.Close()
defer group.Done()
conn, err := listener.Accept()
if err != nil { log.Fatalln("SERVER accept:", err) }
trans, err := conn.AcceptTrans()
if err != nil { log.Fatalln("SERVER accept trans:", err) }
message, n, err := Receive(trans)
if err != nil { log.Fatalln("SERVER receive:", err) }
log.Println("SERVER got message", message)
log.Println("SERVER got n", n)
casted, ok := message.(MessagePing)
if !ok { log.Fatalln("SERVER expected MessagePong") }
if casted != 77 { log.Fatalln("SERVER wrong message value") }
if n != 5 { log.Fatalln("SERVER wrong n value") }
message, n, err = Receive(trans)
if err != nil { log.Fatalln("SERVER receive:", err) }
log.Println("SERVER got message", message)
log.Println("SERVER got n", n)
casted, ok = message.(MessagePing)
if !ok { log.Fatalln("SERVER expected MessagePong") }
if casted != 78 { log.Fatalln("SERVER wrong message value") }
if n != 5 { log.Fatalln("SERVER wrong n value") }
}()
// client
go func() {
defer group.Done()
log.Println("CLIENT dialing")
conn, err := hopp.Dial(
context.Background(),
"tcp", "localhost:43957",
nil)
if err != nil { log.Fatalln("CLIENT dial:", err) }
defer conn.Close()
log.Println("CLIENT connected")
log.Println("CLIENT opening trans")
trans, err := conn.OpenTrans()
if err != nil { log.Fatalln("CLIENT open trans:", err) }
message := MessagePing(77)
log.Println("CLIENT sending message")
n, err := Send(trans, &message)
if err != nil { log.Fatalln("CLIENT send:", err) }
log.Println("CLIENT sent n", n)
if n != 5 { log.Fatalln("CLIENT wrong n value") }
message = MessagePing(78)
log.Println("CLIENT sending message")
n, err = Send(trans, &message)
if err != nil { log.Fatalln("CLIENT send:", err) }
log.Println("CLIENT sent n", n)
if n != 5 { log.Fatalln("CLIENT wrong n value") }
}()
group.Wait()
`)
}

View File

@@ -0,0 +1,64 @@
package mock
import "io"
import "time"
import "bytes"
import "git.tebibyte.media/sashakoshka/hopp"
var _ hopp.Trans = new(Trans)
// Trans is a mock transaction implementation.
type Trans struct {
// These arrays must be the same length. You can load this up
// with messages to read, or deposit messages and retrieve them
// here later.
Methods []uint16
Messages [][]byte
}
func (this *Trans) Close() error { return nil }
func (this *Trans) ID() int64 { return 56 }
func (this *Trans) Send(method uint16, data []byte) error {
this.Methods = append(this.Methods, method)
this.Messages = append(this.Messages, data)
return nil
}
func (this *Trans) SendWriter(method uint16) (io.WriteCloser, error) {
return &transWriter {
Buffer: new(bytes.Buffer),
method: method,
parent: this,
}, nil
}
func (this *Trans) Receive() (method uint16, data []byte, err error) {
if len(this.Methods) == 0 {
return 0, nil, io.EOF
}
method = this.Methods[0]
data = this.Messages[0]
this.Methods = this.Methods[1:]
this.Messages = this.Messages[1:]
return method, data, nil
}
func (this *Trans) ReceiveReader() (method uint16, reader io.Reader, err error) {
method, data, err := this.Receive()
if err != nil { return 0, nil, err }
return method, bytes.NewBuffer(data), nil
}
func (this *Trans) SetDeadline(time.Time) error { return nil }
type transWriter struct {
*bytes.Buffer
method uint16
parent *Trans
}
func (this *transWriter) Close() error {
return this.parent.Send(this.method, this.Bytes())
}

View File

@@ -0,0 +1,45 @@
package testutil
import "net"
import "fmt"
import "strings"
var _ net.Conn = new(ConnRecorder)
// ConnRecorder records write/flush actions performed on a net.Conn.
type ConnRecorder struct {
net.Conn
// A []byte means data was written, and untyped nil
// means data was flushed.
Log []any
}
func RecordConn(underlying net.Conn) *ConnRecorder {
return &ConnRecorder {
Conn: underlying,
}
}
func (this *ConnRecorder) Write(data []byte) (n int, err error) {
this.Log = append(this.Log, data)
return len(data), nil
}
func (this *ConnRecorder) Flush() error {
this.Log = append(this.Log, nil)
return nil
}
func (this *ConnRecorder) Dump() string {
builder := strings.Builder { }
for index, item := range this.Log {
fmt.Fprintf(&builder, "%06d ", index)
switch item := item.(type) {
case nil:
fmt.Fprintln(&builder, "FLUSH")
case []byte:
fmt.Fprintln(&builder, HexBytes(item))
}
}
return builder.String()
}

View File

@@ -0,0 +1,44 @@
package testutil
import "net"
import "testing"
func TestConnRecorder(test *testing.T) {
// server
listener, err := net.Listen("tcp", "localhost:9999")
if err != nil { test.Fatal(err) }
defer listener.Close()
go func() {
conn, err := listener.Accept()
defer conn.Close()
if err != nil { test.Fatal(err) }
buf := [16]byte { }
for {
_, err := conn.Read(buf[:])
if err != nil { break }
}
}()
// client
conn, err := net.Dial("tcp", "localhost:9999")
if err != nil { test.Fatal(err) }
defer conn.Close()
recorder := RecordConn(conn)
_, err = recorder.Write([]byte("hello"))
if err != nil { test.Fatal(err) }
_, err = recorder.Write([]byte("world!"))
if err != nil { test.Fatal(err) }
err = recorder.Flush()
if err != nil { test.Fatal(err) }
test.Log("GOT:\n" + recorder.Dump())
if len(recorder.Log) != 3 { test.Fatal("wrong length") }
if string(recorder.Log[0].([]byte)) != "hello" {
test.Fatal("not equal")
}
if string(recorder.Log[1].([]byte)) != "world!" {
test.Fatal("not equal")
}
}

View File

@@ -6,6 +6,7 @@ import "fmt"
import "net" import "net"
import "sync" import "sync"
import "time" import "time"
import "bytes"
import "context" import "context"
import "sync/atomic" import "sync/atomic"
import "git.tebibyte.media/sashakoshka/go-util/sync" import "git.tebibyte.media/sashakoshka/go-util/sync"
@@ -17,6 +18,12 @@ const closeMethod = 0xFFFF
const int64Max = int64((^uint64(0)) >> 1) const int64Max = int64((^uint64(0)) >> 1)
const defaultChunkSize = 0x1000 const defaultChunkSize = 0x1000
var bufferPool = sync.Pool {
New: func() any {
return &bytes.Buffer { }
},
}
// Party represents a side of a connection. // Party represents a side of a connection.
type Party bool; const ( type Party bool; const (
ServerSide Party = false ServerSide Party = false
@@ -222,12 +229,12 @@ type transA struct {
parent *a parent *a
id int64 id int64
incoming usync.Gate[incomingMessage] incoming usync.Gate[incomingMessage]
currentReader io.Reader
currentWriter io.Closer
writeBuffer []byte
closed atomic.Bool closed atomic.Bool
closeErr error closeErr error
currentReader io.Reader
currentWriter usync.Monitor[io.Closer]
deadline *time.Timer deadline *time.Timer
deadlineLock sync.Mutex deadlineLock sync.Mutex
} }
@@ -263,23 +270,24 @@ func (this *transA) Send(method uint16, data []byte) error {
} }
func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) { func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) {
currentWriter, done := this.currentWriter.BorrowReturn()
defer done(&currentWriter)
// close previous writer if necessary // close previous writer if necessary
if this.currentWriter != nil { if currentWriter != nil {
this.currentWriter.Close() currentWriter.Close()
this.currentWriter = nil currentWriter = nil
} }
// create new writer // create new writer
writer := &writerA { writer := &writerA {
parent: this, parent: this,
// there is only ever one writer at a time, so they can all buffer: bufferPool.Get().(*bytes.Buffer),
// share a buffer
buffer: this.writeBuffer[:0],
method: method, method: method,
chunkSize: defaultChunkSize, chunkSize: defaultChunkSize,
open: true, open: true,
} }
this.currentWriter = writer currentWriter = writer
return writer, nil return writer, nil
} }
@@ -406,7 +414,7 @@ func (this *readerA) Read(buffer []byte) (int, error) {
type writerA struct { type writerA struct {
parent *transA parent *transA
buffer []byte buffer *bytes.Buffer
method uint16 method uint16
chunkSize int64 chunkSize int64
open bool open bool
@@ -425,8 +433,18 @@ func (this *writerA) Write(data []byte) (n int, err error) {
} }
func (this *writerA) Close() error { func (this *writerA) Close() error {
if this.buffer != nil {
// flush if needed
if this.buffer.Len() > 0 {
this.flush(0) this.flush(0)
}
this.open = false this.open = false
// reset the buffer and put it back in the pool
this.buffer.Reset()
bufferPool.Put(this.buffer)
this.buffer = nil
}
return nil return nil
} }
@@ -434,11 +452,11 @@ func (this *writerA) writeOne(data []byte) (n int, err error) {
data = data[:min(len(data), int(this.chunkSize))] data = data[:min(len(data), int(this.chunkSize))]
// if there is more room, append to the buffer and exit // if there is more room, append to the buffer and exit
if int64(len(this.buffer) + len(data)) <= this.chunkSize { if int64(this.buffer.Len() + len(data)) <= this.chunkSize {
this.buffer = append(this.buffer, data...) this.buffer.Write(data)
n = len(data) n = len(data)
// if have a full chunk, flush // if have a full chunk, flush
if int64(len(this.buffer)) == this.chunkSize { if int64(this.buffer.Len()) == this.chunkSize {
err = this.flush(1) err = this.flush(1)
if err != nil { return n, err } if err != nil { return n, err }
} }
@@ -448,12 +466,17 @@ func (this *writerA) writeOne(data []byte) (n int, err error) {
// if not, flush and store as much as we can in the buffer // if not, flush and store as much as we can in the buffer
err = this.flush(1) err = this.flush(1)
if err != nil { return n, err } if err != nil { return n, err }
this.buffer = append(this.buffer, data...) n = int(min(int64(len(data)), this.chunkSize))
this.buffer.Write(data[:n])
return n, nil return n, nil
} }
func (this *writerA) flush(ccb uint64) error { func (this *writerA) flush(ccb uint64) error {
return this.parent.parent.sendMessageSafe(this.parent.id, this.method, ccb, this.buffer) err := this.parent.parent.sendMessageSafe(
this.parent.id, this.method, ccb,
this.buffer.Bytes())
this.buffer.Reset()
return err
} }
type incomingMessage struct { type incomingMessage struct {

View File

@@ -1,11 +1,14 @@
package hopp package hopp
import "io" import "io"
import "net"
import "sync"
import "bytes" import "bytes"
import "errors" import "errors"
import "slices" import "slices"
import "testing" import "testing"
import "context" import "context"
import tu "git.tebibyte.media/sashakoshka/hopp/internal/testutil"
// some of these tests spawn goroutines that can signal a failure. // some of these tests spawn goroutines that can signal a failure.
// abide by the documentation for testing.T (https://pkg.go.dev/testing#T): // abide by the documentation for testing.T (https://pkg.go.dev/testing#T):
@@ -129,6 +132,67 @@ func TestTransOpenCloseA(test *testing.T) {
clientServerEnvironment(test, clientFunc, serverFunc) clientServerEnvironment(test, clientFunc, serverFunc)
} }
func TestReadWriteA(test *testing.T) {
payloads := []string {
"hello",
"world",
"When the impostor is sus!",
}
clientFunc := func(a Conn) {
test.Log("CLIENT accepting transaction")
trans, err := a.AcceptTrans()
if err != nil { test.Fatal("CLIENT", err) }
test.Log("CLIENT accepted transaction")
test.Cleanup(func() { trans.Close() })
for method, payload := range payloads {
test.Log("CLIENT waiting...")
gotMethod, gotReader, err := trans.ReceiveReader()
if err != nil { test.Fatal("CLIENT", err) }
gotPayloadBytes, err := io.ReadAll(gotReader)
if err != nil { test.Fatal("CLIENT", err) }
gotPayload := string(gotPayloadBytes)
test.Log("CLIENT m:", gotMethod, "p:", tu.HexBytes(gotPayloadBytes))
if int(gotMethod) != method {
test.Error("CLIENT method not equal, expected", method)
}
if gotPayload != payload {
test.Error(
"CLIENT payload not equal, expected",
tu.HexBytes([]byte(payload)))
}
}
test.Log("CLIENT waiting for transaction close...")
gotMethod, gotPayload, err := trans.Receive()
if !errors.Is(err, io.EOF) {
test.Error("CLIENT wrong error:", err)
test.Error("CLIENT method:", gotMethod)
test.Error("CLIENT payload:", tu.HexBytes(gotPayload))
test.Fatal("CLIENT (expected io.EOF and no message)")
}
test.Log("CLIENT transaction has closed")
}
serverFunc := func(a Conn) {
defer test.Log("SERVER closing connection")
trans, err := a.OpenTrans()
if err != nil { test.Error("SERVER", err); return }
test.Cleanup(func() { trans.Close() })
for method, payload := range payloads {
test.Log("SERVER m:", method, "p:", tu.HexBytes([]byte(payload)))
func() {
writer, err := trans.SendWriter(uint16(method))
if err != nil { test.Error("SERVER", err); return }
defer writer.Close()
_, err = writer.Write([]byte(payload))
if err != nil { test.Error("SERVER", err); return }
}()
}
}
clientServerEnvironment(test, clientFunc, serverFunc)
}
func TestEncodeMessageA(test *testing.T) { func TestEncodeMessageA(test *testing.T) {
buffer := new(bytes.Buffer) buffer := new(bytes.Buffer)
payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 } payload := []byte { 0x00, 0x01, 0x02, 0x03, 0x04, 0x05 }
@@ -212,6 +276,225 @@ func TestEncodeDecodeMessageA(test *testing.T) {
} }
} }
func TestConsecutiveSend(test *testing.T) {
packets := [][]byte {
[]byte {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x07 },
[]byte {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x08 },
}
payloads := [][]byte {
[]byte { 0x43, 0x00, 0x00, 0x00, 0x07 },
[]byte { 0x43, 0x00, 0x00, 0x00, 0x08 },
}
var group sync.WaitGroup
group.Add(2)
// server
listener, err := net.Listen("tcp", "localhost:9999")
if err != nil { test.Fatal("SERVER", err) }
go func() {
defer group.Done()
defer listener.Close()
conn, err := listener.Accept()
if err != nil { test.Fatal("SERVER", err) }
defer conn.Close()
buf := [16]byte { }
for {
_, err := conn.Read(buf[:])
if err != nil { break }
}
}()
// client
go func() {
defer group.Done()
conn, err := net.Dial("tcp", "localhost:9999")
if err != nil { test.Fatal("CLIENT", err) }
defer conn.Close()
recorder := tu.RecordConn(conn)
a := AdaptA(recorder, ClientSide)
trans, err := a.OpenTrans()
if err != nil { test.Fatal("CLIENT", err) }
for _, payload := range payloads {
err := trans.Send(0x0000, payload)
if err != nil { test.Fatal("CLIENT", err) }
}
test.Log("CLIENT recorded output:\n" + recorder.Dump())
if len(recorder.Log) != 2 { test.Fatal("wrong length") }
if !slices.Equal(recorder.Log[0].([]byte), packets[0]) {
test.Fatal("not equal")
}
if !slices.Equal(recorder.Log[1].([]byte), packets[1]) {
test.Fatal("not equal")
}
}()
group.Wait()
}
func TestConsecutiveWrite(test *testing.T) {
packets := [][]byte {
[]byte {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x07 },
[]byte {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x08 },
}
payloads := [][]byte {
[]byte { 0x43, 0x00, 0x00, 0x00, 0x07 },
[]byte { 0x43, 0x00, 0x00, 0x00, 0x08 },
}
var group sync.WaitGroup
group.Add(2)
// server
listener, err := net.Listen("tcp", "localhost:9999")
if err != nil { test.Fatal("SERVER", err) }
go func() {
defer group.Done()
defer listener.Close()
conn, err := listener.Accept()
if err != nil { test.Fatal("SERVER", err) }
defer conn.Close()
buf := [16]byte { }
for {
_, err := conn.Read(buf[:])
if err != nil { break }
}
}()
// client
go func() {
defer group.Done()
conn, err := net.Dial("tcp", "localhost:9999")
if err != nil { test.Fatal("CLIENT", err) }
defer conn.Close()
recorder := tu.RecordConn(conn)
a := AdaptA(recorder, ClientSide)
trans, err := a.OpenTrans()
if err != nil { test.Fatal("CLIENT", err) }
for _, payload := range payloads {
func() {
writer, err := trans.SendWriter(0x0000)
if err != nil { test.Fatal("CLIENT", err) }
_, err = writer.Write(payload)
if err != nil { test.Fatal("CLIENT", err) }
writer.Close()
}()
}
test.Log("CLIENT recorded output:\n" + recorder.Dump())
if len(recorder.Log) != 2 { test.Fatal("wrong length") }
if !slices.Equal(recorder.Log[0].([]byte), packets[0]) {
test.Fatal("not equal")
}
if !slices.Equal(recorder.Log[1].([]byte), packets[1]) {
test.Fatal("not equal")
}
}()
group.Wait()
}
func TestConsecutiveReceive(test *testing.T) {
stream := []byte {
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x07,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x01,
0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x05,
0x43, 0x00, 0x00, 0x00, 0x08,
}
payloads := [][]byte {
[]byte { 0x43, 0x00, 0x00, 0x00, 0x07 },
[]byte { 0x43, 0x00, 0x00, 0x00, 0x08 },
}
var group sync.WaitGroup
group.Add(2)
// server
listener, err := net.Listen("tcp", "localhost:9999")
if err != nil { test.Fatal("SERVER", err) }
go func() {
defer group.Done()
defer listener.Close()
conn, err := listener.Accept()
if err != nil { test.Fatal("SERVER", err) }
defer conn.Close()
a := AdaptA(conn, ServerSide)
trans, err := a.AcceptTrans()
if err != nil { test.Fatal("SERVER", err) }
index := 0
for {
method, data, err := trans.Receive()
if err != nil {
if !errors.Is(err, io.EOF) {
test.Fatal("SERVER", err)
}
break
}
test.Logf("SERVER GOT: M%04X %s", method, tu.HexBytes(data))
if index >= len(payloads) {
test.Fatalf(
"SERVER we weren't supposed to receive %d messages",
index + 1)
}
if method != 0 {
test.Fatal("SERVER", "method not equal")
}
if !slices.Equal(data, payloads[index]) {
test.Fatal("SERVER", "data not equal")
}
index ++
}
if index != len(payloads) {
test.Fatalf(
"SERVER we weren't supposed to receive %d messages",
index + 1)
}
}()
// client
go func() {
defer group.Done()
conn, err := net.Dial("tcp", "localhost:9999")
if err != nil { test.Fatal("CLIENT", err) }
defer conn.Close()
_, err = conn.Write(stream)
if err != nil { test.Fatal("CLIENT", err) }
}()
group.Wait()
}
func clientServerEnvironment(test *testing.T, clientFunc func(conn Conn), serverFunc func(conn Conn)) { func clientServerEnvironment(test *testing.T, clientFunc func(conn Conn), serverFunc func(conn Conn)) {
network := "tcp" network := "tcp"
addr := "localhost:7959" addr := "localhost:7959"