Compare commits

..

No commits in common. "1b43b9268741aaa8aa0ed93d95efcdde0eb8b3f5" and "3daa66c4bceee6aaf45a5d533cb557cfaf5e3cee" have entirely different histories.

5 changed files with 25 additions and 33 deletions

View File

@ -1,11 +1,9 @@
package main package main
import "os" import "os"
import "io"
import "fmt" import "fmt"
import "time" import "time"
import "bufio" import "bufio"
import "errors"
import "context" import "context"
import "crypto/tls" import "crypto/tls"
import "git.tebibyte.media/sashakoshka/hopp" import "git.tebibyte.media/sashakoshka/hopp"
@ -24,7 +22,6 @@ func main() {
} }
trans, err := join(address, room, nickname) trans, err := join(address, room, nickname)
handleErr(1, err) handleErr(1, err)
fmt.Fprintf(os.Stdout, "(i) connected to %s/%s\n", address, room)
go func() { go func() {
reader := bufio.NewReader(os.Stdin) reader := bufio.NewReader(os.Stdin)
for { for {
@ -35,12 +32,7 @@ func main() {
}() }()
for { for {
message, _, err := chat.Receive(trans) message, _, err := chat.Receive(trans)
if err != nil { handleErr(1, err)
if !errors.Is(err, io.EOF) {
handleErr(1, err)
}
break
}
switch message := message.(type) { switch message := message.(type) {
case *chat.MessageChat: case *chat.MessageChat:
fmt.Fprintf(os.Stdout, "%s: %s\n", message.Nickname, message.Content) fmt.Fprintf(os.Stdout, "%s: %s\n", message.Nickname, message.Content)
@ -50,7 +42,6 @@ func main() {
fmt.Fprintf(os.Stdout, "(i) %s left the room\n", message.Nickname) fmt.Fprintf(os.Stdout, "(i) %s left the room\n", message.Nickname)
} }
} }
fmt.Fprintf(os.Stdout, "(i) disconnected\n")
} }
func join(address string, room string, nickname string) (hopp.Trans, error) { func join(address string, room string, nickname string) (hopp.Trans, error) {

View File

@ -1,7 +1,6 @@
package main package main
import "os" import "os"
import "io"
import "fmt" import "fmt"
import "log" import "log"
import "errors" import "errors"
@ -49,7 +48,7 @@ func host(address string, certPath, keyPath string) error {
type client struct { type client struct {
conn hopp.Conn conn hopp.Conn
nickname string nickname hopp.Option[string]
rooms usync.RWMonitor[map[string] hopp.Trans] rooms usync.RWMonitor[map[string] hopp.Trans]
} }
@ -59,13 +58,13 @@ func (this *client) run() {
defer this.conn.Close() defer this.conn.Close()
for { for {
log.Println("accepting transaction")
trans, err := this.conn.AcceptTrans() trans, err := this.conn.AcceptTrans()
if err != nil { if err != nil {
if !errors.Is(err, io.EOF) { log.Printf("XXX %v failed: %v", this.conn.RemoteAddr(), err)
log.Printf("XXX %v failed: %v", this.conn.RemoteAddr(), err)
}
return return
} }
log.Println("accepted transaction")
go this.runTrans(trans) go this.runTrans(trans)
} }
} }
@ -112,7 +111,7 @@ func (this *client) transTalk(trans hopp.Trans, initial *chat.MessageJoin) error
} }
func (this *client) handleMessageChat(trans hopp.Trans, room string, message *chat.MessageChat) error { func (this *client) handleMessageChat(trans hopp.Trans, room string, message *chat.MessageChat) error {
log.Printf("(). %s #%s: %s", this.nickname, room, message.Content) log.Println("(). %s #%s: %s", this.nickname, room, message.Content)
clients, done := clients.RBorrow() clients, done := clients.RBorrow()
defer done() defer done()
for client := range clients { for client := range clients {

View File

@ -1237,9 +1237,7 @@ func (this *Generator) generateReceive() (n int, err error) {
} }
nn, err = this.iprint("}\n") nn, err = this.iprint("}\n")
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
// fuck off go vet nn, err = this.iprint("return nil, n, fmt.Errorf(\"%w: M%04X\", hopp.ErrUnknownMethod, method)\n")
str := `return nil, n, fmt.Errorf("%w: M%04X", hopp.ErrUnknownMethod, method)`
nn, err = this.iprintln(str)
n += nn; if err != nil { return n, err } n += nn; if err != nil { return n, err }
this.pop() this.pop()
nn, err = this.iprint("}\n") nn, err = this.iprint("}\n")

View File

@ -6,7 +6,6 @@ import "fmt"
import "net" import "net"
import "sync" import "sync"
import "time" import "time"
import "context"
import "sync/atomic" import "sync/atomic"
import "git.tebibyte.media/sashakoshka/go-util/sync" import "git.tebibyte.media/sashakoshka/go-util/sync"
@ -40,23 +39,20 @@ type a struct {
sendLock sync.Mutex sendLock sync.Mutex
transMap map[int64] *transA transMap map[int64] *transA
transChan chan *transA transChan chan *transA
ctx context.Context done chan struct { }
done func()
err error err error
} }
// AdaptA returns a connection implementing METADAPT-A over a singular stream- // AdaptA returns a connection implementing METADAPT-A over a singular stream-
// oriented transport such as TCP or UNIX domain stream sockets. // oriented transport such as TCP or UNIX domain stream sockets.
func AdaptA(underlying net.Conn, party Party) Conn { func AdaptA(underlying net.Conn, party Party) Conn {
ctx, done := context.WithCancel(context.Background())
conn := &a { conn := &a {
sizeLimit: defaultSizeLimit, sizeLimit: defaultSizeLimit,
underlying: underlying, underlying: underlying,
party: party, party: party,
transMap: make(map[int64] *transA), transMap: make(map[int64] *transA),
transChan: make(chan *transA), transChan: make(chan *transA),
ctx: ctx, done: make(chan struct { }),
done: done,
} }
if party == ClientSide { if party == ClientSide {
conn.transID = 1 conn.transID = 1
@ -64,15 +60,11 @@ func AdaptA(underlying net.Conn, party Party) Conn {
conn.transID = -1 conn.transID = -1
} }
go conn.receive() go conn.receive()
go func() {
<- ctx.Done()
underlying.Close()
}()
return conn return conn
} }
func (this *a) Close() error { func (this *a) Close() error {
this.done() close(this.done)
return nil return nil
} }
@ -113,7 +105,7 @@ func (this *a) AcceptTrans() (Trans, error) {
return nil, eof return nil, eof
} }
return trans, nil return trans, nil
case <- this.ctx.Done(): case <- this.done:
return nil, eof return nil, eof
} }
} }
@ -498,7 +490,6 @@ func decodeMessageA(
headerBuffer := [18]byte { } headerBuffer := [18]byte { }
_, err = io.ReadFull(reader, headerBuffer[:]) _, err = io.ReadFull(reader, headerBuffer[:])
if err != nil { return 0, 0, false, nil, err } if err != nil { return 0, 0, false, nil, err }
transID, err = decodeI64[int64](headerBuffer[:8]) transID, err = decodeI64[int64](headerBuffer[:8])
if err != nil { return 0, 0, false, nil, err } if err != nil { return 0, 0, false, nil, err }
method, err = decodeI16[uint16](headerBuffer[8:10]) method, err = decodeI16[uint16](headerBuffer[8:10])

View File

@ -52,7 +52,6 @@ func TestConnA(test *testing.T) {
test.Error("CLIENT payload:", gotPayload) test.Error("CLIENT payload:", gotPayload)
test.Fatal("CLIENT ok byeeeeeeeeeeeee") test.Fatal("CLIENT ok byeeeeeeeeeeeee")
} }
test.Log("CLIENT transaction has closed")
} }
serverFunc := func(a Conn) { serverFunc := func(a Conn) {
@ -71,6 +70,20 @@ func TestConnA(test *testing.T) {
} }
func TestTransOpenCloseA(test *testing.T) { func TestTransOpenCloseA(test *testing.T) {
// currently:
//
// | data sent | data recvd | close sent | close recvd
// 10 | X | X | X | server hangs
// 20 | X | X | X | client hangs
// 30 | X | | X |
//
// when a close message is recvd, it tries to push to the trans and
// hangs on trans.incoming.Send, which hangs on sending the value to the
// underlying channel. why is this?
//
// check if we are really getting values from the channel when pulling
// from the trans channel when we are expecting a close.
clientFunc := func(conn Conn) { clientFunc := func(conn Conn) {
// 10 // 10
trans, err := conn.OpenTrans() trans, err := conn.OpenTrans()