Add async feature + Messenger::recv_async()

This commit is contained in:
mars 2022-11-20 11:08:23 -07:00
parent 6c2bfc0ea5
commit 763c55d3e4
2 changed files with 32 additions and 4 deletions

View File

@ -12,6 +12,7 @@ required-features = ["service"]
anyhow = { version = "1", optional = true }
byteorder = "1.4"
canary = { path = "../..", optional = true }
futures-util = { version = "0.3", optional = true, features = ["io"] }
glium = { version = "0.32", optional = true}
mio = { version = "0.8", features = ["net", "os-poll"], optional = true }
mio-signals = { version = "0.2", optional = true }
@ -21,4 +22,5 @@ serde_json = "1"
slab = { version = "0.4", optional = true}
[features]
async = ["dep:futures-util"]
service = ["dep:anyhow", "dep:canary", "dep:glium", "dep:mio", "dep:mio-signals", "dep:parking_lot", "dep:slab"]

View File

@ -139,7 +139,7 @@ pub struct Messenger<T, I, O> {
_output: PhantomData<O>,
}
impl<T: Read + Write, I: DeserializeOwned, O: Serialize> Messenger<T, I, O> {
impl<T, I, O> Messenger<T, I, O> {
pub fn new(transport: T) -> Self {
Self {
transport,
@ -153,6 +153,13 @@ impl<T: Read + Write, I: DeserializeOwned, O: Serialize> Messenger<T, I, O> {
self.closed
}
/// Destroys this messenger and returns the inner transport.
pub fn into_transport(self) -> T {
self.transport
}
}
impl<T: Write, I, O: Serialize> Messenger<T, I, O> {
pub fn send(&mut self, msg: &O) -> std::io::Result<()> {
use byteorder::{LittleEndian, WriteBytesExt};
let payload = serde_json::to_vec(msg).unwrap();
@ -162,7 +169,9 @@ impl<T: Read + Write, I: DeserializeOwned, O: Serialize> Messenger<T, I, O> {
self.transport.flush()?;
Ok(())
}
}
impl<T: Read, I: DeserializeOwned, O> Messenger<T, I, O> {
/// Receives all pending messages and queues them for [recv].
pub fn flush_recv(&mut self) -> std::io::Result<()> {
let mut buf = [0u8; 1024];
@ -193,9 +202,26 @@ impl<T: Read + Write, I: DeserializeOwned, O: Serialize> Messenger<T, I, O> {
pub fn recv(&mut self) -> Option<I> {
self.queue.recv()
}
}
/// Destroys this messenger and returns the inner transport.
pub fn into_transport(self) -> T {
self.transport
#[cfg(feature = "async")]
mod async_messages {
use super::*;
use futures_util::AsyncReadExt;
use std::marker::Unpin;
impl<T: AsyncReadExt + Unpin, I: DeserializeOwned, O: Serialize> Messenger<T, I, O> {
pub async fn recv_async(&mut self) -> std::io::Result<I> {
let mut buf = [0u8; 1024];
loop {
let num = self.transport.read(&mut buf).await?;
self.queue.on_data(&buf[..num]);
if let Some(msg) = self.queue.recv() {
return Ok(msg);
}
}
}
}
}