package usync import "slices" import "context" import "reflect" // ChannelRecv is a type constraint that includes all channel types that can be // recieved from. type ChannelRecv[T any] interface { ~chan T | ~<- chan T } // A type-safe wrapper around reflect.Select. Taken from: // https://stackoverflow.com/questions/19992334 func Select[T any, C ChannelRecv[T]] (ctx context.Context, channels ...C) (int, T, bool) { var zero T // add all channels as select cases cases := make([]reflect.SelectCase, len(channels) + 1) for i, ch := range channels { cases[i] = reflect.SelectCase { Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ch), } } // add ctx.Done() as another select case to stop listening when the // context is closed cases[len(channels)] = reflect.SelectCase { Dir: reflect.SelectRecv, Chan: reflect.ValueOf(ctx.Done()), } // read from the channel chosen, value, ok := reflect.Select(cases) if !ok { if ctx.Err() != nil { return -1, zero, false } return chosen, zero, false } // cast return value if ret, ok := value.Interface().(T); ok { return chosen, ret, true } return chosen, zero, false } // Combine returns a channel that continuously returns the result of the select // until all input sources are exhauste, or the context is canceled. func Combine[T any, C ChannelRecv[T]] (ctx context.Context, channels ...C) <- chan T { channel := make(chan T) // our silly slection routine go func () { for { if len(channels) < 2 { // only the context is left, stop everything close(channel) return } // read new value chosen, value, ok := Select(ctx, channels...) if ok { // we have a value channel <- value } else { // a channel has been closed and we need to do // something about it if chosen == len(channels) - 1 { // the context has expired, stop // everything close(channel) return } else { // a normal channel has closed, remove // it from the list channels = slices.Delete(channels, chosen, chosen + 1) } } } } () return channel }