Use bytes.Buffer instead of a slice for METADAPT-A writer
This commit is contained in:
parent
d6dc6d6e78
commit
9651fed635
29
metadapta.go
29
metadapta.go
@ -6,6 +6,7 @@ import "fmt"
|
|||||||
import "net"
|
import "net"
|
||||||
import "sync"
|
import "sync"
|
||||||
import "time"
|
import "time"
|
||||||
|
import "bytes"
|
||||||
import "context"
|
import "context"
|
||||||
import "sync/atomic"
|
import "sync/atomic"
|
||||||
import "git.tebibyte.media/sashakoshka/go-util/sync"
|
import "git.tebibyte.media/sashakoshka/go-util/sync"
|
||||||
@ -224,7 +225,7 @@ type transA struct {
|
|||||||
incoming usync.Gate[incomingMessage]
|
incoming usync.Gate[incomingMessage]
|
||||||
currentReader io.Reader
|
currentReader io.Reader
|
||||||
currentWriter io.Closer
|
currentWriter io.Closer
|
||||||
writeBuffer []byte
|
writeBuffer bytes.Buffer
|
||||||
closed atomic.Bool
|
closed atomic.Bool
|
||||||
closeErr error
|
closeErr error
|
||||||
|
|
||||||
@ -270,11 +271,16 @@ func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// create new writer
|
// create new writer
|
||||||
|
this.writeBuffer.Reset()
|
||||||
writer := &writerA {
|
writer := &writerA {
|
||||||
parent: this,
|
parent: this,
|
||||||
// there is only ever one writer at a time, so they can all
|
// there is only ever one writer at a time, so they can all
|
||||||
// share a buffer
|
// share a buffer
|
||||||
buffer: this.writeBuffer[:0],
|
// FIXME: use a buffer pool, and just reset the buffers before putting them
|
||||||
|
// back in. it will work just fine bc we dont ever allocate more than the chunk
|
||||||
|
// size anyway. perhaps create some sort of config value that disables this
|
||||||
|
// behavior and just uses one buffer
|
||||||
|
buffer: &this.writeBuffer,
|
||||||
method: method,
|
method: method,
|
||||||
chunkSize: defaultChunkSize,
|
chunkSize: defaultChunkSize,
|
||||||
open: true,
|
open: true,
|
||||||
@ -406,7 +412,7 @@ func (this *readerA) Read(buffer []byte) (int, error) {
|
|||||||
|
|
||||||
type writerA struct {
|
type writerA struct {
|
||||||
parent *transA
|
parent *transA
|
||||||
buffer []byte
|
buffer *bytes.Buffer
|
||||||
method uint16
|
method uint16
|
||||||
chunkSize int64
|
chunkSize int64
|
||||||
open bool
|
open bool
|
||||||
@ -425,7 +431,7 @@ func (this *writerA) Write(data []byte) (n int, err error) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (this *writerA) Close() error {
|
func (this *writerA) Close() error {
|
||||||
if len(this.buffer) > 0 {
|
if this.buffer.Len() > 0 {
|
||||||
this.flush(0)
|
this.flush(0)
|
||||||
}
|
}
|
||||||
this.open = false
|
this.open = false
|
||||||
@ -436,11 +442,11 @@ func (this *writerA) writeOne(data []byte) (n int, err error) {
|
|||||||
data = data[:min(len(data), int(this.chunkSize))]
|
data = data[:min(len(data), int(this.chunkSize))]
|
||||||
|
|
||||||
// if there is more room, append to the buffer and exit
|
// if there is more room, append to the buffer and exit
|
||||||
if int64(len(this.buffer) + len(data)) <= this.chunkSize {
|
if int64(this.buffer.Len() + len(data)) <= this.chunkSize {
|
||||||
this.buffer = append(this.buffer, data...)
|
this.buffer.Write(data)
|
||||||
n = len(data)
|
n = len(data)
|
||||||
// if have a full chunk, flush
|
// if have a full chunk, flush
|
||||||
if int64(len(this.buffer)) == this.chunkSize {
|
if int64(this.buffer.Len()) == this.chunkSize {
|
||||||
err = this.flush(1)
|
err = this.flush(1)
|
||||||
if err != nil { return n, err }
|
if err != nil { return n, err }
|
||||||
}
|
}
|
||||||
@ -450,13 +456,16 @@ func (this *writerA) writeOne(data []byte) (n int, err error) {
|
|||||||
// if not, flush and store as much as we can in the buffer
|
// if not, flush and store as much as we can in the buffer
|
||||||
err = this.flush(1)
|
err = this.flush(1)
|
||||||
if err != nil { return n, err }
|
if err != nil { return n, err }
|
||||||
this.buffer = append(this.buffer, data...)
|
n = int(min(int64(len(data)), this.chunkSize))
|
||||||
|
this.buffer.Write(data[:n])
|
||||||
return n, nil
|
return n, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *writerA) flush(ccb uint64) error {
|
func (this *writerA) flush(ccb uint64) error {
|
||||||
err := this.parent.parent.sendMessageSafe(this.parent.id, this.method, ccb, this.buffer)
|
err := this.parent.parent.sendMessageSafe(
|
||||||
this.buffer = this.buffer[0:0]
|
this.parent.id, this.method, ccb,
|
||||||
|
this.buffer.Bytes())
|
||||||
|
this.buffer.Reset()
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user