Implement SendWriter for METADAPT-A
This commit is contained in:
		
							parent
							
								
									8a3df95491
								
							
						
					
					
						commit
						41f5cfefab
					
				
							
								
								
									
										76
									
								
								metadapta.go
									
									
									
									
									
								
							
							
						
						
									
										76
									
								
								metadapta.go
									
									
									
									
									
								
							@ -9,7 +9,7 @@ import "git.tebibyte.media/sashakoshka/go-util/sync"
 | 
				
			|||||||
 | 
					
 | 
				
			||||||
const closeMethod = 0xFFFF
 | 
					const closeMethod = 0xFFFF
 | 
				
			||||||
const int64Max = int64((^uint64(0)) >> 1)
 | 
					const int64Max = int64((^uint64(0)) >> 1)
 | 
				
			||||||
 | 
					const defaultChunkSize = 0x1000
 | 
				
			||||||
 | 
					
 | 
				
			||||||
// Party represents a side of a connection.
 | 
					// Party represents a side of a connection.
 | 
				
			||||||
type Party bool; const (
 | 
					type Party bool; const (
 | 
				
			||||||
@ -171,6 +171,8 @@ type transA struct {
 | 
				
			|||||||
	id            int64
 | 
						id            int64
 | 
				
			||||||
	incoming      usync.Gate[incomingMessage]
 | 
						incoming      usync.Gate[incomingMessage]
 | 
				
			||||||
	currentReader io.Reader
 | 
						currentReader io.Reader
 | 
				
			||||||
 | 
						currentWriter io.Closer
 | 
				
			||||||
 | 
						writeBuffer   []byte
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *transA) Close() error {
 | 
					func (this *transA) Close() error {
 | 
				
			||||||
@ -192,6 +194,27 @@ func (this *transA) Send(method uint16, data []byte) error {
 | 
				
			|||||||
	return this.parent.sendMessageSafe(this.id, method, data)
 | 
						return this.parent.sendMessageSafe(this.id, method, data)
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *transA) SendWriter(method uint16) (io.Writer, error) {
 | 
				
			||||||
 | 
						// close previous writer if necessary
 | 
				
			||||||
 | 
						if this.currentWriter != nil {
 | 
				
			||||||
 | 
							this.currentWriter.Close()
 | 
				
			||||||
 | 
							this.currentWriter = nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// create new writer
 | 
				
			||||||
 | 
						writer := &writerA {
 | 
				
			||||||
 | 
							parent: this,
 | 
				
			||||||
 | 
							// there is only ever one writer at a time, so they can all
 | 
				
			||||||
 | 
							// share a buffer
 | 
				
			||||||
 | 
							buffer: this.writeBuffer[:0],
 | 
				
			||||||
 | 
							method: method,
 | 
				
			||||||
 | 
							chunkSize: defaultChunkSize,
 | 
				
			||||||
 | 
							open: true,
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						this.currentWriter = writer
 | 
				
			||||||
 | 
						return writer, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
func (this *transA) Receive() (method uint16, data []byte, err error) {
 | 
					func (this *transA) Receive() (method uint16, data []byte, err error) {
 | 
				
			||||||
	method, reader, err := this.ReceiveReader()
 | 
						method, reader, err := this.ReceiveReader()
 | 
				
			||||||
	if err != nil { return 0, nil, err }
 | 
						if err != nil { return 0, nil, err }
 | 
				
			||||||
@ -263,6 +286,57 @@ func (this *readerA) Read(buffer []byte) (int, error) {
 | 
				
			|||||||
	return copied, nil
 | 
						return copied, nil
 | 
				
			||||||
}
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					type writerA struct {
 | 
				
			||||||
 | 
						parent    *transA
 | 
				
			||||||
 | 
						buffer    []byte
 | 
				
			||||||
 | 
						method    uint16
 | 
				
			||||||
 | 
						chunkSize int64
 | 
				
			||||||
 | 
						open      bool
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *writerA) Write(data []byte) (n int, err error) {
 | 
				
			||||||
 | 
						if !this.open { return 0, io.EOF }
 | 
				
			||||||
 | 
						toSend := data
 | 
				
			||||||
 | 
						for len(toSend) > 0 {
 | 
				
			||||||
 | 
							nn, err := this.writeOne(toSend)
 | 
				
			||||||
 | 
							n += nn
 | 
				
			||||||
 | 
							toSend = toSend[nn:]
 | 
				
			||||||
 | 
							if err != nil { return n, err }
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
						return n, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *writerA) Close() error {
 | 
				
			||||||
 | 
						this.open = false
 | 
				
			||||||
 | 
						return nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *writerA) writeOne(data []byte) (n int, err error) {
 | 
				
			||||||
 | 
						data = data[:min(len(data), int(this.chunkSize))]
 | 
				
			||||||
 | 
						
 | 
				
			||||||
 | 
						// if there is more room, append to the buffer and exit
 | 
				
			||||||
 | 
						if int64(len(this.buffer) + len(data)) <= this.chunkSize {
 | 
				
			||||||
 | 
							this.buffer = append(this.buffer, data...)
 | 
				
			||||||
 | 
							n = len(data)
 | 
				
			||||||
 | 
							// if have a full chunk, flush
 | 
				
			||||||
 | 
							if int64(len(this.buffer)) == this.chunkSize {
 | 
				
			||||||
 | 
								err = this.flush()
 | 
				
			||||||
 | 
								if err != nil { return n, err }
 | 
				
			||||||
 | 
							}
 | 
				
			||||||
 | 
							return n, nil
 | 
				
			||||||
 | 
						}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
						// if not, flush and store as much as we can in the buffer
 | 
				
			||||||
 | 
						err = this.flush()
 | 
				
			||||||
 | 
						if err != nil { return n, err }
 | 
				
			||||||
 | 
						this.buffer = append(this.buffer, data...)
 | 
				
			||||||
 | 
						return n, nil
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
 | 
					func (this *writerA) flush() error {
 | 
				
			||||||
 | 
						return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer)
 | 
				
			||||||
 | 
					}
 | 
				
			||||||
 | 
					
 | 
				
			||||||
type incomingMessage struct {
 | 
					type incomingMessage struct {
 | 
				
			||||||
	method  uint16
 | 
						method  uint16
 | 
				
			||||||
	chunked bool
 | 
						chunked bool
 | 
				
			||||||
 | 
				
			|||||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user