diff --git a/server/Cargo.toml b/server/Cargo.toml index 38241f1..538e0df 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -26,7 +26,6 @@ regex = "1.11.0" game = { path = "../game" } -bus = { git = "https://git.tsukiyo.org/Utility/bus", branch = "tokio" } sparse = { git = "https://git.tsukiyo.org/Utility/sparse" } trie = { git = "https://git.tsukiyo.org/Utility/trie" } pool = { git = "https://git.tsukiyo.org/Utility/pool" } diff --git a/server/src/app/connection.rs b/server/src/app/connection.rs index bc6b94b..80887bb 100644 --- a/server/src/app/connection.rs +++ b/server/src/app/connection.rs @@ -14,7 +14,6 @@ type StreamType = Arc>, Messa #[derive(Clone)] pub struct Connection { - pub bus:u32, pub stream:StreamType, pub auth:Option, diff --git a/server/src/app/mod.rs b/server/src/app/mod.rs index 360d556..c70128d 100644 --- a/server/src/app/mod.rs +++ b/server/src/app/mod.rs @@ -160,102 +160,105 @@ impl App { use tokio_tungstenite::tungstenite::Message; use futures::SinkExt; - if match response.data { QRPacketData::None => false, _ => true } { - if let Some(conn) = self.connections.get(response.id as usize) { - let mut socket = conn.stream.write().await; + match response.data { + QRPacketData::None => { } + _ => { + if let Some(conn) = self.connections.get(response.id as usize) { + let mut socket = conn.stream.write().await; - match response.data { - QRPacketData::RHello(response) => { - socket.send(Message::Binary( - encode_response(CODE_HELLO, response.encode()) - )).await.ok(); + match response.data { + QRPacketData::RHello(response) => { + socket.send(Message::Binary( + encode_response(CODE_HELLO, response.encode()) + )).await.ok(); + } + + QRPacketData::RRegister(response) => { + socket.send(Message::Binary( + encode_response(CODE_REGISTER, response.encode()) + )).await.ok(); + } + + QRPacketData::RAuth(response) => { + socket.send(Message::Binary( + encode_response(CODE_AUTH, response.encode()) + )).await.ok(); + } + + QRPacketData::RAuthResume(response) => { + socket.send(Message::Binary( + encode_response(CODE_AUTH_RESUME, response.encode()) + )).await.ok(); + } + + QRPacketData::RSessionList(response) => { + socket.send(Message::Binary( + encode_response(CODE_SESSION_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RSessionView(response) => { + socket.send(Message::Binary( + encode_response(CODE_SESSION_VIEW, response.encode()) + )).await.ok(); + } + + QRPacketData::RGameState(response) => { + socket.send(Message::Binary( + encode_response(CODE_GAME_STATE, response.encode()) + )).await.ok(); + } + + QRPacketData::GameMessage(response) => { + socket.send(Message::Binary( + encode_response(CODE_GAME_MESSAGE, response.encode()) + )).await.ok(); + } + + QRPacketData::RChallengeAnswer(response) => { + socket.send(Message::Binary( + encode_response(CODE_CHALLENGE_ANSWER, response.encode()) + )).await.ok(); + } + + QRPacketData::RChallengeList(response) => { + socket.send(Message::Binary( + encode_response(CODE_CHALLENGE_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RUserList(response) => { + socket.send(Message::Binary( + encode_response(CODE_USER_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RUserProfile(response) => { + socket.send(Message::Binary( + encode_response(CODE_USER_PROFILE, response.encode()) + )).await.ok(); + } + + QRPacketData::RInviteList(response) => { + socket.send(Message::Binary( + encode_response(CODE_INVITE_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RInviteAcquire(response) => { + socket.send(Message::Binary( + encode_response(CODE_INVITE_ACQUIRE, response.encode()) + )).await.ok(); + } + + QRPacketData::TestResult(response) => { + socket.send(Message::Binary( + encode_response(CODE_TEST_RESULT, response.encode()) + )).await.ok(); + } + + _ => { } } - - QRPacketData::RRegister(response) => { - socket.send(Message::Binary( - encode_response(CODE_REGISTER, response.encode()) - )).await.ok(); - } - - QRPacketData::RAuth(response) => { - socket.send(Message::Binary( - encode_response(CODE_AUTH, response.encode()) - )).await.ok(); - } - - QRPacketData::RAuthResume(response) => { - socket.send(Message::Binary( - encode_response(CODE_AUTH_RESUME, response.encode()) - )).await.ok(); - } - - QRPacketData::RSessionList(response) => { - socket.send(Message::Binary( - encode_response(CODE_SESSION_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RSessionView(response) => { - socket.send(Message::Binary( - encode_response(CODE_SESSION_VIEW, response.encode()) - )).await.ok(); - } - - QRPacketData::RGameState(response) => { - socket.send(Message::Binary( - encode_response(CODE_GAME_STATE, response.encode()) - )).await.ok(); - } - - QRPacketData::GameMessage(response) => { - socket.send(Message::Binary( - encode_response(CODE_GAME_MESSAGE, response.encode()) - )).await.ok(); - } - - QRPacketData::RChallengeAnswer(response) => { - socket.send(Message::Binary( - encode_response(CODE_CHALLENGE_ANSWER, response.encode()) - )).await.ok(); - } - - QRPacketData::RChallengeList(response) => { - socket.send(Message::Binary( - encode_response(CODE_CHALLENGE_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RUserList(response) => { - socket.send(Message::Binary( - encode_response(CODE_USER_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RUserProfile(response) => { - socket.send(Message::Binary( - encode_response(CODE_USER_PROFILE, response.encode()) - )).await.ok(); - } - - QRPacketData::RInviteList(response) => { - socket.send(Message::Binary( - encode_response(CODE_INVITE_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RInviteAcquire(response) => { - socket.send(Message::Binary( - encode_response(CODE_INVITE_ACQUIRE, response.encode()) - )).await.ok(); - } - - QRPacketData::TestResult(response) => { - socket.send(Message::Binary( - encode_response(CODE_TEST_RESULT, response.encode()) - )).await.ok(); - } - - _ => { } } } } diff --git a/server/src/main.rs b/server/src/main.rs index 6f14ee2..509db5c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -13,6 +13,7 @@ mod manager; use app::App; use hyper::body::Bytes; use hyper_util::rt::TokioIo; +use tokio::sync::mpsc::{self, Sender}; use system::{ cache::WebCache, net::{ @@ -21,16 +22,17 @@ use system::{ tls::*, }, }; +use protocol::QRPacket; struct HttpServiceArgs { - bus:Bus, + tx:Sender, cache:WebCache, } impl HttpServiceArgs { pub async fn clone(&mut self) -> Self { Self { - bus:self.bus.connect().await.unwrap(), + tx:self.tx.clone(), cache:self.cache.clone(), } } @@ -175,18 +177,10 @@ async fn main() /* ** Initialize central bus and data serivce. */ - let mut b_main :Bus = Bus::new_as(bus::Mode::Transmitter); - match b_main.connect().await { - Ok(bus) => { - tokio::spawn(async move { - manager::thread_system(app, bus).await; - }); - } - Err(_) => { - println!("fatal: failed to initialize bus."); - return; - } - } + let (data_tx, data_rx) = mpsc::channel::(64); + tokio::spawn(async move { + manager::thread_system(app, data_rx).await; + }); /* @@ -287,9 +281,10 @@ async fn main() Ok(_) => { let mut b = bus.connect().await.unwrap(); let c = cache.clone(); + let data_tx = data_tx.clone(); tokio::spawn(async move { while tcp_server.accept(handle_tcp, HttpServiceArgs { - bus:b.connect().await.unwrap(), + tx:data_tx, cache:c.clone(), }).await.is_ok() { } }); @@ -310,11 +305,14 @@ async fn main() } match tls_server.bind("0.0.0.0:38612").await { Ok(_) => { + let (tx, rx) = tokio::sync::mpsc::channel::<>(24); let mut b = bus.connect().await.unwrap(); let c = cache.clone(); + + let data_tx = data_tx.clone(); tokio::spawn(async move { while tls_server.accept(handle_tls, HttpServiceArgs { - bus:b.connect().await.unwrap(), + tx:data_tx, cache:c.clone(), }).await.is_ok() { } }); diff --git a/server/src/manager/data.rs b/server/src/manager/data.rs index dd0ddf2..4f44216 100644 --- a/server/src/manager/data.rs +++ b/server/src/manager/data.rs @@ -1,4 +1,4 @@ -use bus::Bus; +use tokio::sync::mpsc::Receiver; use game::history::Play; use crate::{ config, @@ -10,13 +10,12 @@ use crate::{ session::{Session, SessionFilter}, context::{self, Context}, }, - protocol, + protocol::*, }; -pub async fn thread_system(mut app:App, mut bus:Bus) +pub async fn thread_system(mut app:App, mut rx:Receiver) { use futures::SinkExt; - use protocol::*; use ring::rand::{SecureRandom, SystemRandom}; let rng = SystemRandom::new(); @@ -24,7 +23,7 @@ pub async fn thread_system(mut app:App, mut bus:Bus) let mut send_user_status = Vec::::new(); - while let Some(packet) = bus.receive_wait().await { + while let Some(packet) = rx.recv().await { let qr = packet.data; let mut user_id = None; @@ -40,9 +39,11 @@ pub async fn thread_system(mut app:App, mut bus:Bus) } match match qr.data { - QRPacketData::QConn(request) => { + QRPacketData::QConn { + tx, + stream, + } => { let id = app.connections.add(Connection { - bus: request.bus_id, stream: request.stream, auth: None, @@ -58,8 +59,7 @@ pub async fn thread_system(mut app:App, mut bus:Bus) app.log.log(&format!("Connect: {}", id)); - bus.send( - packet.from, + tx.send( QRPacket::new(id as u32, QRPacketData::RConn) ).await.ok(); diff --git a/server/src/manager/ws.rs b/server/src/manager/ws.rs index 1f7c777..fb88bab 100644 --- a/server/src/manager/ws.rs +++ b/server/src/manager/ws.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, mpsc}; use hyper::upgrade::Upgraded; use hyper_util::rt::TokioIo; use tokio_tungstenite::WebSocketStream; @@ -21,28 +21,30 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi let conn_id :u32; let (sink, mut stream) = ws.split(); - - let bus_ds = args.bus.mailbox(1).await.unwrap_or(1); + let (tx, rx) = mpsc::channel::(1); // Perform connection handshake with data system. // - Provide system with connection/bus pairing. // - Acquire connection id. // - args.bus.send(bus_ds, QRPacket::new(0, QRPacketData::QConn(LocalPacketConnect { - bus_id:args.bus.id(), + args.tx.send(QRPacket::new(0, QRPacketData::QConn { + tx:tx.clone(), stream:Arc::new(RwLock::new(sink)), - }))).await?; - match args.bus.receive_wait().await { - Some(resp) => { - let qr = &resp.data; - match qr.data { - QRPacketData::RConn => { conn_id = qr.id; } - _ => { return Err(()); } - } + })).await?; + + if let Some(resp) = rx.blocking_recv() { + let qr = &resp.data; + match qr.data { + QRPacketData::RConn => { conn_id = qr.id; } + _ => { return Err(()); } } - None => { return Err(()); } + } else { + return Err(()); } + drop(tx); + drop(rx); + // Decode client requests from websocket, // pass requests to data system, // and return responses to client. @@ -55,15 +57,14 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi match code { CODE_HELLO => { - args.bus.send( - bus_ds, QRPacket::new(conn_id, QRPacketData::QHello) + args.tx.send( + QRPacket::new(conn_id, QRPacketData::QHello) ).await.ok(); } CODE_REGISTER => match PacketRegister::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QRegister(packet)) ).await.ok(); } @@ -72,8 +73,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_AUTH => match PacketAuth::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QAuth(packet)) ).await.ok(); } @@ -82,8 +82,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_AUTH_RESUME => match PacketAuthResume::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QAuthResume(packet)) ).await.ok(); } @@ -91,16 +90,14 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi } CODE_AUTH_REVOKE => { - args.bus.send( - bus_ds, QRPacket::new(conn_id, - QRPacketData::QAuthRevoke) + args.tx.send( + QRPacket::new(conn_id, QRPacketData::QAuthRevoke) ).await.ok(); } CODE_SESSION_LIST => match PacketSessionList::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionList(packet)) ).await.ok(); } @@ -109,7 +106,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi /*CODE_SESSION_JOIN => match PacketSessionCreate::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( + args.tx.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionCreate(packet)) ).await.ok(); @@ -119,8 +116,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_SESSION_VIEW => match PacketSessionView::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionView(packet)) ).await.ok(); } @@ -128,16 +124,14 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi } CODE_SESSION_LEAVE => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionLeave) ).await.ok(); } CODE_SESSION_RETIRE => match PacketSessionRetire::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionResign(packet)) ).await.ok(); } @@ -146,8 +140,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_GAME_STATE => match PacketGameState::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QGameState(packet)) ).await.ok(); } @@ -156,8 +149,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_GAME_MESSAGE => match PacketGameMessage::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::GameMessage(packet)) ).await.ok(); } @@ -166,8 +158,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_CHALLENGE => match PacketChallenge::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QChallenge(packet)) ).await.ok(); } @@ -176,8 +167,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_CHALLENGE_ANSWER => match PacketChallengeAnswer::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QChallengeAnswer(packet)) ).await.ok(); } @@ -185,23 +175,20 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi } CODE_CHALLENGE_LIST => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QChallengeList) ).await.ok(); } CODE_USER_LIST => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QUserList) ).await.ok(); } CODE_USER_PROFILE => match PacketUserProfile::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QUserProfile(packet)) ).await.ok(); } @@ -209,15 +196,13 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi } CODE_INVITE_LIST => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QInviteList) ).await.ok(); } CODE_INVITE_ACQUIRE => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QInviteAcquire) ).await.ok(); } @@ -242,6 +227,6 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi None => false, } { } - args.bus.send(bus_ds, QRPacket::new(conn_id, QRPacketData::QDisconn)).await.ok(); + args.tx.send(QRPacket::new(conn_id, QRPacketData::QDisconn)).await.ok(); Ok(()) } diff --git a/server/src/protocol/mod.rs b/server/src/protocol/mod.rs index 5bdcbf5..3b3532b 100644 --- a/server/src/protocol/mod.rs +++ b/server/src/protocol/mod.rs @@ -1,5 +1,12 @@ #![allow(dead_code)] +use std::sync::Arc; +use tokio::sync::{mpsc::Sender, RwLock}; +use futures::stream::SplitSink; +use hyper::upgrade::Upgraded; +use hyper_util::rt::TokioIo; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; + pub mod code; pub use code::*; pub mod packet; pub use packet::*; @@ -7,7 +14,10 @@ pub mod packet; pub use packet::*; pub enum QRPacketData { None, - QConn(LocalPacketConnect), + QConn { + tx:Sender, + stream:Arc>, Message>>>, + }, RConn, QDisconn, diff --git a/server/src/protocol/packet/connect.rs b/server/src/protocol/packet/connect.rs index 299c597..912c942 100644 --- a/server/src/protocol/packet/connect.rs +++ b/server/src/protocol/packet/connect.rs @@ -1,12 +1,14 @@ use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{mpsc::Sender, RwLock}; use futures::stream::SplitSink; use hyper::upgrade::Upgraded; use hyper_util::rt::TokioIo; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +use crate::protocol::QRPacket; + #[derive(Clone)] pub struct LocalPacketConnect { - pub bus_id:u32, + pub tx:Sender, pub stream:Arc>, Message>>>, } diff --git a/server/src/protocol/packet/mod.rs b/server/src/protocol/packet/mod.rs index 050fddc..90eefaa 100644 --- a/server/src/protocol/packet/mod.rs +++ b/server/src/protocol/packet/mod.rs @@ -1,5 +1,5 @@ mod hello; pub use hello::*; -mod connect; pub use connect::*; +//mod connect; pub use connect::*; mod register; pub use register::*; mod auth; pub use auth::*;