From 6c2bfc0ea536eff2d626f7212c5c87facf8c442b Mon Sep 17 00:00:00 2001 From: mars Date: Sun, 20 Nov 2022 10:48:49 -0700 Subject: [PATCH] Split Messenger into MessageQueue --- apps/magpie/src/protocol.rs | 95 +++++++++++++++++++++++-------------- 1 file changed, 60 insertions(+), 35 deletions(-) diff --git a/apps/magpie/src/protocol.rs b/apps/magpie/src/protocol.rs index 650fb80..e466258 100644 --- a/apps/magpie/src/protocol.rs +++ b/apps/magpie/src/protocol.rs @@ -75,12 +75,66 @@ impl ClientMessenger { /// A [Messenger] specialized for Magpie servers. pub type ServerMessenger = Messenger; -/// Bidirectional, transport-agnostic Magpie IO wrapper struct. -pub struct Messenger { - transport: T, +/// Piecewise packet assembler for [Messenger]. +pub struct MessageQueue { expected_len: Option, received_buf: VecDeque, received_queue: VecDeque, +} + +impl Default for MessageQueue { + fn default() -> Self { + Self { + expected_len: None, + received_buf: Default::default(), + received_queue: Default::default(), + } + } +} + +impl MessageQueue { + pub fn on_data(&mut self, data: &[u8]) -> std::io::Result<()> { + self.received_buf.write_all(data)?; + + loop { + if let Some(expected_len) = self.expected_len { + if self.received_buf.len() < expected_len { + break; + } + + self.expected_len = None; + let mut buf = vec![0u8; expected_len]; + self.received_buf.read_exact(&mut buf)?; + match serde_json::from_slice::(&buf) { + Ok(received) => self.received_queue.push_front(received), + Err(e) => { + let kind = std::io::ErrorKind::InvalidData; + let payload = Box::new(e); + let error = std::io::Error::new(kind, payload); + return Err(error); + } + } + } else if self.received_buf.len() >= 4 { + use byteorder::{LittleEndian, ReadBytesExt}; + let expected_len = self.received_buf.read_u32::()?; + self.expected_len = Some(expected_len as usize); + } else { + break; + } + } + + Ok(()) + } + + pub fn recv(&mut self) -> Option { + self.received_queue.pop_back() + } +} + +/// Bidirectional, transport-agnostic Magpie IO wrapper struct. +pub struct Messenger { + transport: T, + queue: MessageQueue, closed: bool, _output: PhantomData, } @@ -89,9 +143,7 @@ impl Messenger { pub fn new(transport: T) -> Self { Self { transport, - expected_len: None, - received_buf: Default::default(), - received_queue: Default::default(), + queue: Default::default(), closed: false, _output: PhantomData, } @@ -126,7 +178,7 @@ impl Messenger { break; } Ok(n) => { - self.received_buf.write(&buf[..n])?; + self.queue.on_data(&buf[..n])?; } Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break, Err(ref err) if err.kind() == std::io::ErrorKind::Interrupted => continue, @@ -134,39 +186,12 @@ impl Messenger { } } - loop { - if let Some(expected_len) = self.expected_len { - if self.received_buf.len() < expected_len { - break; - } - - self.expected_len = None; - let mut buf = vec![0u8; expected_len]; - self.received_buf.read_exact(&mut buf)?; - match serde_json::from_slice::(&buf) { - Ok(received) => self.received_queue.push_front(received), - Err(e) => { - let kind = std::io::ErrorKind::InvalidData; - let payload = Box::new(e); - let error = std::io::Error::new(kind, payload); - return Err(error); - } - } - } else if self.received_buf.len() >= 4 { - use byteorder::{LittleEndian, ReadBytesExt}; - let expected_len = self.received_buf.read_u32::()?; - self.expected_len = Some(expected_len as usize); - } else { - break; - } - } - Ok(()) } /// Tries to receive a single input packet. pub fn recv(&mut self) -> Option { - self.received_queue.pop_back() + self.queue.recv() } /// Destroys this messenger and returns the inner transport.