Split Messenger into MessageQueue
This commit is contained in:
parent
f40d251ca4
commit
6c2bfc0ea5
|
@ -75,12 +75,66 @@ impl<T: Read + Write> ClientMessenger<T> {
|
||||||
/// A [Messenger] specialized for Magpie servers.
|
/// A [Messenger] specialized for Magpie servers.
|
||||||
pub type ServerMessenger<T> = Messenger<T, MagpieServerMsg, MagpieClientMsg>;
|
pub type ServerMessenger<T> = Messenger<T, MagpieServerMsg, MagpieClientMsg>;
|
||||||
|
|
||||||
/// Bidirectional, transport-agnostic Magpie IO wrapper struct.
|
/// Piecewise packet assembler for [Messenger].
|
||||||
pub struct Messenger<T, I, O> {
|
pub struct MessageQueue<I> {
|
||||||
transport: T,
|
|
||||||
expected_len: Option<usize>,
|
expected_len: Option<usize>,
|
||||||
received_buf: VecDeque<u8>,
|
received_buf: VecDeque<u8>,
|
||||||
received_queue: VecDeque<I>,
|
received_queue: VecDeque<I>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I> Default for MessageQueue<I> {
|
||||||
|
fn default() -> Self {
|
||||||
|
Self {
|
||||||
|
expected_len: None,
|
||||||
|
received_buf: Default::default(),
|
||||||
|
received_queue: Default::default(),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<I: DeserializeOwned> MessageQueue<I> {
|
||||||
|
pub fn on_data(&mut self, data: &[u8]) -> std::io::Result<()> {
|
||||||
|
self.received_buf.write_all(data)?;
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if let Some(expected_len) = self.expected_len {
|
||||||
|
if self.received_buf.len() < expected_len {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.expected_len = None;
|
||||||
|
let mut buf = vec![0u8; expected_len];
|
||||||
|
self.received_buf.read_exact(&mut buf)?;
|
||||||
|
match serde_json::from_slice::<I>(&buf) {
|
||||||
|
Ok(received) => self.received_queue.push_front(received),
|
||||||
|
Err(e) => {
|
||||||
|
let kind = std::io::ErrorKind::InvalidData;
|
||||||
|
let payload = Box::new(e);
|
||||||
|
let error = std::io::Error::new(kind, payload);
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if self.received_buf.len() >= 4 {
|
||||||
|
use byteorder::{LittleEndian, ReadBytesExt};
|
||||||
|
let expected_len = self.received_buf.read_u32::<LittleEndian>()?;
|
||||||
|
self.expected_len = Some(expected_len as usize);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn recv(&mut self) -> Option<I> {
|
||||||
|
self.received_queue.pop_back()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Bidirectional, transport-agnostic Magpie IO wrapper struct.
|
||||||
|
pub struct Messenger<T, I, O> {
|
||||||
|
transport: T,
|
||||||
|
queue: MessageQueue<I>,
|
||||||
closed: bool,
|
closed: bool,
|
||||||
_output: PhantomData<O>,
|
_output: PhantomData<O>,
|
||||||
}
|
}
|
||||||
|
@ -89,9 +143,7 @@ impl<T: Read + Write, I: DeserializeOwned, O: Serialize> Messenger<T, I, O> {
|
||||||
pub fn new(transport: T) -> Self {
|
pub fn new(transport: T) -> Self {
|
||||||
Self {
|
Self {
|
||||||
transport,
|
transport,
|
||||||
expected_len: None,
|
queue: Default::default(),
|
||||||
received_buf: Default::default(),
|
|
||||||
received_queue: Default::default(),
|
|
||||||
closed: false,
|
closed: false,
|
||||||
_output: PhantomData,
|
_output: PhantomData,
|
||||||
}
|
}
|
||||||
|
@ -126,7 +178,7 @@ impl<T: Read + Write, I: DeserializeOwned, O: Serialize> Messenger<T, I, O> {
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
Ok(n) => {
|
Ok(n) => {
|
||||||
self.received_buf.write(&buf[..n])?;
|
self.queue.on_data(&buf[..n])?;
|
||||||
}
|
}
|
||||||
Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break,
|
Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break,
|
||||||
Err(ref err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
|
Err(ref err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
|
||||||
|
@ -134,39 +186,12 @@ impl<T: Read + Write, I: DeserializeOwned, O: Serialize> Messenger<T, I, O> {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
loop {
|
|
||||||
if let Some(expected_len) = self.expected_len {
|
|
||||||
if self.received_buf.len() < expected_len {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
|
|
||||||
self.expected_len = None;
|
|
||||||
let mut buf = vec![0u8; expected_len];
|
|
||||||
self.received_buf.read_exact(&mut buf)?;
|
|
||||||
match serde_json::from_slice::<I>(&buf) {
|
|
||||||
Ok(received) => self.received_queue.push_front(received),
|
|
||||||
Err(e) => {
|
|
||||||
let kind = std::io::ErrorKind::InvalidData;
|
|
||||||
let payload = Box::new(e);
|
|
||||||
let error = std::io::Error::new(kind, payload);
|
|
||||||
return Err(error);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
} else if self.received_buf.len() >= 4 {
|
|
||||||
use byteorder::{LittleEndian, ReadBytesExt};
|
|
||||||
let expected_len = self.received_buf.read_u32::<LittleEndian>()?;
|
|
||||||
self.expected_len = Some(expected_len as usize);
|
|
||||||
} else {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Tries to receive a single input packet.
|
/// Tries to receive a single input packet.
|
||||||
pub fn recv(&mut self) -> Option<I> {
|
pub fn recv(&mut self) -> Option<I> {
|
||||||
self.received_queue.pop_back()
|
self.queue.recv()
|
||||||
}
|
}
|
||||||
|
|
||||||
/// Destroys this messenger and returns the inner transport.
|
/// Destroys this messenger and returns the inner transport.
|
||||||
|
|
Loading…
Reference in New Issue