Merge pull request 'Magpie' (#12) from magpie into main

Reviewed-on: #12
This commit is contained in:
mars 2022-10-30 06:30:00 +00:00
commit bf2b8360c0
12 changed files with 724 additions and 180 deletions

View File

@ -1,7 +1,10 @@
[workspace]
members = [
"apps/magpie",
"apps/music-player",
"apps/sandbox",
"crates/magpie-client",
"crates/magpie-types",
"crates/script",
"crates/types",
"scripts/music-player",

13
apps/magpie/Cargo.toml Normal file
View File

@ -0,0 +1,13 @@
[package]
name = "magpie"
version = "0.1.0"
edition = "2021"
[dependencies]
canary = { path = "../.." }
glium = "0.32"
magpie-types = { path = "../../crates/magpie-types" }
mio = { version = "0.8", features = ["net", "os-poll"] }
mio-signals = "0.2"
parking_lot = "0.12"
slab = "0.4"

102
apps/magpie/src/gl.rs Normal file
View File

@ -0,0 +1,102 @@
use canary::DrawCommand;
use glium::{glutin, Display, Surface};
#[derive(Copy, Clone)]
pub struct Vertex {
pub position: [f32; 2],
pub color: [u8; 4],
}
glium::implement_vertex!(Vertex, position normalize(false), color normalize(true));
impl From<&canary::MeshVertex> for Vertex {
fn from(v: &canary::MeshVertex) -> Self {
let (r, g, b, a) = v.color.to_rgba_unmultiplied();
Self {
position: [v.position.x, v.position.y],
color: [r, g, b, a],
}
}
}
const VERTEX_SHADER_SRC: &str = r#"
#version 330
in vec2 position;
in vec4 color;
out vec4 frag_color;
void main() {
gl_Position = vec4(position, 0.0, 1.0);
frag_color = color;
}
"#;
const FRAGMENT_SHADER_SRC: &str = r#"
#version 330
in vec4 frag_color;
out vec4 fb_color;
void main() {
fb_color = frag_color;
}
"#;
pub struct Graphics {
pub display: glium::Display,
pub program: glium::Program,
}
impl Graphics {
pub fn new(display: glium::Display) -> Self {
let program =
glium::Program::from_source(&display, VERTEX_SHADER_SRC, FRAGMENT_SHADER_SRC, None)
.unwrap();
Self { display, program }
}
pub fn draw(&mut self, commands: &[DrawCommand]) {
let mut joined_vs: Vec<Vertex> = Vec::new();
let mut joined_is = Vec::new();
for command in commands.iter() {
match command {
canary::DrawCommand::Mesh { vertices, indices } => {
let voff = joined_vs.len() as canary::MeshIndex;
joined_vs.extend(vertices.iter().map(Vertex::from));
joined_is.extend(indices.iter().map(|i| i + voff));
}
_ => unimplemented!(),
}
}
let vertex_buffer = glium::VertexBuffer::new(&self.display, &joined_vs).unwrap();
let index_buffer = glium::IndexBuffer::new(
&self.display,
glium::index::PrimitiveType::TrianglesList,
&joined_is,
)
.unwrap();
let params = glium::DrawParameters {
blend: glium::Blend::alpha_blending(),
..Default::default()
};
let mut target = self.display.draw();
target.clear_color(0.0, 0.0, 0.0, 1.0);
target
.draw(
&vertex_buffer,
&index_buffer,
&self.program,
&glium::uniforms::EmptyUniforms,
&params,
)
.unwrap();
target.finish().unwrap();
}
}

249
apps/magpie/src/ipc.rs Normal file
View File

@ -0,0 +1,249 @@
use std::collections::HashMap;
use std::ops::{Deref, DerefMut};
use std::path::{Path, PathBuf};
use std::sync::mpsc::{channel, Receiver, Sender};
use std::sync::Arc;
use std::time::Duration;
use magpie_types::{CreatePanel, MagpieServerMsg, ServerMessenger};
use mio::net::{UnixListener, UnixStream};
use mio::{Events, Interest, Poll, Token, Waker};
use mio_signals::{Signal, Signals};
use parking_lot::RwLock;
use slab::Slab;
use crate::window::{WindowMessage, WindowMessageSender};
const SOCK_NAME: &str = "magpie.sock";
pub enum IpcMessage {}
pub struct IpcMessageSender {
waker: Waker,
sender: Sender<IpcMessage>,
}
/// Wraps [mio::net::UnixListener] with automatic file deletion on drop.
pub struct Listener {
pub uds: UnixListener,
pub path: PathBuf,
}
impl Drop for Listener {
fn drop(&mut self) {
match std::fs::remove_file(&self.path) {
Ok(_) => {}
Err(e) => eprintln!("Could not delete UnixListener {:?}", e),
}
}
}
impl Deref for Listener {
type Target = UnixListener;
fn deref(&self) -> &UnixListener {
&self.uds
}
}
impl DerefMut for Listener {
fn deref_mut(&mut self) -> &mut UnixListener {
&mut self.uds
}
}
pub struct IpcData {
poll: Poll,
next_window_id: usize,
}
impl IpcData {
pub fn new_window_id(&mut self) -> usize {
let id = self.next_window_id;
self.next_window_id = self.next_window_id.wrapping_add(1);
id
}
}
pub struct Client {
data: Arc<RwLock<IpcData>>,
window_sender: WindowMessageSender,
messenger: ServerMessenger<UnixStream>,
token: Token,
id_to_window: HashMap<u32, usize>,
}
impl Drop for Client {
fn drop(&mut self) {
println!("Client #{} disconnected", self.token.0);
let data = self.data.write();
let _ = data
.poll
.registry()
.deregister(&mut self.messenger.transport);
for (_id, window) in self.id_to_window.drain() {
let msg = WindowMessage::CloseWindow { id: window };
let _ = self.window_sender.send_event(msg);
}
}
}
impl Client {
pub fn on_readable(&mut self) -> std::io::Result<bool> {
self.messenger.flush_recv()?;
while let Some(msg) = self.messenger.recv() {
println!("Client #{}: {:?}", self.token.0, msg);
match msg {
MagpieServerMsg::CreatePanel(CreatePanel { id, script }) => {
let window = self.data.write().new_window_id();
if let Some(old_id) = self.id_to_window.insert(id, window) {
let msg = WindowMessage::CloseWindow { id: old_id };
let _ = self.window_sender.send_event(msg);
}
let msg = WindowMessage::OpenWindow { id: window, script };
let _ = self.window_sender.send_event(msg);
}
_ => unimplemented!(),
}
}
Ok(self.messenger.is_closed())
}
}
pub struct Ipc {
pub data: Arc<RwLock<IpcData>>,
pub window_sender: WindowMessageSender,
pub message_recv: Receiver<IpcMessage>,
pub events: Events,
pub quit: bool,
pub listener: Listener,
pub signals: Signals,
pub listener_token: Token,
pub signals_token: Token,
pub message_recv_token: Token,
pub clients: Slab<Client>,
}
impl Ipc {
pub fn new(window_sender: WindowMessageSender) -> std::io::Result<(Self, IpcMessageSender)> {
let sock_dir = std::env::var("XDG_RUNTIME_DIR").expect("XDG_RUNTIME_DIR not set");
let sock_dir = Path::new(&sock_dir);
let sock_path = sock_dir.join(SOCK_NAME);
eprintln!("Making socket at: {:?}", sock_path);
let mut listener = Listener {
uds: UnixListener::bind(&sock_path)?,
path: sock_path.to_path_buf(),
};
let mut signals = Signals::new(Signal::Interrupt | Signal::Quit)?;
let events = Events::with_capacity(128);
let poll = Poll::new()?;
let listener_token = Token(usize::MAX);
let signals_token = Token(listener_token.0 - 1);
let message_recv_token = Token(signals_token.0 - 1);
let registry = poll.registry();
let interest = Interest::READABLE;
registry.register(&mut listener.uds, listener_token, interest)?;
registry.register(&mut signals, signals_token, interest)?;
let (sender, message_recv) = channel();
let sender = IpcMessageSender {
waker: Waker::new(registry, message_recv_token)?,
sender,
};
let data = IpcData {
poll,
next_window_id: 0,
};
let ipc = Self {
data: Arc::new(RwLock::new(data)),
window_sender,
message_recv,
events,
quit: false,
listener,
signals,
listener_token,
signals_token,
message_recv_token,
clients: Default::default(),
};
Ok((ipc, sender))
}
pub fn poll(&mut self, timeout: Option<Duration>) -> std::io::Result<()> {
self.data.write().poll.poll(&mut self.events, timeout)?;
for event in self.events.iter() {
if event.token() == self.listener_token {
loop {
match self.listener.accept() {
Ok((mut connection, address)) => {
let token = Token(self.clients.vacant_key());
println!(
"Accepting connection (Client #{}) from {:?}",
token.0, address
);
let interest = Interest::READABLE;
self.data.write().poll.registry().register(
&mut connection,
token,
interest,
)?;
self.clients.insert(Client {
messenger: ServerMessenger::new(connection),
token,
data: self.data.clone(),
window_sender: self.window_sender.clone(),
id_to_window: Default::default(),
});
}
Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break,
Err(err) => return Err(err),
}
}
} else if event.token() == self.signals_token {
while let Some(received) = self.signals.receive()? {
eprintln!("Received {:?} signal; exiting...", received);
let _ = self.window_sender.send_event(WindowMessage::Quit);
self.quit = true;
}
} else if let Some(client) = self.clients.get_mut(event.token().0) {
let disconnected = client.on_readable()?;
if disconnected {
self.clients.remove(event.token().0);
}
} else {
panic!("Unrecognized event token: {:?}", event);
}
}
Ok(())
}
pub fn run(mut self) {
while !self.quit {
let wait = Duration::from_millis(100);
match self.poll(Some(wait)) {
Ok(_) => {}
Err(e) => {
eprintln!("IPC poll error: {:?}", e);
}
}
}
}
}

14
apps/magpie/src/main.rs Normal file
View File

@ -0,0 +1,14 @@
use glium::glutin::event_loop::EventLoopBuilder;
pub mod gl;
pub mod ipc;
pub mod window;
fn main() -> std::io::Result<()> {
let event_loop = EventLoopBuilder::<window::WindowMessage>::with_user_event().build();
let window_sender = event_loop.create_proxy();
let (ipc, ipc_sender) = ipc::Ipc::new(window_sender)?;
let _ipc_thread = std::thread::spawn(|| ipc.run());
let window_store = window::WindowStore::new(ipc_sender);
window_store.run(event_loop)
}

130
apps/magpie/src/window.rs Normal file
View File

@ -0,0 +1,130 @@
use std::collections::HashMap;
use std::path::PathBuf;
use std::time::Instant;
use canary::{PanelId, ScriptAbiImpl, ScriptInstance, WasmtimeRuntime, WasmtimeScript};
use glium::backend::glutin::DisplayCreationError;
use glium::{glutin, Surface};
use glutin::event::{Event, WindowEvent};
use glutin::event_loop::{ControlFlow, EventLoop, EventLoopProxy, EventLoopWindowTarget};
use glutin::window::WindowId;
use crate::gl::Graphics;
use crate::ipc::{IpcMessage, IpcMessageSender};
pub enum WindowMessage {
OpenWindow { id: usize, script: PathBuf },
CloseWindow { id: usize },
Quit,
}
pub type WindowMessageSender = EventLoopProxy<WindowMessage>;
pub struct Window {
pub graphics: Graphics,
pub script: WasmtimeScript<ScriptAbiImpl>,
pub panel: PanelId,
pub last_update: Instant,
}
impl Window {
pub fn new(
script: WasmtimeScript<ScriptAbiImpl>,
panel: PanelId,
event_loop: &EventLoopWindowTarget<WindowMessage>,
) -> Result<Self, DisplayCreationError> {
let wb = glutin::window::WindowBuilder::new();
let cb = glutin::ContextBuilder::new();
let display = glium::Display::new(wb, cb, &event_loop)?;
let graphics = Graphics::new(display);
let last_update = Instant::now();
Ok(Self {
graphics,
script,
panel,
last_update,
})
}
pub fn get_id(&self) -> WindowId {
self.graphics.display.gl_window().window().id()
}
pub fn request_redraw(&mut self) {
self.graphics.display.gl_window().window().request_redraw();
}
pub fn update(&mut self) {
let now = Instant::now();
let dt = now.duration_since(self.last_update).as_secs_f32();
self.script.update(self.panel, dt);
}
pub fn draw(&mut self) {
self.script.draw(self.panel, |commands| {
self.graphics.draw(commands);
});
}
}
pub struct WindowStore {
pub ipc_sender: IpcMessageSender,
pub ipc_to_window: HashMap<usize, WindowId>,
pub windows: HashMap<WindowId, Window>,
pub runtime: WasmtimeRuntime,
}
impl WindowStore {
pub fn new(ipc_sender: IpcMessageSender) -> Self {
Self {
ipc_sender,
ipc_to_window: Default::default(),
windows: Default::default(),
runtime: WasmtimeRuntime::new().unwrap(),
}
}
pub fn run(mut self, event_loop: EventLoop<WindowMessage>) -> ! {
event_loop.run(move |event, event_loop, control_flow| match event {
Event::WindowEvent { window_id, event } => {
if let Some(window) = self.windows.get_mut(&window_id) {
match event {
WindowEvent::Resized(_) => window.request_redraw(),
_ => {}
}
}
}
Event::RedrawRequested(id) => {
if let Some(window) = self.windows.get_mut(&id) {
window.draw();
}
}
Event::MainEventsCleared => {
for (_id, window) in self.windows.iter_mut() {
window.update();
window.request_redraw();
}
}
Event::UserEvent(event) => match event {
WindowMessage::OpenWindow { id, script } => {
println!("Opening window {} with script {:?}", id, script);
let abi = Default::default();
let module = std::fs::read(script).unwrap();
let mut script = self.runtime.load_module(abi, &module).unwrap();
let panel = script.bind_panel(vec![]);
let window = Window::new(script, panel, &event_loop).unwrap();
let window_id = window.get_id();
self.windows.insert(window_id, window);
self.ipc_to_window.insert(id, window_id);
}
WindowMessage::CloseWindow { id } => {
if let Some(window_id) = self.ipc_to_window.remove(&id) {
self.windows.remove(&window_id);
}
}
WindowMessage::Quit => *control_flow = ControlFlow::Exit,
},
_ => {}
});
}
}

View File

@ -4,8 +4,6 @@ version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1"
canary = { path = "../../" }
glium = "0.32"
magpie-client = { path = "../../crates/magpie-client" }
mpris = "2.0.0-rc3"
ouroboros = "^0.15"

View File

@ -1,54 +1,8 @@
use canary::ScriptInstance;
use glium::{glutin, Display, Surface};
use glutin::event::{Event, WindowEvent};
use glutin::event_loop::{ControlFlow, EventLoop};
use magpie_client::{magpie_types, MagpieClient};
use magpie_types::{CreatePanel, MagpieServerMsg};
use mpris::{PlayerFinder, ProgressTracker};
use ouroboros::self_referencing;
#[derive(Copy, Clone)]
pub struct Vertex {
pub position: [f32; 2],
pub color: [u8; 4],
}
glium::implement_vertex!(Vertex, position normalize(false), color normalize(true));
impl From<&canary::MeshVertex> for Vertex {
fn from(v: &canary::MeshVertex) -> Self {
let (r, g, b, a) = v.color.to_rgba_unmultiplied();
Self {
position: [v.position.x, v.position.y],
color: [r, g, b, a],
}
}
}
const VERTEX_SHADER_SRC: &str = r#"
#version 330
in vec2 position;
in vec4 color;
out vec4 frag_color;
void main() {
gl_Position = vec4(position, 0.0, 1.0);
frag_color = color;
}
"#;
const FRAGMENT_SHADER_SRC: &str = r#"
#version 330
in vec4 frag_color;
out vec4 fb_color;
void main() {
fb_color = frag_color;
}
"#;
#[self_referencing]
pub struct Player {
player: mpris::Player,
@ -60,6 +14,7 @@ pub struct Player {
impl Player {
pub fn from_player(player: mpris::Player) -> Self {
println!("Connected to {}", player.identity());
PlayerBuilder {
player,
tracker_builder: |player| player.track_progress(100).unwrap(),
@ -72,10 +27,14 @@ impl Player {
let tick = tracker.tick();
if tick.progress_changed {
let p = tick.progress;
println!(
"{:?}/{:?}",
tick.progress.position(),
tick.progress.length()
"({:?}/{:?}) {:?} - {:?} - {:?}",
p.position(),
p.length(),
p.metadata().artists(),
p.metadata().album_name(),
p.metadata().title(),
);
}
@ -88,109 +47,6 @@ impl Player {
}
}
pub struct App {
display: Display,
program: glium::Program,
player_finder: PlayerFinder,
player: Option<Player>,
script: canary::WasmtimeScript<canary::ScriptAbiImpl>,
panel: canary::PanelId,
}
impl App {
pub fn new(module_path: &str, display: Display) -> Self {
let program =
glium::Program::from_source(&display, VERTEX_SHADER_SRC, FRAGMENT_SHADER_SRC, None)
.unwrap();
let runtime = canary::WasmtimeRuntime::new().unwrap();
let abi = Default::default();
let module = std::fs::read(module_path).unwrap();
let mut script = runtime.load_module(abi, &module).unwrap();
let panel = script.bind_panel(vec![]);
Self {
display,
program,
player_finder: PlayerFinder::new().expect("Could not connect to D-Bus"),
player: None,
script,
panel,
}
}
pub fn update(&mut self) {
self.display.gl_window().window().request_redraw();
/*let disconnected = if let Some(player) = self.player.as_mut() {
player.tick()
} else {
match self.player_finder.find_active() {
Ok(player) => {
println!("Connected to MPRIS");
self.player = Some(Player::from_player(player));
}
Err(e) => eprintln!("PlayerFinder: {:?}", e),
}
false
};
if disconnected {
println!("Disconnected from MPRIS");
self.player = None;
}*/
self.script.update(self.panel, 0.01667);
}
pub fn draw(&mut self) {
let mut target = self.display.draw();
target.clear_color(0.0, 0.0, 0.0, 0.0);
self.script.draw(self.panel, |commands| {
let mut joined_vs: Vec<Vertex> = Vec::new();
let mut joined_is = Vec::new();
for command in commands.iter() {
match command {
canary::DrawCommand::Mesh { vertices, indices } => {
let voff = joined_vs.len() as canary::MeshIndex;
joined_vs.extend(vertices.iter().map(Vertex::from));
joined_is.extend(indices.iter().map(|i| i + voff));
}
_ => unimplemented!(),
}
}
let vertex_buffer = glium::VertexBuffer::new(&self.display, &joined_vs).unwrap();
let index_buffer = glium::IndexBuffer::new(
&self.display,
glium::index::PrimitiveType::TrianglesList,
&joined_is,
)
.unwrap();
let params = glium::DrawParameters {
blend: glium::Blend::alpha_blending(),
..Default::default()
};
target
.draw(
&vertex_buffer,
&index_buffer,
&self.program,
&glium::uniforms::EmptyUniforms,
&params,
)
.unwrap();
});
target.finish().unwrap();
}
}
fn main() {
let args: Vec<String> = std::env::args().collect();
let module_path = args
@ -198,30 +54,29 @@ fn main() {
.expect("Please pass a path to a Canary script!")
.to_owned();
let event_loop = EventLoop::new();
let wb = glutin::window::WindowBuilder::new();
let cb = glutin::ContextBuilder::new();
let display = glium::Display::new(wb, cb, &event_loop).unwrap();
let player_finder = PlayerFinder::new().expect("Could not connect to D-Bus");
let mut app = App::new(&module_path, display);
let mut magpie = MagpieClient::new().unwrap();
let script = std::path::PathBuf::from(&module_path);
let msg = CreatePanel { id: 0, script };
let msg = MagpieServerMsg::CreatePanel(msg);
magpie.messenger.send(&msg).unwrap();
event_loop.run(move |ev, _, control_flow| {
*control_flow = ControlFlow::Poll;
match ev {
Event::WindowEvent { event, .. } => match event {
WindowEvent::CloseRequested => {
*control_flow = ControlFlow::Exit;
return;
}
_ => return,
},
Event::MainEventsCleared => {
app.update();
loop {
println!("Connecting to MRPIS...");
let mut player = match player_finder.find_active() {
Ok(player) => Player::from_player(player),
Err(err) => {
eprintln!("Couldn't find player: {:?}", err);
let wait = std::time::Duration::from_secs(1);
std::thread::sleep(wait);
continue;
}
Event::RedrawRequested(_) => {
app.draw();
}
_ => {}
}
});
};
while !player.tick() {}
println!("Disconnected from MPRIS");
}
}

View File

@ -0,0 +1,7 @@
[package]
name = "magpie-client"
version = "0.1.0"
edition = "2021"
[dependencies]
magpie-types = { path = "../magpie-types" }

View File

@ -0,0 +1,26 @@
pub use magpie_types;
use std::os::unix::net::UnixStream;
use std::path::Path;
use magpie_types::ClientMessenger;
/// The name of the Magpie server socket.
pub const MAGPIE_SOCK: &str = "magpie.sock";
/// A client to a Magpie server.
pub struct MagpieClient {
pub messenger: ClientMessenger<UnixStream>,
}
impl MagpieClient {
pub fn new() -> std::io::Result<Self> {
let sock_dir = std::env::var("XDG_RUNTIME_DIR").expect("XDG_RUNTIME_DIR not set");
let sock_dir = Path::new(&sock_dir);
let sock_path = sock_dir.join(MAGPIE_SOCK);
let socket = UnixStream::connect(sock_path)?;
Ok(Self {
messenger: ClientMessenger::new(socket),
})
}
}

View File

@ -0,0 +1,9 @@
[package]
name = "magpie-types"
version = "0.1.0"
edition = "2021"
[dependencies]
byteorder = "1.4"
serde = { version = "1.0", features = ["derive"] }
serde_json = "1"

View File

@ -0,0 +1,138 @@
use std::collections::VecDeque;
use std::io::{Read, Write};
use std::marker::PhantomData;
use std::path::PathBuf;
use serde::{de::DeserializeOwned, Deserialize, Serialize};
/// An identifier for a Magpie panel.
///
/// Only valid on a connection between a single client and its server. Clients
/// are allowed to use arbitrary values for [PanelId].
pub type PanelId = u32;
/// Creates a new Magpie panel with a given ID.
///
/// If the given [PanelId] is already being used on this connection, the server
/// will delete the old panel using that [PanelId].
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct CreatePanel {
pub id: PanelId,
pub script: PathBuf,
}
/// Sends a panel a message.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct SendMessage {
pub id: PanelId,
pub msg: Vec<u8>,
}
/// A message sent from a Magpie client to the server.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "kind")]
pub enum MagpieServerMsg {
CreatePanel(CreatePanel),
SendMessage(SendMessage),
}
/// A message sent from the Magpie server to a client.
#[derive(Clone, Debug, Deserialize, Serialize)]
#[serde(tag = "kind")]
pub enum MagpieClientMsg {}
/// A [Messenger] specialized for Magpie clients.
pub type ClientMessenger<T> = Messenger<T, MagpieClientMsg, MagpieServerMsg>;
/// A [Messenger] specialized for Magpie servers.
pub type ServerMessenger<T> = Messenger<T, MagpieServerMsg, MagpieClientMsg>;
/// Bidirectional, transport-agnostic Magpie IO wrapper struct.
pub struct Messenger<T, I, O> {
pub transport: T,
expected_len: Option<usize>,
received_buf: VecDeque<u8>,
received_queue: VecDeque<I>,
closed: bool,
_output: PhantomData<O>,
}
impl<T: Read + Write, I: DeserializeOwned, O: Serialize> Messenger<T, I, O> {
pub fn new(transport: T) -> Self {
Self {
transport,
expected_len: None,
received_buf: Default::default(),
received_queue: Default::default(),
closed: false,
_output: PhantomData,
}
}
pub fn is_closed(&self) -> bool {
self.closed
}
pub fn send(&mut self, msg: &O) -> std::io::Result<()> {
use byteorder::{LittleEndian, WriteBytesExt};
let payload = serde_json::to_vec(msg).unwrap();
let len = payload.len() as u32;
self.transport.write_u32::<LittleEndian>(len)?;
self.transport.write_all(&payload)?;
Ok(())
}
/// Receives all pending messages and queues them for [recv].
pub fn flush_recv(&mut self) -> std::io::Result<()> {
let mut buf = [0u8; 1024];
loop {
match self.transport.read(&mut buf) {
Ok(0) => {
self.closed = true;
break;
}
Ok(n) => {
self.received_buf.write(&buf[..n])?;
}
Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break,
Err(ref err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
Err(err) => return Err(err),
}
}
loop {
if let Some(expected_len) = self.expected_len {
if expected_len < self.received_buf.len() {
break;
}
self.expected_len = None;
let mut buf = vec![0u8; expected_len];
self.received_buf.read_exact(&mut buf)?;
match serde_json::from_slice::<I>(&buf) {
Ok(received) => self.received_queue.push_front(received),
Err(e) => {
let kind = std::io::ErrorKind::InvalidData;
let payload = Box::new(e);
let error = std::io::Error::new(kind, payload);
return Err(error);
}
}
} else if self.received_buf.len() >= 4 {
use byteorder::{LittleEndian, ReadBytesExt};
let expected_len = self.received_buf.read_u32::<LittleEndian>()?;
self.expected_len = Some(expected_len as usize);
} else {
break;
}
}
Ok(())
}
/// Tries to receive a single input packet.
pub fn recv(&mut self) -> Option<I> {
self.received_queue.pop_back()
}
}