diff --git a/apps/magpie/src/ipc.rs b/apps/magpie/src/ipc.rs index b41059c..7feaca1 100644 --- a/apps/magpie/src/ipc.rs +++ b/apps/magpie/src/ipc.rs @@ -5,6 +5,7 @@ use std::str::from_utf8; use std::sync::mpsc::{channel, Receiver, Sender}; use std::time::Duration; +use magpie_types::ServerMessenger; use mio::net::{UnixListener, UnixStream}; use mio::{Events, Interest, Poll, Token, Waker}; use mio_signals::{Signal, Signals}; @@ -51,47 +52,26 @@ impl DerefMut for Listener { } pub struct Client { - connection: UnixStream, + messenger: ServerMessenger, + token: Token, } impl Client { - pub fn new(connection: UnixStream) -> Self { - Self { connection } + pub fn new(connection: UnixStream, token: Token) -> Self { + Self { + messenger: ServerMessenger::new(connection), + token, + } } pub fn on_readable(&mut self) -> std::io::Result { - let mut connection_closed = false; - let mut received_data = vec![0; 4096]; - let mut bytes_read = 0; + self.messenger.flush_recv()?; - loop { - match self.connection.read(&mut received_data[bytes_read..]) { - Ok(0) => { - connection_closed = true; - break; - } - Ok(n) => { - bytes_read += n; - if bytes_read >= received_data.len() { - received_data.resize(received_data.len() + 1024, 0); - } - } - Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break, - Err(ref err) if err.kind() == std::io::ErrorKind::Interrupted => continue, - Err(err) => return Err(err), - } + while let Some(msg) = self.messenger.recv() { + println!("Client #{}: {:?}", self.token.0, msg); } - if bytes_read > 0 { - let received_data = &received_data[..bytes_read]; - if let Ok(str_buf) = from_utf8(received_data) { - println!("Received data: {}", str_buf.trim_end()); - } else { - println!("Received (non-UTF-8) data: {:?}", received_data); - } - } - - Ok(connection_closed) + Ok(self.messenger.is_closed()) } } @@ -165,19 +145,19 @@ impl Ipc { if event.token() == self.listener_token { loop { match self.listener.accept() { - Ok((connection, address)) => { + Ok((mut connection, address)) => { let token = Token(self.clients.vacant_key()); println!( "Accepting connection (Client #{}) from {:?}", token.0, address ); - let mut client = Client::new(connection); + let interest = Interest::READABLE; - self.poll.registry().register( - &mut client.connection, - token, - interest, - )?; + self.poll + .registry() + .register(&mut connection, token, interest)?; + + let client = Client::new(connection, token); self.clients.insert(client); } Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break, @@ -195,7 +175,7 @@ impl Ipc { if disconnected { println!("Client #{} disconnected", event.token().0); let mut client = self.clients.remove(event.token().0); - self.poll.registry().deregister(&mut client.connection)?; + self.poll.registry().deregister(&mut client.messenger.transport)?; } } else { panic!("Unrecognized event token: {:?}", event); diff --git a/crates/magpie-client/Cargo.toml b/crates/magpie-client/Cargo.toml index 0c66102..98ec6fc 100644 --- a/crates/magpie-client/Cargo.toml +++ b/crates/magpie-client/Cargo.toml @@ -5,4 +5,3 @@ edition = "2021" [dependencies] magpie-types = { path = "../magpie-types" } -serde_json = "1" diff --git a/crates/magpie-client/src/lib.rs b/crates/magpie-client/src/lib.rs index d4c13b5..9a19133 100644 --- a/crates/magpie-client/src/lib.rs +++ b/crates/magpie-client/src/lib.rs @@ -1,15 +1,16 @@ -use std::io::Write; +pub use magpie_types; + use std::os::unix::net::UnixStream; use std::path::Path; -use magpie_types::MagpieServerMsg; +use magpie_types::ClientMessenger; /// The name of the Magpie server socket. pub const MAGPIE_SOCK: &str = "magpie.sock"; /// A client to a Magpie server. pub struct MagpieClient { - socket: UnixStream, + pub messenger: ClientMessenger, } impl MagpieClient { @@ -18,12 +19,8 @@ impl MagpieClient { let sock_dir = Path::new(&sock_dir); let sock_path = sock_dir.join(MAGPIE_SOCK); let socket = UnixStream::connect(sock_path)?; - Ok(Self { socket }) - } - - pub fn send_msg(&mut self, msg: &MagpieServerMsg) -> std::io::Result<()> { - let bytes = serde_json::to_vec(msg).unwrap(); - self.socket.write_all(&bytes)?; - Ok(()) + Ok(Self { + messenger: ClientMessenger::new(socket), + }) } } diff --git a/crates/magpie-types/Cargo.toml b/crates/magpie-types/Cargo.toml index 12f347a..2a2b256 100644 --- a/crates/magpie-types/Cargo.toml +++ b/crates/magpie-types/Cargo.toml @@ -4,4 +4,6 @@ version = "0.1.0" edition = "2021" [dependencies] +byteorder = "1.4" serde = { version = "1.0", features = ["derive"] } +serde_json = "1" diff --git a/crates/magpie-types/src/lib.rs b/crates/magpie-types/src/lib.rs index 80c070d..e99be2a 100644 --- a/crates/magpie-types/src/lib.rs +++ b/crates/magpie-types/src/lib.rs @@ -1,4 +1,8 @@ -use serde::{Deserialize, Serialize}; +use std::collections::VecDeque; +use std::io::{Read, Write}; +use std::marker::PhantomData; + +use serde::{de::DeserializeOwned, Deserialize, Serialize}; /// An identifier for a Magpie panel. /// @@ -34,3 +38,99 @@ pub enum MagpieServerMsg { #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(tag = "kind")] pub enum MagpieClientMsg {} + +/// A [Messenger] specialized for Magpie clients. +pub type ClientMessenger = Messenger; + +/// A [Messenger] specialized for Magpie servers. +pub type ServerMessenger = Messenger; + +/// Bidirectional, transport-agnostic Magpie IO wrapper struct. +pub struct Messenger { + pub transport: T, + expected_len: Option, + received_buf: VecDeque, + received_queue: VecDeque, + closed: bool, + _output: PhantomData, +} + +impl Messenger { + pub fn new(transport: T) -> Self { + Self { + transport, + expected_len: None, + received_buf: Default::default(), + received_queue: Default::default(), + closed: false, + _output: PhantomData, + } + } + + pub fn is_closed(&self) -> bool { + self.closed + } + + pub fn send(&mut self, msg: &O) -> std::io::Result<()> { + use byteorder::{LittleEndian, WriteBytesExt}; + let payload = serde_json::to_vec(msg).unwrap(); + let len = payload.len() as u32; + self.transport.write_u32::(len)?; + self.transport.write_all(&payload)?; + Ok(()) + } + + /// Receives all pending messages and queues them for [recv]. + pub fn flush_recv(&mut self) -> std::io::Result<()> { + let mut buf = [0u8; 1024]; + + loop { + match self.transport.read(&mut buf) { + Ok(0) => { + self.closed = true; + break; + } + Ok(n) => { + self.received_buf.write(&buf[..n])?; + } + Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break, + Err(ref err) if err.kind() == std::io::ErrorKind::Interrupted => continue, + Err(err) => return Err(err), + } + } + + loop { + if let Some(expected_len) = self.expected_len { + if expected_len < self.received_buf.len() { + break; + } + + self.expected_len = None; + let mut buf = vec![0u8; expected_len]; + self.received_buf.read_exact(&mut buf)?; + match serde_json::from_slice::(&buf) { + Ok(received) => self.received_queue.push_front(received), + Err(e) => { + let kind = std::io::ErrorKind::InvalidData; + let payload = Box::new(e); + let error = std::io::Error::new(kind, payload); + return Err(error); + } + } + } else if self.received_buf.len() >= 4 { + use byteorder::{LittleEndian, ReadBytesExt}; + let expected_len = self.received_buf.read_u32::()?; + self.expected_len = Some(expected_len as usize); + } else { + break; + } + } + + Ok(()) + } + + /// Tries to receive a single input packet. + pub fn recv(&mut self) -> Option { + self.received_queue.pop_back() + } +}