Update METADAPT-A and METADAPT-B accordingly
This commit is contained in:
parent
92b3da4282
commit
044a1a6119
10
metadapta.go
10
metadapta.go
@ -4,7 +4,6 @@ import "io"
|
|||||||
import "fmt"
|
import "fmt"
|
||||||
import "net"
|
import "net"
|
||||||
import "sync"
|
import "sync"
|
||||||
import "context"
|
|
||||||
import "git.tebibyte.media/sashakoshka/hopp/tape"
|
import "git.tebibyte.media/sashakoshka/hopp/tape"
|
||||||
import "git.tebibyte.media/sashakoshka/go-util/sync"
|
import "git.tebibyte.media/sashakoshka/go-util/sync"
|
||||||
|
|
||||||
@ -24,6 +23,7 @@ type a struct {
|
|||||||
sendLock sync.Mutex
|
sendLock sync.Mutex
|
||||||
transMap map[int64] *transA
|
transMap map[int64] *transA
|
||||||
transChan chan *transA
|
transChan chan *transA
|
||||||
|
done chan struct { }
|
||||||
err error
|
err error
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -35,12 +35,14 @@ func AdaptA(underlying net.Conn, party Party) Conn {
|
|||||||
party: party,
|
party: party,
|
||||||
transMap: make(map[int64] *transA),
|
transMap: make(map[int64] *transA),
|
||||||
transChan: make(chan *transA),
|
transChan: make(chan *transA),
|
||||||
|
done: make(chan struct { }),
|
||||||
}
|
}
|
||||||
go conn.receive()
|
go conn.receive()
|
||||||
return conn
|
return conn
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *a) Close() error {
|
func (this *a) Close() error {
|
||||||
|
close(this.done)
|
||||||
return this.underlying.Close()
|
return this.underlying.Close()
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -61,12 +63,12 @@ func (this *a) OpenTrans() (Trans, error) {
|
|||||||
return trans, nil
|
return trans, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *a) AcceptTrans(ctx context.Context) (Trans, error) {
|
func (this *a) AcceptTrans() (Trans, error) {
|
||||||
select {
|
select {
|
||||||
case trans := <- this.transChan:
|
case trans := <- this.transChan:
|
||||||
return trans, nil
|
return trans, nil
|
||||||
case <- ctx.Done():
|
case <- this.done:
|
||||||
return nil, ctx.Err()
|
return nil, fmt.Errorf("could not accept transaction: %w", io.EOF)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -29,8 +29,8 @@ func (this *b) OpenTrans() (Trans, error) {
|
|||||||
return transB { underlying: stream }, nil
|
return transB { underlying: stream }, nil
|
||||||
}
|
}
|
||||||
|
|
||||||
func (this *b) AcceptTrans(ctx context.Context) (Trans, error) {
|
func (this *b) AcceptTrans() (Trans, error) {
|
||||||
stream, err := this.underlying.AcceptStream(ctx)
|
stream, err := this.underlying.AcceptStream(context.Background())
|
||||||
if err != nil { return nil, err }
|
if err != nil { return nil, err }
|
||||||
return transB { underlying: stream }, nil
|
return transB { underlying: stream }, nil
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user