Compare commits
	
		
			No commits in common. "7a0bf64c17a4dd86e8ad518addda487805b83fe7" and "8a3df95491f47b1f85d5e38b5f78f31e24ccec65" have entirely different histories.
		
	
	
		
			7a0bf64c17
			...
			8a3df95491
		
	
		
| @ -44,12 +44,10 @@ type Trans interface { | ||||
| 		 | ||||
| 	// Send sends a message. This method is not safe for concurrent use. | ||||
| 	Send(method uint16, data []byte) error | ||||
| 	// SendWriter sends data written to an [io.Writer]. The writer must be | ||||
| 	// closed after use. Closing the writer flushes any data that hasn't | ||||
| 	// been written yet. Any writer previously opened through this function | ||||
| 	// will be discarded. This method is not safe for concurrent use, and | ||||
| 	// neither is its result. | ||||
| 	SendWriter(method uint16) (io.WriteCloser, error) | ||||
| 	// SendWriter sends data written to an [io.Writer]. Any writer | ||||
| 	// previously opened through this function will be discarded. This | ||||
| 	// method is not safe for concurrent use, and neither is its result. | ||||
| 	SendWriter(method uint16) (io.Writer, error) | ||||
| 	// Receive receives a message. This method is not safe for concurrent | ||||
| 	// use. | ||||
| 	Receive() (method uint16, data []byte, err error) | ||||
|  | ||||
							
								
								
									
										76
									
								
								metadapta.go
									
									
									
									
									
								
							
							
						
						
									
										76
									
								
								metadapta.go
									
									
									
									
									
								
							| @ -9,7 +9,7 @@ import "git.tebibyte.media/sashakoshka/go-util/sync" | ||||
| 
 | ||||
| const closeMethod = 0xFFFF | ||||
| const int64Max = int64((^uint64(0)) >> 1) | ||||
| const defaultChunkSize = 0x1000 | ||||
| 
 | ||||
| 
 | ||||
| // Party represents a side of a connection. | ||||
| type Party bool; const ( | ||||
| @ -171,8 +171,6 @@ type transA struct { | ||||
| 	id            int64 | ||||
| 	incoming      usync.Gate[incomingMessage] | ||||
| 	currentReader io.Reader | ||||
| 	currentWriter io.Closer | ||||
| 	writeBuffer   []byte | ||||
| } | ||||
| 
 | ||||
| func (this *transA) Close() error { | ||||
| @ -194,27 +192,6 @@ func (this *transA) Send(method uint16, data []byte) error { | ||||
| 	return this.parent.sendMessageSafe(this.id, method, data) | ||||
| } | ||||
| 
 | ||||
| func (this *transA) SendWriter(method uint16) (io.WriteCloser, error) { | ||||
| 	// close previous writer if necessary | ||||
| 	if this.currentWriter != nil { | ||||
| 		this.currentWriter.Close() | ||||
| 		this.currentWriter = nil | ||||
| 	} | ||||
| 
 | ||||
| 	// create new writer | ||||
| 	writer := &writerA { | ||||
| 		parent: this, | ||||
| 		// there is only ever one writer at a time, so they can all | ||||
| 		// share a buffer | ||||
| 		buffer: this.writeBuffer[:0], | ||||
| 		method: method, | ||||
| 		chunkSize: defaultChunkSize, | ||||
| 		open: true, | ||||
| 	} | ||||
| 	this.currentWriter = writer | ||||
| 	return writer, nil | ||||
| } | ||||
| 
 | ||||
| func (this *transA) Receive() (method uint16, data []byte, err error) { | ||||
| 	method, reader, err := this.ReceiveReader() | ||||
| 	if err != nil { return 0, nil, err } | ||||
| @ -286,57 +263,6 @@ func (this *readerA) Read(buffer []byte) (int, error) { | ||||
| 	return copied, nil | ||||
| } | ||||
| 
 | ||||
| type writerA struct { | ||||
| 	parent    *transA | ||||
| 	buffer    []byte | ||||
| 	method    uint16 | ||||
| 	chunkSize int64 | ||||
| 	open      bool | ||||
| } | ||||
| 
 | ||||
| func (this *writerA) Write(data []byte) (n int, err error) { | ||||
| 	if !this.open { return 0, io.EOF } | ||||
| 	toSend := data | ||||
| 	for len(toSend) > 0 { | ||||
| 		nn, err := this.writeOne(toSend) | ||||
| 		n += nn | ||||
| 		toSend = toSend[nn:] | ||||
| 		if err != nil { return n, err } | ||||
| 	} | ||||
| 	return n, nil | ||||
| } | ||||
| 
 | ||||
| func (this *writerA) Close() error { | ||||
| 	this.open = false | ||||
| 	return nil | ||||
| } | ||||
| 
 | ||||
| func (this *writerA) writeOne(data []byte) (n int, err error) { | ||||
| 	data = data[:min(len(data), int(this.chunkSize))] | ||||
| 	 | ||||
| 	// if there is more room, append to the buffer and exit | ||||
| 	if int64(len(this.buffer) + len(data)) <= this.chunkSize { | ||||
| 		this.buffer = append(this.buffer, data...) | ||||
| 		n = len(data) | ||||
| 		// if have a full chunk, flush | ||||
| 		if int64(len(this.buffer)) == this.chunkSize { | ||||
| 			err = this.flush() | ||||
| 			if err != nil { return n, err } | ||||
| 		} | ||||
| 		return n, nil | ||||
| 	} | ||||
| 
 | ||||
| 	// if not, flush and store as much as we can in the buffer | ||||
| 	err = this.flush() | ||||
| 	if err != nil { return n, err } | ||||
| 	this.buffer = append(this.buffer, data...) | ||||
| 	return n, nil | ||||
| } | ||||
| 
 | ||||
| func (this *writerA) flush() error { | ||||
| 	return this.parent.parent.sendMessageSafe(this.parent.id, this.method, this.buffer) | ||||
| } | ||||
| 
 | ||||
| type incomingMessage struct { | ||||
| 	method  uint16 | ||||
| 	chunked bool | ||||
|  | ||||
							
								
								
									
										40
									
								
								metadaptb.go
									
									
									
									
									
								
							
							
						
						
									
										40
									
								
								metadaptb.go
									
									
									
									
									
								
							| @ -2,7 +2,6 @@ package hopp | ||||
| 
 | ||||
| import "io" | ||||
| import "net" | ||||
| import "bytes" | ||||
| import "errors" | ||||
| import "context" | ||||
| import "git.tebibyte.media/sashakoshka/hopp/tape" | ||||
| @ -59,10 +58,9 @@ func (this *b) newTrans(underlying Stream) *transB { | ||||
| } | ||||
| 
 | ||||
| type transB struct { | ||||
| 	sizeLimit     int64 | ||||
| 	underlying    Stream | ||||
| 	currentData   io.Reader | ||||
| 	currentWriter *writerB | ||||
| 	sizeLimit int64 | ||||
| 	underlying Stream | ||||
| 	currentData io.Reader | ||||
| } | ||||
| 
 | ||||
| func (this *transB) Close() error { | ||||
| @ -77,24 +75,6 @@ func (this *transB) Send(method uint16, data []byte) error { | ||||
| 	return encodeMessageB(this.underlying, this.sizeLimit, method, data) | ||||
| } | ||||
| 
 | ||||
| func (this *transB) SendWriter(method uint16) (io.WriteCloser, error) { | ||||
| 	if this.currentWriter != nil { | ||||
| 		this.currentWriter.Close() | ||||
| 	} | ||||
| 	// TODO: come up with a fix that allows us to pipe data through the | ||||
| 	// writer. as of now, it just reads whatever is written into a buffer | ||||
| 	// and sends the message on close. we should probably introduce chunked | ||||
| 	// encoding to METADAPT-B to fix this. the implementation would be | ||||
| 	// simpler than on METADAPT-A, but most of the code could just be | ||||
| 	// copied over. | ||||
| 	writer := &writerB { | ||||
| 		parent: this, | ||||
| 		method: method, | ||||
| 	} | ||||
| 	this.currentWriter = writer | ||||
| 	return writer, nil | ||||
| } | ||||
| 
 | ||||
| func (this *transB) Receive() (uint16, []byte, error) { | ||||
| 	// get a reader for the next message | ||||
| 	method, size, data, err := this.receiveReader() | ||||
| @ -125,20 +105,6 @@ func (this *transB) receiveReader() (uint16, int64, io.Reader, error) { | ||||
| 	return method, size, data, nil | ||||
| } | ||||
| 
 | ||||
| type writerB struct { | ||||
| 	parent *transB | ||||
| 	buffer bytes.Buffer | ||||
| 	method uint16 | ||||
| } | ||||
| 
 | ||||
| func (this *writerB) Write(data []byte) (int, error) { | ||||
| 	return this.buffer.Write(data) | ||||
| } | ||||
| 
 | ||||
| func (this *writerB) Close() error { | ||||
| 	return this.parent.Send(this.method, this.buffer.Bytes()) | ||||
| } | ||||
| 
 | ||||
| // MultiConn represens a multiplexed stream-oriented transport for use in | ||||
| // [AdaptB]. | ||||
| type MultiConn interface { | ||||
|  | ||||
		Loading…
	
	
			
			x
			
			
		
	
		Reference in New Issue
	
	Block a user