Compare commits

...

3 Commits

View File

@ -10,6 +10,7 @@ import "git.tebibyte.media/sashakoshka/go-util/sync"
const closeMethod = 0xFFFF const closeMethod = 0xFFFF
const int64Max = int64((^uint64(0)) >> 1) const int64Max = int64((^uint64(0)) >> 1)
// Party represents a side of a connection. // Party represents a side of a connection.
type Party bool; const ( type Party bool; const (
ServerSide Party = false ServerSide Party = false
@ -33,6 +34,7 @@ type a struct {
// oriented transport such as TCP or UNIX domain stream sockets. // oriented transport such as TCP or UNIX domain stream sockets.
func AdaptA(underlying net.Conn, party Party) Conn { func AdaptA(underlying net.Conn, party Party) Conn {
conn := &a { conn := &a {
sizeLimit: defaultSizeLimit,
underlying: underlying, underlying: underlying,
party: party, party: party,
transMap: make(map[int64] *transA), transMap: make(map[int64] *transA),
@ -115,7 +117,7 @@ func (this *a) receive() {
clear(this.transMap) clear(this.transMap)
}() }()
for { for {
transID, method, chunked, payload, err := decodeMessageA(this.underlying) transID, method, chunked, payload, err := decodeMessageA(this.underlying, this.sizeLimit)
if err != nil { if err != nil {
this.err = fmt.Errorf("could not receive message: %w", err) this.err = fmt.Errorf("could not receive message: %w", err)
return return
@ -234,6 +236,7 @@ func (this *readerA) pull() (uint16, error) {
if !message.chunked { if !message.chunked {
this.eof = true this.eof = true
} }
return message.method, nil
} }
} }
} }
@ -266,18 +269,25 @@ type incomingMessage struct {
} }
func encodeMessageA(writer io.Writer, trans int64, method uint16, data []byte) error { func encodeMessageA(writer io.Writer, trans int64, method uint16, data []byte) error {
buffer := make([]byte, 12 + len(data)) buffer := make([]byte, 18 + len(data))
tape.EncodeI64(buffer[:8], trans) tape.EncodeI64(buffer[:8], trans)
tape.EncodeI16(buffer[8:10], method) tape.EncodeI16(buffer[8:10], method)
length, ok := tape.U16CastSafe(len(data)) tape.EncodeI64(buffer[10:18], uint64(len(data)))
if !ok { return ErrPayloadTooLarge } copy(buffer[18:], data)
tape.EncodeI16(buffer[10:12], length)
copy(buffer[12:], data)
_, err := writer.Write(buffer) _, err := writer.Write(buffer)
return err return err
} }
func decodeMessageA(reader io.Reader) (transID int64, method uint16, chunked bool, payloadBuffer []byte, err error) { func decodeMessageA(
reader io.Reader,
sizeLimit int64,
) (
transID int64,
method uint16,
chunked bool,
payloadBuffer []byte,
err error,
) {
headerBuffer := [18]byte { } headerBuffer := [18]byte { }
_, err = io.ReadFull(reader, headerBuffer[:]) _, err = io.ReadFull(reader, headerBuffer[:])
if err != nil { return 0, 0, false, nil, err } if err != nil { return 0, 0, false, nil, err }
@ -288,6 +298,9 @@ func decodeMessageA(reader io.Reader) (transID int64, method uint16, chunked boo
size, err := tape.DecodeI64[uint64](headerBuffer[10:18]) size, err := tape.DecodeI64[uint64](headerBuffer[10:18])
if err != nil { return 0, 0, false, nil, err } if err != nil { return 0, 0, false, nil, err }
chunked, size = splitCCBSize(size) chunked, size = splitCCBSize(size)
if size > uint64(sizeLimit) {
return 0, 0, false, nil, ErrPayloadTooLarge
}
payloadBuffer = make([]byte, int(size)) payloadBuffer = make([]byte, int(size))
_, err = io.ReadFull(reader, payloadBuffer) _, err = io.ReadFull(reader, payloadBuffer)
if err != nil { return 0, 0, false, nil, err } if err != nil { return 0, 0, false, nil, err }