Update channels to use tokio async.

This commit is contained in:
yukirij 2024-11-08 23:38:58 -08:00
parent 6aa8f4ac88
commit 01a4a6d341
5 changed files with 66 additions and 77 deletions

View File

@ -8,4 +8,6 @@ repository="https://git.yukiri.dev/Utility/bus-rs/"
publish = false publish = false
[dependencies] [dependencies]
tokio = { version = "1.41.1", features = ["full"] }
pool = { git = "https://git.tsukiyo.org/Utility/pool" } pool = { git = "https://git.tsukiyo.org/Utility/pool" }

View File

@ -1,10 +1,11 @@
//use std::sync::mpsc::TryRecvError; use tokio::sync::mpsc;
use std::sync::mpsc;
mod packet; use packet::{Packet, QueryPacket}; pub use packet::{Query, Response, Mode}; mod packet; use packet::{Packet, QueryPacket}; pub use packet::{Query, Response, Mode};
mod signal; pub use signal::Signal; mod signal; pub use signal::Signal;
mod manager; use manager::{Manager, Client, manager_thread}; mod manager; use manager::{Manager, Client, manager_thread};
const BUFFER_SIZE :usize = 32;
pub struct Bus<T> where T: 'static + Clone + Send { pub struct Bus<T> where T: 'static + Clone + Send {
sender:Option<mpsc::Sender<Packet<T>>>, sender:Option<mpsc::Sender<Packet<T>>>,
receiver:Option<mpsc::Receiver<Packet<T>>>, receiver:Option<mpsc::Receiver<Packet<T>>>,
@ -17,17 +18,14 @@ impl<T: 'static + Clone + Send> Bus<T> {
{ {
let mut manager = Manager::<T>::new(); let mut manager = Manager::<T>::new();
let cq_sender = manager.query_sender.clone(); let cq_sender = manager.query_sender.clone();
let (cr_sender, cr_receiver) = mpsc::channel::<Response<T>>(); let (cr_sender, cr_receiver) = mpsc::channel::<Response<T>>(BUFFER_SIZE);
let id = manager.senders.add(Client { let id = manager.senders.add(Client {
packet:None, packet:None,
query:cr_sender, query:cr_sender,
signal:None, signal:None,
}); });
std::thread::spawn(move || { tokio::spawn(manager_thread(manager));
let mut manager = manager;
manager_thread(&mut manager);
});
Self { Self {
sender:None, sender:None,
@ -55,19 +53,16 @@ impl<T: 'static + Clone + Send> Bus<T> {
let mut manager = Manager::<T>::new(); let mut manager = Manager::<T>::new();
let sender = manager.sender.clone(); let sender = manager.sender.clone();
let (c_sender, c_receiver) = mpsc::channel::<Packet<T>>(); let (c_sender, c_receiver) = mpsc::channel::<Packet<T>>(BUFFER_SIZE);
let cq_sender = manager.query_sender.clone(); let cq_sender = manager.query_sender.clone();
let (cr_sender, cr_receiver) = mpsc::channel::<Response<T>>(); let (cr_sender, cr_receiver) = mpsc::channel::<Response<T>>(BUFFER_SIZE);
let id = manager.senders.add(Client { let id = manager.senders.add(Client {
packet:if transmitter { Some(c_sender) } else { None }, packet:if transmitter { Some(c_sender) } else { None },
query:cr_sender, query:cr_sender,
signal:None, signal:None,
}); });
std::thread::spawn(move || { tokio::spawn(manager_thread(manager));
let mut manager = manager;
manager_thread(&mut manager);
});
Self { Self {
sender:if transmitter { Some(sender) } else { None }, sender:if transmitter { Some(sender) } else { None },
@ -78,17 +73,17 @@ impl<T: 'static + Clone + Send> Bus<T> {
} }
} }
pub fn connect(&self) -> Result<Bus<T>,()> pub async fn connect(&mut self) -> Result<Bus<T>,()>
{ {
self.connect_as(Mode::Transceiver) self.connect_as(Mode::Transceiver).await
} }
pub fn connect_as(&self, mode:Mode) -> Result<Bus<T>,()> pub async fn connect_as(&mut self, mode:Mode) -> Result<Bus<T>,()>
{ {
if self.query.send(QueryPacket::new(self.id, Query::Connect(mode))).is_ok() { if self.query.send(QueryPacket::new(self.id, Query::Connect(mode))).await.is_ok() {
let recv = self.response.recv(); let recv = self.response.recv().await;
match recv { match recv {
Ok(response) => { Some(response) => {
match response { match response {
Response::<T>::Connect(bus) => { return Ok(bus); } Response::<T>::Connect(bus) => { return Ok(bus); }
_ => { } _ => { }
@ -106,16 +101,16 @@ impl<T: 'static + Clone + Send> Bus<T> {
} }
pub fn signal(&self, signal:Option<&Signal>) -> std::result::Result<(),()> pub async fn signal(&mut self, signal:Option<&Signal>) -> std::result::Result<(),()>
{ {
let sender :Option<mpsc::Sender<()>> = match signal { let sender :Option<mpsc::Sender<()>> = match signal {
Some(signal) => Some(signal.register()), Some(signal) => Some(signal.register()),
None => None, None => None,
}; };
if self.query.send(QueryPacket::new(self.id, Query::Signal(sender))).is_ok() { if self.query.send(QueryPacket::new(self.id, Query::Signal(sender))).await.is_ok() {
let recv = self.response.recv(); let recv = self.response.recv().await;
match recv { match recv {
Ok(response) => { Some(response) => {
match response { match response {
Response::<T>::Ok => { return Ok(()); } Response::<T>::Ok => { return Ok(()); }
_ => { } _ => { }
@ -127,13 +122,13 @@ impl<T: 'static + Clone + Send> Bus<T> {
Err(()) Err(())
} }
pub fn send_from(&self, from:u32, to:u32, data:T) -> std::result::Result<(),()> pub async fn send_from(&self, from:u32, to:u32, data:T) -> std::result::Result<(),()>
// Send a packet to the bus. // Send a packet to the bus.
{ {
match &self.sender { match &self.sender {
Some(transmitter) => { Some(transmitter) => {
if transmitter.send(Packet::<T>::new(from, to, data)).is_ok() { if transmitter.send(Packet::<T>::new(from, to, data)).await.is_ok() {
if self.query.send(QueryPacket::new(from, Query::Notify)).is_ok() { if self.query.send(QueryPacket::new(from, Query::Notify)).await.is_ok() {
return Ok(()); return Ok(());
} }
} }
@ -143,16 +138,16 @@ impl<T: 'static + Clone + Send> Bus<T> {
Err(()) Err(())
} }
pub fn send(&self, to:u32, data:T) -> std::result::Result<(),()> pub async fn send(&self, to:u32, data:T) -> std::result::Result<(),()>
// Send a packet to the bus. // Send a packet to the bus.
{ {
return self.send_from(self.id, to, data); return self.send_from(self.id, to, data).await;
} }
pub fn receive(&self) -> Option<Packet<T>> pub fn receive(&mut self) -> Option<Packet<T>>
// Return the next packet in the queue. // Return the next packet in the queue.
{ {
return match &self.receiver { return match &mut self.receiver {
Some(receiver) => { Some(receiver) => {
let result = receiver.try_recv(); let result = receiver.try_recv();
match result { match result {
@ -164,26 +159,26 @@ impl<T: 'static + Clone + Send> Bus<T> {
}; };
} }
pub fn receive_wait(&self) -> Option<Packet<T>> pub async fn receive_wait(&mut self) -> Option<Packet<T>>
// Return the next packet in the queue. // Return the next packet in the queue.
{ {
return match &self.receiver { return match &mut self.receiver {
Some(receiver) => { Some(receiver) => {
let result = receiver.recv(); let result = receiver.recv().await;
match result { match result {
Ok(result) => Some(result), Some(result) => Some(result),
Err(_) => None, _ => None,
} }
} }
None => None, None => None,
}; };
} }
pub fn receive_all(&self) -> Vec<Packet<T>> pub fn receive_all(&mut self) -> Vec<Packet<T>>
// Return all packets in the queue. // Return all packets in the queue.
{ {
let mut results :Vec<Packet<T>> = vec![]; let mut results :Vec<Packet<T>> = vec![];
match &self.receiver { match &mut self.receiver {
Some(receiver) => { Some(receiver) => {
loop { loop {
let result = receiver.try_recv(); let result = receiver.try_recv();
@ -200,13 +195,13 @@ impl<T: 'static + Clone + Send> Bus<T> {
return results; return results;
} }
pub fn set_mailbox(&self, id:u32) -> std::result::Result<(),()> pub async fn set_mailbox(&mut self, id:u32) -> std::result::Result<(),()>
// Map a mailbox number to this endpoint. // Map a mailbox number to this endpoint.
{ {
if self.query.send(QueryPacket::new(self.id, Query::SetPort(id))).is_ok() { if self.query.send(QueryPacket::new(self.id, Query::SetPort(id))).await.is_ok() {
let recv = self.response.recv(); let recv = self.response.recv().await;
match recv { match recv {
Ok(response) => { Some(response) => {
match response { match response {
Response::<T>::Ok => { return Ok(()); } Response::<T>::Ok => { return Ok(()); }
_ => { } _ => { }
@ -218,13 +213,13 @@ impl<T: 'static + Clone + Send> Bus<T> {
Err(()) Err(())
} }
pub fn mailbox(&self, id:u32) -> Option<u32> pub async fn mailbox(&mut self, id:u32) -> Option<u32>
// Return the endpoint address mapped to the specified mailbox number. // Return the endpoint address mapped to the specified mailbox number.
{ {
if self.query.send(QueryPacket::new(self.id, Query::GetPort(id))).is_ok() { if self.query.send(QueryPacket::new(self.id, Query::GetPort(id))).await.is_ok() {
let recv = self.response.recv(); let recv = self.response.recv().await;
match recv { match recv {
Ok(response) => { Some(response) => {
match response { match response {
Response::<T>::GetPort(id) => { return id; } Response::<T>::GetPort(id) => { return id; }
_ => { } _ => { }
@ -241,8 +236,3 @@ impl<T: 'static + Clone + Send> Bus<T> {
self.id self.id
} }
} }
impl<T: 'static + Clone + Send> Clone for Bus<T> {
fn clone(&self) -> Self {
self.connect().unwrap()
}
}

View File

@ -1,6 +1,5 @@
use std::collections::HashMap; use std::collections::HashMap;
use std::sync::mpsc; use tokio::sync::mpsc;
//use std::sync::mpsc::TryRecvError;
use pool::Pool; use pool::Pool;
use super::{Bus, Packet, QueryPacket, Query, Response, Mode}; use super::{Bus, Packet, QueryPacket, Query, Response, Mode};
@ -24,9 +23,9 @@ pub struct Manager<T> where T: 'static + Clone + Send {
impl<T: 'static + Clone + Send> Manager<T> { impl<T: 'static + Clone + Send> Manager<T> {
pub fn new() -> Self pub fn new() -> Self
{ {
let (p_sender, p_receiver) = mpsc::channel::<Packet<T>>(); let (p_sender, p_receiver) = mpsc::channel::<Packet<T>>(32);
let (q_sender, q_receiver) = mpsc::channel::<QueryPacket>(); let (q_sender, q_receiver) = mpsc::channel::<QueryPacket>(32);
let (r_sender, r_receiver) = mpsc::channel::<Response<T>>(); let (r_sender, r_receiver) = mpsc::channel::<Response<T>>(32);
Self { Self {
receiver:p_receiver, receiver:p_receiver,
@ -41,14 +40,14 @@ impl<T: 'static + Clone + Send> Manager<T> {
} }
} }
pub fn manager_thread<T>(manager:&mut Manager<T>) where T: 'static + Clone + Send pub async fn manager_thread<T>(mut manager:Manager<T>) where T: 'static + Clone + Send
{ {
// listen for queries until all clients disconnect // listen for queries until all clients disconnect
while manager.senders.size() > 0 { while manager.senders.size() > 0 {
let query = manager.query.recv(); let query = manager.query.recv().await;
match query { match query {
Ok(packet) => { Some(packet) => {
// process query packet // process query packet
match packet.data { match packet.data {
Query::Notify => { Query::Notify => {
@ -63,11 +62,11 @@ pub fn manager_thread<T>(manager:&mut Manager<T>) where T: 'static + Clone + Sen
match dest { match dest {
Some(sender) => { Some(sender) => {
match &sender.packet { match &sender.packet {
Some(transmitter) => { transmitter.send(Clone::clone(&packet)).ok(); } Some(transmitter) => { transmitter.send(Clone::clone(&packet)).await.ok(); }
None => { } None => { }
} }
match &sender.signal { match &sender.signal {
Some(signal) => { signal.send(()).ok(); } Some(signal) => { signal.send(()).await.ok(); }
None => { } None => { }
} }
} }
@ -81,11 +80,11 @@ pub fn manager_thread<T>(manager:&mut Manager<T>) where T: 'static + Clone + Sen
match dest { match dest {
Some(sender) => { Some(sender) => {
match &sender.packet { match &sender.packet {
Some(transmitter) => { transmitter.send(Clone::clone(&packet)).ok(); } Some(transmitter) => { transmitter.send(Clone::clone(&packet)).await.ok(); }
None => { } None => { }
} }
match &sender.signal { match &sender.signal {
Some(signal) => { signal.send(()).ok(); } Some(signal) => { signal.send(()).await.ok(); }
None => { } None => { }
} }
} }
@ -117,9 +116,9 @@ pub fn manager_thread<T>(manager:&mut Manager<T>) where T: 'static + Clone + Sen
} }
let sender = manager.sender.clone(); let sender = manager.sender.clone();
let (c_sender, c_receiver) = mpsc::channel::<Packet<T>>(); let (c_sender, c_receiver) = mpsc::channel::<Packet<T>>(32);
let cq_sender = manager.query_sender.clone(); let cq_sender = manager.query_sender.clone();
let (cr_sender, cr_receiver) = mpsc::channel::<Response<T>>(); let (cr_sender, cr_receiver) = mpsc::channel::<Response<T>>(32);
let id = manager.senders.add(Client { let id = manager.senders.add(Client {
packet:if receiver { Some(c_sender) } else { None }, packet:if receiver { Some(c_sender) } else { None },
@ -137,7 +136,7 @@ pub fn manager_thread<T>(manager:&mut Manager<T>) where T: 'static + Clone + Sen
let client = manager.senders.get(packet.from as usize); let client = manager.senders.get(packet.from as usize);
match client { match client {
Some(client) => { client.query.send(Response::Connect(bus)).ok(); } Some(client) => { client.query.send(Response::Connect(bus)).await.ok(); }
_ => { } _ => { }
} }
} }
@ -147,7 +146,7 @@ pub fn manager_thread<T>(manager:&mut Manager<T>) where T: 'static + Clone + Sen
match client { match client {
Some(client) => { Some(client) => {
client.signal = signal; client.signal = signal;
client.query.send(Response::Ok).ok(); client.query.send(Response::Ok).await.ok();
} }
_ => { } _ => { }
} }
@ -157,7 +156,7 @@ pub fn manager_thread<T>(manager:&mut Manager<T>) where T: 'static + Clone + Sen
// return a list of valid clients // return a list of valid clients
let client = manager.senders.get(packet.from as usize); let client = manager.senders.get(packet.from as usize);
match client { match client {
Some(client) => { client.query.send(Response::List(manager.senders.list())).ok(); } Some(client) => { client.query.send(Response::List(manager.senders.list())).await.ok(); }
_ => { } _ => { }
} }
} }
@ -168,7 +167,7 @@ pub fn manager_thread<T>(manager:&mut Manager<T>) where T: 'static + Clone + Sen
match client { match client {
Some(client) => { Some(client) => {
manager.ports.insert(id, packet.from); manager.ports.insert(id, packet.from);
client.query.send(Response::Ok).ok(); client.query.send(Response::Ok).await.ok();
} }
_ => { } _ => { }
} }
@ -183,16 +182,14 @@ pub fn manager_thread<T>(manager:&mut Manager<T>) where T: 'static + Clone + Sen
Some(id) => Some(*id), Some(id) => Some(*id),
None => None None => None
}; };
client.query.send(Response::GetPort(port)).ok(); client.query.send(Response::GetPort(port)).await.ok();
} }
_ => { } _ => { }
} }
} }
} }
} }
Err(_) => { None => { }
}
} }
} }
} }

View File

@ -1,4 +1,4 @@
use std::sync::mpsc; use tokio::sync::mpsc;
use super::Bus; use super::Bus;
#[derive(Clone, Copy)] #[derive(Clone, Copy)]

View File

@ -1,4 +1,4 @@
use std::sync::mpsc; use tokio::sync::mpsc;
pub struct Signal { pub struct Signal {
receiver:mpsc::Receiver<()>, receiver:mpsc::Receiver<()>,
@ -7,16 +7,16 @@ pub struct Signal {
impl Signal { impl Signal {
pub fn new() -> Self pub fn new() -> Self
{ {
let (sender, receiver) = mpsc::channel::<()>(); let (sender, receiver) = mpsc::channel::<()>(32);
Self { Self {
receiver:receiver, receiver:receiver,
sender:sender, sender:sender,
} }
} }
pub fn wait(&self) pub async fn wait(&mut self)
{ {
self.receiver.recv().ok(); self.receiver.recv().await;
} }
pub fn register(&self) -> mpsc::Sender<()> pub fn register(&self) -> mpsc::Sender<()>