Add Messenger struct
This commit is contained in:
parent
4e46832cd0
commit
63691ebc28
|
@ -5,6 +5,7 @@ use std::str::from_utf8;
|
||||||
use std::sync::mpsc::{channel, Receiver, Sender};
|
use std::sync::mpsc::{channel, Receiver, Sender};
|
||||||
use std::time::Duration;
|
use std::time::Duration;
|
||||||
|
|
||||||
|
use magpie_types::ServerMessenger;
|
||||||
use mio::net::{UnixListener, UnixStream};
|
use mio::net::{UnixListener, UnixStream};
|
||||||
use mio::{Events, Interest, Poll, Token, Waker};
|
use mio::{Events, Interest, Poll, Token, Waker};
|
||||||
use mio_signals::{Signal, Signals};
|
use mio_signals::{Signal, Signals};
|
||||||
|
@ -51,47 +52,26 @@ impl DerefMut for Listener {
|
||||||
}
|
}
|
||||||
|
|
||||||
pub struct Client {
|
pub struct Client {
|
||||||
connection: UnixStream,
|
messenger: ServerMessenger<UnixStream>,
|
||||||
|
token: Token,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl Client {
|
impl Client {
|
||||||
pub fn new(connection: UnixStream) -> Self {
|
pub fn new(connection: UnixStream, token: Token) -> Self {
|
||||||
Self { connection }
|
Self {
|
||||||
|
messenger: ServerMessenger::new(connection),
|
||||||
|
token,
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
pub fn on_readable(&mut self) -> std::io::Result<bool> {
|
pub fn on_readable(&mut self) -> std::io::Result<bool> {
|
||||||
let mut connection_closed = false;
|
self.messenger.flush_recv()?;
|
||||||
let mut received_data = vec![0; 4096];
|
|
||||||
let mut bytes_read = 0;
|
|
||||||
|
|
||||||
loop {
|
while let Some(msg) = self.messenger.recv() {
|
||||||
match self.connection.read(&mut received_data[bytes_read..]) {
|
println!("Client #{}: {:?}", self.token.0, msg);
|
||||||
Ok(0) => {
|
|
||||||
connection_closed = true;
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
Ok(n) => {
|
|
||||||
bytes_read += n;
|
|
||||||
if bytes_read >= received_data.len() {
|
|
||||||
received_data.resize(received_data.len() + 1024, 0);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break,
|
|
||||||
Err(ref err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
|
|
||||||
Err(err) => return Err(err),
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
if bytes_read > 0 {
|
Ok(self.messenger.is_closed())
|
||||||
let received_data = &received_data[..bytes_read];
|
|
||||||
if let Ok(str_buf) = from_utf8(received_data) {
|
|
||||||
println!("Received data: {}", str_buf.trim_end());
|
|
||||||
} else {
|
|
||||||
println!("Received (non-UTF-8) data: {:?}", received_data);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
Ok(connection_closed)
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -165,19 +145,19 @@ impl Ipc {
|
||||||
if event.token() == self.listener_token {
|
if event.token() == self.listener_token {
|
||||||
loop {
|
loop {
|
||||||
match self.listener.accept() {
|
match self.listener.accept() {
|
||||||
Ok((connection, address)) => {
|
Ok((mut connection, address)) => {
|
||||||
let token = Token(self.clients.vacant_key());
|
let token = Token(self.clients.vacant_key());
|
||||||
println!(
|
println!(
|
||||||
"Accepting connection (Client #{}) from {:?}",
|
"Accepting connection (Client #{}) from {:?}",
|
||||||
token.0, address
|
token.0, address
|
||||||
);
|
);
|
||||||
let mut client = Client::new(connection);
|
|
||||||
let interest = Interest::READABLE;
|
let interest = Interest::READABLE;
|
||||||
self.poll.registry().register(
|
self.poll
|
||||||
&mut client.connection,
|
.registry()
|
||||||
token,
|
.register(&mut connection, token, interest)?;
|
||||||
interest,
|
|
||||||
)?;
|
let client = Client::new(connection, token);
|
||||||
self.clients.insert(client);
|
self.clients.insert(client);
|
||||||
}
|
}
|
||||||
Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break,
|
Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break,
|
||||||
|
@ -195,7 +175,7 @@ impl Ipc {
|
||||||
if disconnected {
|
if disconnected {
|
||||||
println!("Client #{} disconnected", event.token().0);
|
println!("Client #{} disconnected", event.token().0);
|
||||||
let mut client = self.clients.remove(event.token().0);
|
let mut client = self.clients.remove(event.token().0);
|
||||||
self.poll.registry().deregister(&mut client.connection)?;
|
self.poll.registry().deregister(&mut client.messenger.transport)?;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
panic!("Unrecognized event token: {:?}", event);
|
panic!("Unrecognized event token: {:?}", event);
|
||||||
|
|
|
@ -5,4 +5,3 @@ edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
magpie-types = { path = "../magpie-types" }
|
magpie-types = { path = "../magpie-types" }
|
||||||
serde_json = "1"
|
|
||||||
|
|
|
@ -1,15 +1,16 @@
|
||||||
use std::io::Write;
|
pub use magpie_types;
|
||||||
|
|
||||||
use std::os::unix::net::UnixStream;
|
use std::os::unix::net::UnixStream;
|
||||||
use std::path::Path;
|
use std::path::Path;
|
||||||
|
|
||||||
use magpie_types::MagpieServerMsg;
|
use magpie_types::ClientMessenger;
|
||||||
|
|
||||||
/// The name of the Magpie server socket.
|
/// The name of the Magpie server socket.
|
||||||
pub const MAGPIE_SOCK: &str = "magpie.sock";
|
pub const MAGPIE_SOCK: &str = "magpie.sock";
|
||||||
|
|
||||||
/// A client to a Magpie server.
|
/// A client to a Magpie server.
|
||||||
pub struct MagpieClient {
|
pub struct MagpieClient {
|
||||||
socket: UnixStream,
|
pub messenger: ClientMessenger<UnixStream>,
|
||||||
}
|
}
|
||||||
|
|
||||||
impl MagpieClient {
|
impl MagpieClient {
|
||||||
|
@ -18,12 +19,8 @@ impl MagpieClient {
|
||||||
let sock_dir = Path::new(&sock_dir);
|
let sock_dir = Path::new(&sock_dir);
|
||||||
let sock_path = sock_dir.join(MAGPIE_SOCK);
|
let sock_path = sock_dir.join(MAGPIE_SOCK);
|
||||||
let socket = UnixStream::connect(sock_path)?;
|
let socket = UnixStream::connect(sock_path)?;
|
||||||
Ok(Self { socket })
|
Ok(Self {
|
||||||
}
|
messenger: ClientMessenger::new(socket),
|
||||||
|
})
|
||||||
pub fn send_msg(&mut self, msg: &MagpieServerMsg) -> std::io::Result<()> {
|
|
||||||
let bytes = serde_json::to_vec(msg).unwrap();
|
|
||||||
self.socket.write_all(&bytes)?;
|
|
||||||
Ok(())
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
|
@ -4,4 +4,6 @@ version = "0.1.0"
|
||||||
edition = "2021"
|
edition = "2021"
|
||||||
|
|
||||||
[dependencies]
|
[dependencies]
|
||||||
|
byteorder = "1.4"
|
||||||
serde = { version = "1.0", features = ["derive"] }
|
serde = { version = "1.0", features = ["derive"] }
|
||||||
|
serde_json = "1"
|
||||||
|
|
|
@ -1,4 +1,8 @@
|
||||||
use serde::{Deserialize, Serialize};
|
use std::collections::VecDeque;
|
||||||
|
use std::io::{Read, Write};
|
||||||
|
use std::marker::PhantomData;
|
||||||
|
|
||||||
|
use serde::{de::DeserializeOwned, Deserialize, Serialize};
|
||||||
|
|
||||||
/// An identifier for a Magpie panel.
|
/// An identifier for a Magpie panel.
|
||||||
///
|
///
|
||||||
|
@ -34,3 +38,99 @@ pub enum MagpieServerMsg {
|
||||||
#[derive(Clone, Debug, Deserialize, Serialize)]
|
#[derive(Clone, Debug, Deserialize, Serialize)]
|
||||||
#[serde(tag = "kind")]
|
#[serde(tag = "kind")]
|
||||||
pub enum MagpieClientMsg {}
|
pub enum MagpieClientMsg {}
|
||||||
|
|
||||||
|
/// A [Messenger] specialized for Magpie clients.
|
||||||
|
pub type ClientMessenger<T> = Messenger<T, MagpieClientMsg, MagpieServerMsg>;
|
||||||
|
|
||||||
|
/// A [Messenger] specialized for Magpie servers.
|
||||||
|
pub type ServerMessenger<T> = Messenger<T, MagpieServerMsg, MagpieClientMsg>;
|
||||||
|
|
||||||
|
/// Bidirectional, transport-agnostic Magpie IO wrapper struct.
|
||||||
|
pub struct Messenger<T, I, O> {
|
||||||
|
pub transport: T,
|
||||||
|
expected_len: Option<usize>,
|
||||||
|
received_buf: VecDeque<u8>,
|
||||||
|
received_queue: VecDeque<I>,
|
||||||
|
closed: bool,
|
||||||
|
_output: PhantomData<O>,
|
||||||
|
}
|
||||||
|
|
||||||
|
impl<T: Read + Write, I: DeserializeOwned, O: Serialize> Messenger<T, I, O> {
|
||||||
|
pub fn new(transport: T) -> Self {
|
||||||
|
Self {
|
||||||
|
transport,
|
||||||
|
expected_len: None,
|
||||||
|
received_buf: Default::default(),
|
||||||
|
received_queue: Default::default(),
|
||||||
|
closed: false,
|
||||||
|
_output: PhantomData,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn is_closed(&self) -> bool {
|
||||||
|
self.closed
|
||||||
|
}
|
||||||
|
|
||||||
|
pub fn send(&mut self, msg: &O) -> std::io::Result<()> {
|
||||||
|
use byteorder::{LittleEndian, WriteBytesExt};
|
||||||
|
let payload = serde_json::to_vec(msg).unwrap();
|
||||||
|
let len = payload.len() as u32;
|
||||||
|
self.transport.write_u32::<LittleEndian>(len)?;
|
||||||
|
self.transport.write_all(&payload)?;
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Receives all pending messages and queues them for [recv].
|
||||||
|
pub fn flush_recv(&mut self) -> std::io::Result<()> {
|
||||||
|
let mut buf = [0u8; 1024];
|
||||||
|
|
||||||
|
loop {
|
||||||
|
match self.transport.read(&mut buf) {
|
||||||
|
Ok(0) => {
|
||||||
|
self.closed = true;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
Ok(n) => {
|
||||||
|
self.received_buf.write(&buf[..n])?;
|
||||||
|
}
|
||||||
|
Err(ref err) if err.kind() == std::io::ErrorKind::WouldBlock => break,
|
||||||
|
Err(ref err) if err.kind() == std::io::ErrorKind::Interrupted => continue,
|
||||||
|
Err(err) => return Err(err),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
loop {
|
||||||
|
if let Some(expected_len) = self.expected_len {
|
||||||
|
if expected_len < self.received_buf.len() {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
self.expected_len = None;
|
||||||
|
let mut buf = vec![0u8; expected_len];
|
||||||
|
self.received_buf.read_exact(&mut buf)?;
|
||||||
|
match serde_json::from_slice::<I>(&buf) {
|
||||||
|
Ok(received) => self.received_queue.push_front(received),
|
||||||
|
Err(e) => {
|
||||||
|
let kind = std::io::ErrorKind::InvalidData;
|
||||||
|
let payload = Box::new(e);
|
||||||
|
let error = std::io::Error::new(kind, payload);
|
||||||
|
return Err(error);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} else if self.received_buf.len() >= 4 {
|
||||||
|
use byteorder::{LittleEndian, ReadBytesExt};
|
||||||
|
let expected_len = self.received_buf.read_u32::<LittleEndian>()?;
|
||||||
|
self.expected_len = Some(expected_len as usize);
|
||||||
|
} else {
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
Ok(())
|
||||||
|
}
|
||||||
|
|
||||||
|
/// Tries to receive a single input packet.
|
||||||
|
pub fn recv(&mut self) -> Option<I> {
|
||||||
|
self.received_queue.pop_back()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
Loading…
Reference in New Issue