diff --git a/Cargo.toml b/Cargo.toml index f48ad87..ba1c06f 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -8,4 +8,6 @@ repository="https://git.yukiri.dev/Utility/bus-rs/" publish = false [dependencies] +tokio = { version = "1.41.1", features = ["full"] } + pool = { git = "https://git.tsukiyo.org/Utility/pool" } diff --git a/src/lib.rs b/src/lib.rs index 3c4e79e..426a445 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -1,10 +1,11 @@ -//use std::sync::mpsc::TryRecvError; -use std::sync::mpsc; +use tokio::sync::mpsc; mod packet; use packet::{Packet, QueryPacket}; pub use packet::{Query, Response, Mode}; mod signal; pub use signal::Signal; mod manager; use manager::{Manager, Client, manager_thread}; +const BUFFER_SIZE :usize = 32; + pub struct Bus where T: 'static + Clone + Send { sender:Option>>, receiver:Option>>, @@ -17,17 +18,14 @@ impl Bus { { let mut manager = Manager::::new(); let cq_sender = manager.query_sender.clone(); - let (cr_sender, cr_receiver) = mpsc::channel::>(); + let (cr_sender, cr_receiver) = mpsc::channel::>(BUFFER_SIZE); let id = manager.senders.add(Client { packet:None, query:cr_sender, signal:None, }); - std::thread::spawn(move || { - let mut manager = manager; - manager_thread(&mut manager); - }); + tokio::spawn(manager_thread(manager)); Self { sender:None, @@ -55,19 +53,16 @@ impl Bus { let mut manager = Manager::::new(); let sender = manager.sender.clone(); - let (c_sender, c_receiver) = mpsc::channel::>(); + let (c_sender, c_receiver) = mpsc::channel::>(BUFFER_SIZE); let cq_sender = manager.query_sender.clone(); - let (cr_sender, cr_receiver) = mpsc::channel::>(); + let (cr_sender, cr_receiver) = mpsc::channel::>(BUFFER_SIZE); let id = manager.senders.add(Client { packet:if transmitter { Some(c_sender) } else { None }, query:cr_sender, signal:None, }); - std::thread::spawn(move || { - let mut manager = manager; - manager_thread(&mut manager); - }); + tokio::spawn(manager_thread(manager)); Self { sender:if transmitter { Some(sender) } else { None }, @@ -78,17 +73,17 @@ impl Bus { } } - pub fn connect(&self) -> Result,()> + pub async fn connect(&mut self) -> Result,()> { - self.connect_as(Mode::Transceiver) + self.connect_as(Mode::Transceiver).await } - pub fn connect_as(&self, mode:Mode) -> Result,()> + pub async fn connect_as(&mut self, mode:Mode) -> Result,()> { - if self.query.send(QueryPacket::new(self.id, Query::Connect(mode))).is_ok() { - let recv = self.response.recv(); + if self.query.send(QueryPacket::new(self.id, Query::Connect(mode))).await.is_ok() { + let recv = self.response.recv().await; match recv { - Ok(response) => { + Some(response) => { match response { Response::::Connect(bus) => { return Ok(bus); } _ => { } @@ -106,16 +101,16 @@ impl Bus { } - pub fn signal(&self, signal:Option<&Signal>) -> std::result::Result<(),()> + pub async fn signal(&mut self, signal:Option<&Signal>) -> std::result::Result<(),()> { let sender :Option> = match signal { Some(signal) => Some(signal.register()), None => None, }; - if self.query.send(QueryPacket::new(self.id, Query::Signal(sender))).is_ok() { - let recv = self.response.recv(); + if self.query.send(QueryPacket::new(self.id, Query::Signal(sender))).await.is_ok() { + let recv = self.response.recv().await; match recv { - Ok(response) => { + Some(response) => { match response { Response::::Ok => { return Ok(()); } _ => { } @@ -127,13 +122,13 @@ impl Bus { Err(()) } - pub fn send_from(&self, from:u32, to:u32, data:T) -> std::result::Result<(),()> + pub async fn send_from(&self, from:u32, to:u32, data:T) -> std::result::Result<(),()> // Send a packet to the bus. { match &self.sender { Some(transmitter) => { - if transmitter.send(Packet::::new(from, to, data)).is_ok() { - if self.query.send(QueryPacket::new(from, Query::Notify)).is_ok() { + if transmitter.send(Packet::::new(from, to, data)).await.is_ok() { + if self.query.send(QueryPacket::new(from, Query::Notify)).await.is_ok() { return Ok(()); } } @@ -143,16 +138,16 @@ impl Bus { Err(()) } - pub fn send(&self, to:u32, data:T) -> std::result::Result<(),()> + pub async fn send(&self, to:u32, data:T) -> std::result::Result<(),()> // Send a packet to the bus. { - return self.send_from(self.id, to, data); + return self.send_from(self.id, to, data).await; } - pub fn receive(&self) -> Option> + pub fn receive(&mut self) -> Option> // Return the next packet in the queue. { - return match &self.receiver { + return match &mut self.receiver { Some(receiver) => { let result = receiver.try_recv(); match result { @@ -164,26 +159,26 @@ impl Bus { }; } - pub fn receive_wait(&self) -> Option> + pub async fn receive_wait(&mut self) -> Option> // Return the next packet in the queue. { - return match &self.receiver { + return match &mut self.receiver { Some(receiver) => { - let result = receiver.recv(); + let result = receiver.recv().await; match result { - Ok(result) => Some(result), - Err(_) => None, + Some(result) => Some(result), + _ => None, } } None => None, }; } - pub fn receive_all(&self) -> Vec> + pub fn receive_all(&mut self) -> Vec> // Return all packets in the queue. { let mut results :Vec> = vec![]; - match &self.receiver { + match &mut self.receiver { Some(receiver) => { loop { let result = receiver.try_recv(); @@ -200,13 +195,13 @@ impl Bus { return results; } - pub fn set_mailbox(&self, id:u32) -> std::result::Result<(),()> + pub async fn set_mailbox(&mut self, id:u32) -> std::result::Result<(),()> // Map a mailbox number to this endpoint. { - if self.query.send(QueryPacket::new(self.id, Query::SetPort(id))).is_ok() { - let recv = self.response.recv(); + if self.query.send(QueryPacket::new(self.id, Query::SetPort(id))).await.is_ok() { + let recv = self.response.recv().await; match recv { - Ok(response) => { + Some(response) => { match response { Response::::Ok => { return Ok(()); } _ => { } @@ -218,13 +213,13 @@ impl Bus { Err(()) } - pub fn mailbox(&self, id:u32) -> Option + pub async fn mailbox(&mut self, id:u32) -> Option // Return the endpoint address mapped to the specified mailbox number. { - if self.query.send(QueryPacket::new(self.id, Query::GetPort(id))).is_ok() { - let recv = self.response.recv(); + if self.query.send(QueryPacket::new(self.id, Query::GetPort(id))).await.is_ok() { + let recv = self.response.recv().await; match recv { - Ok(response) => { + Some(response) => { match response { Response::::GetPort(id) => { return id; } _ => { } @@ -241,8 +236,3 @@ impl Bus { self.id } } -impl Clone for Bus { - fn clone(&self) -> Self { - self.connect().unwrap() - } -} diff --git a/src/manager.rs b/src/manager.rs index 6b09f12..7b2f09d 100644 --- a/src/manager.rs +++ b/src/manager.rs @@ -1,6 +1,5 @@ use std::collections::HashMap; -use std::sync::mpsc; -//use std::sync::mpsc::TryRecvError; +use tokio::sync::mpsc; use pool::Pool; use super::{Bus, Packet, QueryPacket, Query, Response, Mode}; @@ -24,9 +23,9 @@ pub struct Manager where T: 'static + Clone + Send { impl Manager { pub fn new() -> Self { - let (p_sender, p_receiver) = mpsc::channel::>(); - let (q_sender, q_receiver) = mpsc::channel::(); - let (r_sender, r_receiver) = mpsc::channel::>(); + let (p_sender, p_receiver) = mpsc::channel::>(32); + let (q_sender, q_receiver) = mpsc::channel::(32); + let (r_sender, r_receiver) = mpsc::channel::>(32); Self { receiver:p_receiver, @@ -41,14 +40,14 @@ impl Manager { } } -pub fn manager_thread(manager:&mut Manager) where T: 'static + Clone + Send +pub async fn manager_thread(mut manager:Manager) where T: 'static + Clone + Send { // listen for queries until all clients disconnect while manager.senders.size() > 0 { - let query = manager.query.recv(); + let query = manager.query.recv().await; match query { - Ok(packet) => { + Some(packet) => { // process query packet match packet.data { Query::Notify => { @@ -63,11 +62,11 @@ pub fn manager_thread(manager:&mut Manager) where T: 'static + Clone + Sen match dest { Some(sender) => { match &sender.packet { - Some(transmitter) => { transmitter.send(Clone::clone(&packet)).ok(); } + Some(transmitter) => { transmitter.send(Clone::clone(&packet)).await.ok(); } None => { } } match &sender.signal { - Some(signal) => { signal.send(()).ok(); } + Some(signal) => { signal.send(()).await.ok(); } None => { } } } @@ -81,11 +80,11 @@ pub fn manager_thread(manager:&mut Manager) where T: 'static + Clone + Sen match dest { Some(sender) => { match &sender.packet { - Some(transmitter) => { transmitter.send(Clone::clone(&packet)).ok(); } + Some(transmitter) => { transmitter.send(Clone::clone(&packet)).await.ok(); } None => { } } match &sender.signal { - Some(signal) => { signal.send(()).ok(); } + Some(signal) => { signal.send(()).await.ok(); } None => { } } } @@ -117,9 +116,9 @@ pub fn manager_thread(manager:&mut Manager) where T: 'static + Clone + Sen } let sender = manager.sender.clone(); - let (c_sender, c_receiver) = mpsc::channel::>(); + let (c_sender, c_receiver) = mpsc::channel::>(32); let cq_sender = manager.query_sender.clone(); - let (cr_sender, cr_receiver) = mpsc::channel::>(); + let (cr_sender, cr_receiver) = mpsc::channel::>(32); let id = manager.senders.add(Client { packet:if receiver { Some(c_sender) } else { None }, @@ -137,7 +136,7 @@ pub fn manager_thread(manager:&mut Manager) where T: 'static + Clone + Sen let client = manager.senders.get(packet.from as usize); match client { - Some(client) => { client.query.send(Response::Connect(bus)).ok(); } + Some(client) => { client.query.send(Response::Connect(bus)).await.ok(); } _ => { } } } @@ -147,7 +146,7 @@ pub fn manager_thread(manager:&mut Manager) where T: 'static + Clone + Sen match client { Some(client) => { client.signal = signal; - client.query.send(Response::Ok).ok(); + client.query.send(Response::Ok).await.ok(); } _ => { } } @@ -157,7 +156,7 @@ pub fn manager_thread(manager:&mut Manager) where T: 'static + Clone + Sen // return a list of valid clients let client = manager.senders.get(packet.from as usize); match client { - Some(client) => { client.query.send(Response::List(manager.senders.list())).ok(); } + Some(client) => { client.query.send(Response::List(manager.senders.list())).await.ok(); } _ => { } } } @@ -168,7 +167,7 @@ pub fn manager_thread(manager:&mut Manager) where T: 'static + Clone + Sen match client { Some(client) => { manager.ports.insert(id, packet.from); - client.query.send(Response::Ok).ok(); + client.query.send(Response::Ok).await.ok(); } _ => { } } @@ -183,16 +182,14 @@ pub fn manager_thread(manager:&mut Manager) where T: 'static + Clone + Sen Some(id) => Some(*id), None => None }; - client.query.send(Response::GetPort(port)).ok(); + client.query.send(Response::GetPort(port)).await.ok(); } _ => { } } } } } - Err(_) => { - - } + None => { } } } } diff --git a/src/packet.rs b/src/packet.rs index a1ba4f4..db1934d 100644 --- a/src/packet.rs +++ b/src/packet.rs @@ -1,4 +1,4 @@ -use std::sync::mpsc; +use tokio::sync::mpsc; use super::Bus; #[derive(Clone, Copy)] diff --git a/src/signal.rs b/src/signal.rs index c855e0d..3d0d149 100644 --- a/src/signal.rs +++ b/src/signal.rs @@ -1,4 +1,4 @@ -use std::sync::mpsc; +use tokio::sync::mpsc; pub struct Signal { receiver:mpsc::Receiver<()>, @@ -7,16 +7,16 @@ pub struct Signal { impl Signal { pub fn new() -> Self { - let (sender, receiver) = mpsc::channel::<()>(); + let (sender, receiver) = mpsc::channel::<()>(32); Self { receiver:receiver, sender:sender, } } - pub fn wait(&self) + pub async fn wait(&mut self) { - self.receiver.recv().ok(); + self.receiver.recv().await; } pub fn register(&self) -> mpsc::Sender<()>