From d4ccdb282e3171c2a136243916d308397328e7e8 Mon Sep 17 00:00:00 2001 From: Sasha Koshka Date: Mon, 20 Oct 2025 21:19:21 -0400 Subject: [PATCH] Fix CCB and flushing in METADAPT-A --- metadapta.go | 21 +++++++++++++-------- 1 file changed, 13 insertions(+), 8 deletions(-) diff --git a/metadapta.go b/metadapta.go index cae8b12..3b0ec61 100644 --- a/metadapta.go +++ b/metadapta.go @@ -124,10 +124,10 @@ func (this *a) unlistTransactionSafe(id int64) { delete(this.transMap, id) } -func (this *a) sendMessageSafe(trans int64, method uint16, data []byte) error { +func (this *a) sendMessageSafe(trans int64, method uint16, ccb uint64, data []byte) error { this.sendLock.Lock() defer this.sendLock.Unlock() - return encodeMessageA(this.underlying, this.sizeLimit, trans, method, data) + return encodeMessageA(this.underlying, this.sizeLimit, trans, method, 0, data) } func (this *a) receive() { @@ -251,7 +251,7 @@ func (this *transA) ID() int64 { } func (this *transA) Send(method uint16, data []byte) error { - return this.parent.sendMessageSafe(this.id, method, data) + return this.parent.sendMessageSafe(this.id, method, 0, data) } func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) { @@ -417,6 +417,7 @@ func (this *writerA) Write(data []byte) (n int, err error) { } func (this *writerA) Close() error { + this.flush(0) this.open = false return nil } @@ -430,21 +431,21 @@ func (this *writerA) writeOne(data []byte) (n int, err error) { n = len(data) // if have a full chunk, flush if int64(len(this.buffer)) == this.chunkSize { - err = this.flush() + err = this.flush(1) if err != nil { return n, err } } return n, nil } // if not, flush and store as much as we can in the buffer - err = this.flush() + err = this.flush(1) if err != nil { return n, err } this.buffer = append(this.buffer, data...) return n, nil } -func (this *writerA) flush() error { - return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer) +func (this *writerA) flush(ccb uint64) error { + return this.parent.parent.sendMessageSafe(this.parent.id, this.method, ccb, this.buffer) } type incomingMessage struct { @@ -458,15 +459,19 @@ func encodeMessageA( sizeLimit int64, trans int64, method uint16, + ccb uint64, data []byte, ) error { if int64(len(data)) > sizeLimit { return ErrPayloadTooLarge } buffer := make([]byte, 18 + len(data)) + // transaction ID field encodeI64(buffer[:8], trans) + // method field encodeI16(buffer[8:10], method) - encodeI64(buffer[10:18], uint64(len(data))) + // payload size field + encodeI64(buffer[10:18], uint64(len(data)) & 0x7FFFFFFFFFFFFFFF | ccb << 63) copy(buffer[18:], data) _, err := writer.Write(buffer) return err