// Copyright (c) 2022 Marceline Cramer // SPDX-License-Identifier: AGPL-3.0-or-later use std::collections::VecDeque; use std::io::{Read, Write}; use std::marker::PhantomData; use std::path::PathBuf; use serde::{de::DeserializeOwned, Deserialize, Serialize}; /// The name of the Magpie server socket. pub const MAGPIE_SOCK: &str = "magpie.sock"; /// An identifier for a Magpie panel. /// /// Only valid on a connection between a single client and its server. Clients /// are allowed to use arbitrary values for [PanelId]. pub type PanelId = u32; /// Creates a new Magpie panel with a given ID. /// /// If the given [PanelId] is already being used on this connection, the server /// will delete the old panel using that [PanelId]. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct CreatePanel { pub id: PanelId, pub protocol: String, pub script: PathBuf, } /// Sends a panel a message. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct SendMessage { pub id: PanelId, pub msg: Vec, } /// A message sent from a Magpie client to the server. #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(tag = "kind")] pub enum MagpieServerMsg { CreatePanel(CreatePanel), SendMessage(SendMessage), } /// A message sent from a script's panel to a client. #[derive(Clone, Debug, Deserialize, Serialize)] pub struct RecvMessage { pub id: PanelId, pub msg: Vec, } /// A message sent from the Magpie server to a client. #[derive(Clone, Debug, Deserialize, Serialize)] #[serde(tag = "kind")] pub enum MagpieClientMsg { RecvMessage(RecvMessage), } /// A [Messenger] specialized for Magpie clients. pub type ClientMessenger = Messenger; impl ClientMessenger { pub fn send_panel_json(&mut self, id: PanelId, msg: &O) { let msg = serde_json::to_string(msg).unwrap(); eprintln!("Sending message: {:?}", msg); let _ = self.send(&MagpieServerMsg::SendMessage(SendMessage { id, msg: msg.into_bytes(), })); } } /// A [Messenger] specialized for Magpie servers. pub type ServerMessenger = Messenger; /// Piecewise packet assembler for [Messenger]. pub struct MessageQueue { expected_len: Option, received_buf: VecDeque, received_queue: VecDeque, } impl Default for MessageQueue { fn default() -> Self { Self { expected_len: None, received_buf: Default::default(), received_queue: Default::default(), } } } impl MessageQueue { pub fn on_data(&mut self, data: &[u8]) -> std::io::Result<()> { self.received_buf.write_all(data)?; loop { if let Some(expected_len) = self.expected_len { if self.received_buf.len() < expected_len { break; } self.expected_len = None; let mut buf = vec![0u8; expected_len]; self.received_buf.read_exact(&mut buf)?; match serde_json::from_slice::(&buf) { Ok(received) => self.received_queue.push_front(received), Err(e) => { let kind = std::io::ErrorKind::InvalidData; let payload = Box::new(e); let error = std::io::Error::new(kind, payload); return Err(error); } } } else if self.received_buf.len() >= 4 { use byteorder::{LittleEndian, ReadBytesExt}; let expected_len = self.received_buf.read_u32::()?; self.expected_len = Some(expected_len as usize); } else { break; } } Ok(()) } pub fn recv(&mut self) -> Option { self.received_queue.pop_back() } } /// Bidirectional, transport-agnostic Magpie IO wrapper struct. pub struct Messenger { transport: T, queue: MessageQueue, closed: bool, _output: PhantomData, } impl Messenger { pub fn new(transport: T) -> Self { Self { transport, queue: Default::default(), closed: false, _output: PhantomData, } } pub fn is_closed(&self) -> bool { self.closed } /// Destroys this messenger and returns the inner transport. pub fn into_transport(self) -> T { self.transport } } impl Messenger { pub fn send(&mut self, msg: &O) -> std::io::Result<()> { use byteorder::{LittleEndian, WriteBytesExt}; let payload = serde_json::to_vec(msg).unwrap(); let len = payload.len() as u32; self.transport.write_u32::(len)?; self.transport.write_all(&payload)?; self.transport.flush()?; Ok(()) } } impl Messenger { /// Synchronously receives all pending messages and queues them for [recv]. /// /// This function only works if the transport is in non-blocking mode. /// Otherwise, this may block while waiting for more data, even if the /// data it receives does not add up to a full message. pub fn flush_recv(&mut self) -> std::io::Result<()> { let mut buf = [0u8; 1024]; loop { match self.transport.read(&mut buf) { Ok(0) => { self.closed = true; break; } Err(ref err) if err.kind() == std::io::ErrorKind::ConnectionReset => { self.closed = true; break; } Ok(n) => { self.queue.on_data(&buf[..n])?; } Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break, Err(ref err) if err.kind() == std::io::ErrorKind::Interrupted => continue, Err(err) => return Err(err), } } Ok(()) } /// Tries to receive a single input packet. /// /// For messages to be received here, [flush_recv] must be called to /// continuously read pending data from the transport. pub fn try_recv(&mut self) -> Option { self.queue.recv() } } #[cfg(feature = "async")] mod async_messages { use super::*; use futures_util::{AsyncReadExt, AsyncWriteExt}; use std::marker::Unpin; impl ClientMessenger { pub async fn send_panel_json_async(&mut self, id: PanelId, msg: &O) { let msg = serde_json::to_string(msg).unwrap(); eprintln!("Sending message: {:?}", msg); let _ = self .send_async(&MagpieServerMsg::SendMessage(SendMessage { id, msg: msg.into_bytes(), })) .await; } } impl Messenger { pub async fn send_async(&mut self, msg: &O) -> std::io::Result<()> { use byteorder::{LittleEndian, WriteBytesExt}; let payload = serde_json::to_vec(msg).unwrap(); let len = payload.len() as u32; let mut msg = Vec::with_capacity(4 + payload.len()); msg.write_u32::(len)?; msg.extend_from_slice(&payload); self.transport.write_all(&msg).await?; self.transport.flush().await?; Ok(()) } } impl Messenger { pub async fn recv(&mut self) -> std::io::Result { let mut buf = [0u8; 1024]; loop { if let Some(msg) = self.queue.recv() { return Ok(msg); } let num = self.transport.read(&mut buf).await?; self.queue.on_data(&buf[..num])?; } } } }