Fix CCB and flushing in METADAPT-A

This commit is contained in:
Sasha Koshka 2025-10-20 21:19:21 -04:00
parent 2e4c693174
commit d4ccdb282e

View File

@ -124,10 +124,10 @@ func (this *a) unlistTransactionSafe(id int64) {
delete(this.transMap, id) delete(this.transMap, id)
} }
func (this *a) sendMessageSafe(trans int64, method uint16, data []byte) error { func (this *a) sendMessageSafe(trans int64, method uint16, ccb uint64, data []byte) error {
this.sendLock.Lock() this.sendLock.Lock()
defer this.sendLock.Unlock() defer this.sendLock.Unlock()
return encodeMessageA(this.underlying, this.sizeLimit, trans, method, data) return encodeMessageA(this.underlying, this.sizeLimit, trans, method, 0, data)
} }
func (this *a) receive() { func (this *a) receive() {
@ -251,7 +251,7 @@ func (this *transA) ID() int64 {
} }
func (this *transA) Send(method uint16, data []byte) error { func (this *transA) Send(method uint16, data []byte) error {
return this.parent.sendMessageSafe(this.id, method, data) return this.parent.sendMessageSafe(this.id, method, 0, data)
} }
func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) { func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) {
@ -417,6 +417,7 @@ func (this *writerA) Write(data []byte) (n int, err error) {
} }
func (this *writerA) Close() error { func (this *writerA) Close() error {
this.flush(0)
this.open = false this.open = false
return nil return nil
} }
@ -430,21 +431,21 @@ func (this *writerA) writeOne(data []byte) (n int, err error) {
n = len(data) n = len(data)
// if have a full chunk, flush // if have a full chunk, flush
if int64(len(this.buffer)) == this.chunkSize { if int64(len(this.buffer)) == this.chunkSize {
err = this.flush() err = this.flush(1)
if err != nil { return n, err } if err != nil { return n, err }
} }
return n, nil return n, nil
} }
// if not, flush and store as much as we can in the buffer // if not, flush and store as much as we can in the buffer
err = this.flush() err = this.flush(1)
if err != nil { return n, err } if err != nil { return n, err }
this.buffer = append(this.buffer, data...) this.buffer = append(this.buffer, data...)
return n, nil return n, nil
} }
func (this *writerA) flush() error { func (this *writerA) flush(ccb uint64) error {
return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer) return this.parent.parent.sendMessageSafe(this.parent.id, this.method, ccb, this.buffer)
} }
type incomingMessage struct { type incomingMessage struct {
@ -458,15 +459,19 @@ func encodeMessageA(
sizeLimit int64, sizeLimit int64,
trans int64, trans int64,
method uint16, method uint16,
ccb uint64,
data []byte, data []byte,
) error { ) error {
if int64(len(data)) > sizeLimit { if int64(len(data)) > sizeLimit {
return ErrPayloadTooLarge return ErrPayloadTooLarge
} }
buffer := make([]byte, 18 + len(data)) buffer := make([]byte, 18 + len(data))
// transaction ID field
encodeI64(buffer[:8], trans) encodeI64(buffer[:8], trans)
// method field
encodeI16(buffer[8:10], method) encodeI16(buffer[8:10], method)
encodeI64(buffer[10:18], uint64(len(data))) // payload size field
encodeI64(buffer[10:18], uint64(len(data)) & 0x7FFFFFFFFFFFFFFF | ccb << 63)
copy(buffer[18:], data) copy(buffer[18:], data)
_, err := writer.Write(buffer) _, err := writer.Write(buffer)
return err return err