Initialize repository.
This commit is contained in:
commit
7f4159f3fd
2
.gitignore
vendored
Normal file
2
.gitignore
vendored
Normal file
@ -0,0 +1,2 @@
|
||||
/target
|
||||
/Cargo.lock
|
11
Cargo.toml
Normal file
11
Cargo.toml
Normal file
@ -0,0 +1,11 @@
|
||||
[package]
|
||||
name = "bus"
|
||||
version = "0.1.0"
|
||||
edition = "2021"
|
||||
readme="README.md"
|
||||
license-file="LICENSE.txt"
|
||||
repository="https://git.yukiri.dev/Util/bus-rs/"
|
||||
publish = false
|
||||
|
||||
[dependencies]
|
||||
pool = { git = "https://git.yukiri.dev/Util/pool-rs.git" }
|
1
LICENSE.txt
Normal file
1
LICENSE.txt
Normal file
@ -0,0 +1 @@
|
||||
https://ykr.info/license/
|
7
project.code-workspace
Normal file
7
project.code-workspace
Normal file
@ -0,0 +1,7 @@
|
||||
{
|
||||
"folders": [
|
||||
{
|
||||
"path": "."
|
||||
}
|
||||
]
|
||||
}
|
243
src/lib.rs
Normal file
243
src/lib.rs
Normal file
@ -0,0 +1,243 @@
|
||||
//use std::sync::mpsc::TryRecvError;
|
||||
use std::sync::mpsc;
|
||||
|
||||
mod packet; use packet::{Packet, QueryPacket}; pub use packet::{Query, Response, Mode};
|
||||
mod signal; pub use signal::Signal;
|
||||
mod manager; use manager::{Manager, Client, manager_thread};
|
||||
|
||||
pub struct Bus<T> where T: 'static + Clone + Send {
|
||||
sender:Option<mpsc::Sender<Packet<T>>>,
|
||||
receiver:Option<mpsc::Receiver<Packet<T>>>,
|
||||
query:mpsc::Sender<QueryPacket>,
|
||||
response:mpsc::Receiver<Response<T>>,
|
||||
id:u32,
|
||||
}
|
||||
impl<T: 'static + Clone + Send> Bus<T> {
|
||||
pub fn new() -> Self
|
||||
{
|
||||
let mut manager = Manager::<T>::new();
|
||||
let cq_sender = manager.query_sender.clone();
|
||||
let (cr_sender, cr_receiver) = mpsc::channel::<Response<T>>();
|
||||
let id = manager.senders.add(Client {
|
||||
packet:None,
|
||||
query:cr_sender,
|
||||
signal:None,
|
||||
});
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let mut manager = manager;
|
||||
manager_thread(&mut manager);
|
||||
});
|
||||
|
||||
Self {
|
||||
sender:None,
|
||||
receiver:None,
|
||||
query:cq_sender,
|
||||
response:cr_receiver,
|
||||
id:id as u32,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn new_as(mode:Mode) -> Self
|
||||
{
|
||||
let transmitter :bool;
|
||||
let receiver :bool;
|
||||
match mode {
|
||||
Mode::Transmitter => {
|
||||
transmitter = true;
|
||||
receiver = false;
|
||||
}
|
||||
Mode::Transceiver => {
|
||||
transmitter = true;
|
||||
receiver = true;
|
||||
}
|
||||
}
|
||||
|
||||
let mut manager = Manager::<T>::new();
|
||||
let sender = manager.sender.clone();
|
||||
let (c_sender, c_receiver) = mpsc::channel::<Packet<T>>();
|
||||
let cq_sender = manager.query_sender.clone();
|
||||
let (cr_sender, cr_receiver) = mpsc::channel::<Response<T>>();
|
||||
let id = manager.senders.add(Client {
|
||||
packet:if transmitter { Some(c_sender) } else { None },
|
||||
query:cr_sender,
|
||||
signal:None,
|
||||
});
|
||||
|
||||
std::thread::spawn(move || {
|
||||
let mut manager = manager;
|
||||
manager_thread(&mut manager);
|
||||
});
|
||||
|
||||
Self {
|
||||
sender:if transmitter { Some(sender) } else { None },
|
||||
receiver:if receiver { Some(c_receiver) } else { None },
|
||||
query:cq_sender,
|
||||
response:cr_receiver,
|
||||
id:id as u32,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn connect(&self) -> Result<Bus<T>,()>
|
||||
{
|
||||
self.connect_as(Mode::Transceiver)
|
||||
}
|
||||
|
||||
pub fn connect_as(&self, mode:Mode) -> Result<Bus<T>,()>
|
||||
{
|
||||
if self.query.send(QueryPacket::new(self.id, Query::Connect(mode))).is_ok() {
|
||||
let recv = self.response.recv();
|
||||
match recv {
|
||||
Ok(response) => {
|
||||
match response {
|
||||
Response::<T>::Connect(bus) => { return Ok(bus); }
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
Err(())
|
||||
}
|
||||
|
||||
pub fn disconnect(&self)
|
||||
// Remove the socket from the bus.
|
||||
{
|
||||
|
||||
}
|
||||
|
||||
pub fn signal(&self, signal:Option<&Signal>) -> std::result::Result<(),()>
|
||||
{
|
||||
let sender :Option<mpsc::Sender<()>> = match signal {
|
||||
Some(signal) => Some(signal.register()),
|
||||
None => None,
|
||||
};
|
||||
if self.query.send(QueryPacket::new(self.id, Query::Signal(sender))).is_ok() {
|
||||
let recv = self.response.recv();
|
||||
match recv {
|
||||
Ok(response) => {
|
||||
match response {
|
||||
Response::<T>::Ok => { return Ok(()); }
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
Err(())
|
||||
}
|
||||
|
||||
pub fn send_from(&self, from:u32, to:u32, data:T) -> std::result::Result<(),()>
|
||||
// Send a packet to the bus.
|
||||
{
|
||||
match &self.sender {
|
||||
Some(transmitter) => {
|
||||
if transmitter.send(Packet::<T>::new(from, to, data)).is_ok() {
|
||||
if self.query.send(QueryPacket::new(from, Query::Notify)).is_ok() {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
None => { }
|
||||
}
|
||||
Err(())
|
||||
}
|
||||
|
||||
pub fn send(&self, to:u32, data:T) -> std::result::Result<(),()>
|
||||
// Send a packet to the bus.
|
||||
{
|
||||
return self.send_from(self.id, to, data);
|
||||
}
|
||||
|
||||
pub fn receive(&self) -> Option<Packet<T>>
|
||||
// Return the next packet in the queue.
|
||||
{
|
||||
return match &self.receiver {
|
||||
Some(receiver) => {
|
||||
let result = receiver.try_recv();
|
||||
match result {
|
||||
Ok(result) => Some(result),
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn receive_wait(&self) -> Option<Packet<T>>
|
||||
// Return the next packet in the queue.
|
||||
{
|
||||
return match &self.receiver {
|
||||
Some(receiver) => {
|
||||
let result = receiver.recv();
|
||||
match result {
|
||||
Ok(result) => Some(result),
|
||||
Err(_) => None,
|
||||
}
|
||||
}
|
||||
None => None,
|
||||
};
|
||||
}
|
||||
|
||||
pub fn receive_all(&self) -> Vec<Packet<T>>
|
||||
// Return all packets in the queue.
|
||||
{
|
||||
let mut results :Vec<Packet<T>> = vec![];
|
||||
match &self.receiver {
|
||||
Some(receiver) => {
|
||||
loop {
|
||||
let result = receiver.try_recv();
|
||||
match result {
|
||||
Ok(packet) => {
|
||||
results.push(packet);
|
||||
}
|
||||
Err(_) => { break; }
|
||||
}
|
||||
}
|
||||
}
|
||||
None => { },
|
||||
}
|
||||
return results;
|
||||
}
|
||||
|
||||
pub fn set_mailbox(&self, id:u32) -> std::result::Result<(),()>
|
||||
// Map a mailbox number to this endpoint.
|
||||
{
|
||||
if self.query.send(QueryPacket::new(self.id, Query::SetPort(id))).is_ok() {
|
||||
let recv = self.response.recv();
|
||||
match recv {
|
||||
Ok(response) => {
|
||||
match response {
|
||||
Response::<T>::Ok => { return Ok(()); }
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
Err(())
|
||||
}
|
||||
|
||||
pub fn mailbox(&self, id:u32) -> Option<u32>
|
||||
// Return the endpoint address mapped to the specified mailbox number.
|
||||
{
|
||||
if self.query.send(QueryPacket::new(self.id, Query::GetPort(id))).is_ok() {
|
||||
let recv = self.response.recv();
|
||||
match recv {
|
||||
Ok(response) => {
|
||||
match response {
|
||||
Response::<T>::GetPort(id) => { return id; }
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
None
|
||||
}
|
||||
|
||||
pub fn id(&self) -> u32
|
||||
{
|
||||
self.id
|
||||
}
|
||||
}
|
198
src/manager.rs
Normal file
198
src/manager.rs
Normal file
@ -0,0 +1,198 @@
|
||||
use std::collections::HashMap;
|
||||
use std::sync::mpsc;
|
||||
//use std::sync::mpsc::TryRecvError;
|
||||
|
||||
use pool::Pool;
|
||||
use super::{Bus, Packet, QueryPacket, Query, Response, Mode};
|
||||
|
||||
pub struct Client<T> where T: 'static + Clone + Send {
|
||||
pub packet:Option<mpsc::Sender<Packet<T>>>,
|
||||
pub query:mpsc::Sender<Response<T>>,
|
||||
pub signal:Option<mpsc::Sender<()>>,
|
||||
}
|
||||
|
||||
pub struct Manager<T> where T: 'static + Clone + Send {
|
||||
pub receiver:mpsc::Receiver<Packet<T>>,
|
||||
pub sender:mpsc::Sender<Packet<T>>,
|
||||
pub senders:Pool<Client<T>>,
|
||||
pub query:mpsc::Receiver<QueryPacket>,
|
||||
pub query_sender:mpsc::Sender<QueryPacket>,
|
||||
pub response:mpsc::Receiver<Response<T>>,
|
||||
pub response_sender:mpsc::Sender<Response<T>>,
|
||||
pub ports:HashMap<u32, u32>,
|
||||
}
|
||||
impl<T: 'static + Clone + Send> Manager<T> {
|
||||
pub fn new() -> Self
|
||||
{
|
||||
let (p_sender, p_receiver) = mpsc::channel::<Packet<T>>();
|
||||
let (q_sender, q_receiver) = mpsc::channel::<QueryPacket>();
|
||||
let (r_sender, r_receiver) = mpsc::channel::<Response<T>>();
|
||||
|
||||
Self {
|
||||
receiver:p_receiver,
|
||||
sender:p_sender,
|
||||
senders:Pool::<Client<T>>::new(),
|
||||
query:q_receiver,
|
||||
query_sender:q_sender,
|
||||
response:r_receiver,
|
||||
response_sender:r_sender,
|
||||
ports:HashMap::<u32,u32>::new(),
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub fn manager_thread<T>(manager:&mut Manager<T>) where T: 'static + Clone + Send
|
||||
{
|
||||
// listen for queries until all clients disconnect
|
||||
while manager.senders.size() > 0 {
|
||||
let query = manager.query.recv();
|
||||
|
||||
match query {
|
||||
Ok(packet) => {
|
||||
// process query packet
|
||||
match packet.data {
|
||||
Query::Notify => {
|
||||
// receive and process all packets in the queue
|
||||
loop {
|
||||
let recv = manager.receiver.try_recv();
|
||||
match recv {
|
||||
Ok(packet) => {
|
||||
// forward packet to receiver if exists
|
||||
if packet.to != 0 {
|
||||
let dest = manager.senders.get(packet.to as usize);
|
||||
match dest {
|
||||
Some(sender) => {
|
||||
match &sender.packet {
|
||||
Some(transmitter) => { transmitter.send(Clone::clone(&packet)).ok(); }
|
||||
None => { }
|
||||
}
|
||||
match &sender.signal {
|
||||
Some(signal) => { signal.send(()).ok(); }
|
||||
None => { }
|
||||
}
|
||||
}
|
||||
None => { }
|
||||
}
|
||||
}
|
||||
else {
|
||||
for to in manager.senders.list() {
|
||||
if to != packet.from as usize {
|
||||
let dest = manager.senders.get(to);
|
||||
match dest {
|
||||
Some(sender) => {
|
||||
match &sender.packet {
|
||||
Some(transmitter) => { transmitter.send(Clone::clone(&packet)).ok(); }
|
||||
None => { }
|
||||
}
|
||||
match &sender.signal {
|
||||
Some(signal) => { signal.send(()).ok(); }
|
||||
None => { }
|
||||
}
|
||||
}
|
||||
None => { }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => { break; }
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
Query::Connect(mode) => {
|
||||
// return a new bus endpoint
|
||||
|
||||
let transmitter :bool;
|
||||
let receiver :bool;
|
||||
match mode {
|
||||
Mode::Transmitter => {
|
||||
transmitter = true;
|
||||
receiver = false;
|
||||
}
|
||||
Mode::Transceiver => {
|
||||
transmitter = true;
|
||||
receiver = true;
|
||||
}
|
||||
}
|
||||
|
||||
let sender = manager.sender.clone();
|
||||
let (c_sender, c_receiver) = mpsc::channel::<Packet<T>>();
|
||||
let cq_sender = manager.query_sender.clone();
|
||||
let (cr_sender, cr_receiver) = mpsc::channel::<Response<T>>();
|
||||
|
||||
let id = manager.senders.add(Client {
|
||||
packet:if receiver { Some(c_sender) } else { None },
|
||||
query:cr_sender,
|
||||
signal:None,
|
||||
});
|
||||
|
||||
let bus = Bus::<T> {
|
||||
sender:if transmitter { Some(sender) } else { None },
|
||||
receiver:if receiver { Some(c_receiver) } else { None },
|
||||
query:cq_sender,
|
||||
response:cr_receiver,
|
||||
id:id as u32,
|
||||
};
|
||||
|
||||
let client = manager.senders.get(packet.from as usize);
|
||||
match client {
|
||||
Some(client) => { client.query.send(Response::Connect(bus)).ok(); }
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
|
||||
Query::Signal(signal) => {
|
||||
let client = manager.senders.get_mut(packet.from as usize);
|
||||
match client {
|
||||
Some(client) => {
|
||||
client.signal = signal;
|
||||
client.query.send(Response::Ok).ok();
|
||||
}
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
|
||||
Query::List => {
|
||||
// return a list of valid clients
|
||||
let client = manager.senders.get(packet.from as usize);
|
||||
match client {
|
||||
Some(client) => { client.query.send(Response::List(manager.senders.list())).ok(); }
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
|
||||
Query::SetPort(id) => {
|
||||
// return a list of valid clients
|
||||
let client = manager.senders.get(packet.from as usize);
|
||||
match client {
|
||||
Some(client) => {
|
||||
manager.ports.insert(id, packet.from);
|
||||
client.query.send(Response::Ok).ok();
|
||||
}
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
|
||||
Query::GetPort(id) => {
|
||||
// return
|
||||
let client = manager.senders.get(packet.from as usize);
|
||||
match client {
|
||||
Some(client) => {
|
||||
let port = match manager.ports.get(&id) {
|
||||
Some(id) => Some(*id),
|
||||
None => None
|
||||
};
|
||||
client.query.send(Response::GetPort(port)).ok();
|
||||
}
|
||||
_ => { }
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
Err(_) => {
|
||||
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
56
src/packet.rs
Normal file
56
src/packet.rs
Normal file
@ -0,0 +1,56 @@
|
||||
use std::sync::mpsc;
|
||||
use super::Bus;
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub enum Mode {
|
||||
Transmitter,
|
||||
Transceiver,
|
||||
}
|
||||
|
||||
#[derive(Clone, Copy)]
|
||||
pub struct Packet<T> where T:Clone {
|
||||
pub from:u32,
|
||||
pub to:u32,
|
||||
pub data:T,
|
||||
}
|
||||
impl<T:Clone> Packet<T> {
|
||||
pub fn new(from:u32, to:u32, data:T) -> Self
|
||||
{
|
||||
Self {
|
||||
from:from,
|
||||
to:to,
|
||||
data:data,
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
pub enum Query {
|
||||
Notify,
|
||||
List,
|
||||
Connect(Mode),
|
||||
Signal(Option<mpsc::Sender<()>>),
|
||||
SetPort(u32),
|
||||
GetPort(u32),
|
||||
}
|
||||
|
||||
pub enum Response<T> where T: 'static + Clone + Send {
|
||||
Ok,
|
||||
Error,
|
||||
List(Vec<usize>),
|
||||
Connect(Bus<T>),
|
||||
GetPort(Option<u32>),
|
||||
}
|
||||
|
||||
pub struct QueryPacket {
|
||||
pub from:u32,
|
||||
pub data:Query,
|
||||
}
|
||||
impl QueryPacket {
|
||||
pub fn new(from:u32, data:Query) -> Self
|
||||
{
|
||||
Self {
|
||||
from:from,
|
||||
data:data,
|
||||
}
|
||||
}
|
||||
}
|
26
src/signal.rs
Normal file
26
src/signal.rs
Normal file
@ -0,0 +1,26 @@
|
||||
use std::sync::mpsc;
|
||||
|
||||
pub struct Signal {
|
||||
receiver:mpsc::Receiver<()>,
|
||||
sender:mpsc::Sender<()>,
|
||||
}
|
||||
impl Signal {
|
||||
pub fn new() -> Self
|
||||
{
|
||||
let (sender, receiver) = mpsc::channel::<()>();
|
||||
Self {
|
||||
receiver:receiver,
|
||||
sender:sender,
|
||||
}
|
||||
}
|
||||
|
||||
pub fn wait(&self)
|
||||
{
|
||||
self.receiver.recv().ok();
|
||||
}
|
||||
|
||||
pub fn register(&self) -> mpsc::Sender<()>
|
||||
{
|
||||
self.sender.clone()
|
||||
}
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user