From 45366ff330a789fe199aed29df935d24eb3bcf71 Mon Sep 17 00:00:00 2001 From: yukirij Date: Sun, 24 Nov 2024 13:18:10 -0800 Subject: [PATCH 1/3] #5: Update to use async bus. --- server/Cargo.toml | 2 +- server/src/main.rs | 35 ++++++++++++++++----------- server/src/manager/data.rs | 6 ++--- server/src/manager/ws.rs | 48 +++++++++++++++++++------------------- 4 files changed, 50 insertions(+), 41 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 209256a..38241f1 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -26,7 +26,7 @@ regex = "1.11.0" game = { path = "../game" } -bus = { git = "https://git.tsukiyo.org/Utility/bus" } +bus = { git = "https://git.tsukiyo.org/Utility/bus", branch = "tokio" } sparse = { git = "https://git.tsukiyo.org/Utility/sparse" } trie = { git = "https://git.tsukiyo.org/Utility/trie" } pool = { git = "https://git.tsukiyo.org/Utility/pool" } diff --git a/server/src/main.rs b/server/src/main.rs index e38e78a..6f14ee2 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -22,11 +22,19 @@ use system::{ }, }; -#[derive(Clone)] struct HttpServiceArgs { bus:Bus, cache:WebCache, } +impl HttpServiceArgs { + pub async fn clone(&mut self) -> Self + { + Self { + bus:self.bus.connect().await.unwrap(), + cache:self.cache.clone(), + } + } +} async fn service_http(mut request:hyper::Request, args:HttpServiceArgs) -> Result>, std::convert::Infallible> // Serve cached files and upgrade websocket connections. @@ -110,7 +118,7 @@ async fn service_http(mut request:hyper::Request, args:Ht } } -async fn handle_http(stream:S, addr:SocketAddr, args:HttpServiceArgs) -> Result<(),()> +async fn handle_http(stream:S, addr:SocketAddr, mut args:HttpServiceArgs) -> Result<(),()> where S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static // Hand off socket connection to Hyper server. // @@ -122,9 +130,10 @@ where S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static let io = TokioIo::new(stream); + let args = args.clone().await; let conn = http1::Builder::new() .serve_connection(io, service_fn(move |req| { - service_http(req, args.clone()) + service_http(req, args) })); conn.with_upgrades().await.ok(); @@ -166,8 +175,8 @@ async fn main() /* ** Initialize central bus and data serivce. */ - let b_main :Bus = Bus::new_as(bus::Mode::Transmitter); - match b_main.connect() { + let mut b_main :Bus = Bus::new_as(bus::Mode::Transmitter); + match b_main.connect().await { Ok(bus) => { tokio::spawn(async move { manager::thread_system(app, bus).await; @@ -183,7 +192,7 @@ async fn main() /* ** Load image assets. */ - let mut js_asset_data = String::from("const GAME_ASSET = { Image: {"); + let mut js_asset_data = String::from("const GAME_ASSET={Image:{"); let asset_path = Path::new("www/asset/"); for name in [ "Promote", @@ -202,10 +211,10 @@ async fn main() println!("error: failed to load asset: {}", name); } } - js_asset_data += "} };"; + js_asset_data += "}};"; - match b_main.connect() { - Ok(bus) => { + match b_main.connect().await { + Ok(mut bus) => { /* ** Cache source files. @@ -276,11 +285,11 @@ async fn main() let mut tcp_server = TcpServer::new(); match tcp_server.bind("127.0.0.1:38611").await { Ok(_) => { - let b = bus.connect().unwrap(); + let mut b = bus.connect().await.unwrap(); let c = cache.clone(); tokio::spawn(async move { while tcp_server.accept(handle_tcp, HttpServiceArgs { - bus:b.connect().unwrap(), + bus:b.connect().await.unwrap(), cache:c.clone(), }).await.is_ok() { } }); @@ -301,11 +310,11 @@ async fn main() } match tls_server.bind("0.0.0.0:38612").await { Ok(_) => { - let b = bus.connect().unwrap(); + let mut b = bus.connect().await.unwrap(); let c = cache.clone(); tokio::spawn(async move { while tls_server.accept(handle_tls, HttpServiceArgs { - bus:b.connect().unwrap(), + bus:b.connect().await.unwrap(), cache:c.clone(), }).await.is_ok() { } }); diff --git a/server/src/manager/data.rs b/server/src/manager/data.rs index 8a6a7f5..dd0ddf2 100644 --- a/server/src/manager/data.rs +++ b/server/src/manager/data.rs @@ -13,7 +13,7 @@ use crate::{ protocol, }; -pub async fn thread_system(mut app:App, bus:Bus) +pub async fn thread_system(mut app:App, mut bus:Bus) { use futures::SinkExt; use protocol::*; @@ -24,7 +24,7 @@ pub async fn thread_system(mut app:App, bus:Bus) let mut send_user_status = Vec::::new(); - while let Some(packet) = bus.receive_wait() { + while let Some(packet) = bus.receive_wait().await { let qr = packet.data; let mut user_id = None; @@ -61,7 +61,7 @@ pub async fn thread_system(mut app:App, bus:Bus) bus.send( packet.from, QRPacket::new(id as u32, QRPacketData::RConn) - ).ok(); + ).await.ok(); Some(QRPacket::new(0, QRPacketData::None)) } diff --git a/server/src/manager/ws.rs b/server/src/manager/ws.rs index f519efc..1f7c777 100644 --- a/server/src/manager/ws.rs +++ b/server/src/manager/ws.rs @@ -11,7 +11,7 @@ use crate::{ HttpServiceArgs, }; -pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceArgs) -> Result<(),()> +pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServiceArgs) -> Result<(),()> // Handle websocket connection. // { @@ -22,7 +22,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr let (sink, mut stream) = ws.split(); - let bus_ds = args.bus.mailbox(1).unwrap_or(1); + let bus_ds = args.bus.mailbox(1).await.unwrap_or(1); // Perform connection handshake with data system. // - Provide system with connection/bus pairing. @@ -31,8 +31,8 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send(bus_ds, QRPacket::new(0, QRPacketData::QConn(LocalPacketConnect { bus_id:args.bus.id(), stream:Arc::new(RwLock::new(sink)), - })))?; - match args.bus.receive_wait() { + }))).await?; + match args.bus.receive_wait().await { Some(resp) => { let qr = &resp.data; match qr.data { @@ -57,7 +57,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr CODE_HELLO => { args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QHello) - ).ok(); + ).await.ok(); } CODE_REGISTER => match PacketRegister::decode(&data, &mut index) { @@ -65,7 +65,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QRegister(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -75,7 +75,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QAuth(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -85,7 +85,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QAuthResume(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -94,7 +94,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QAuthRevoke) - ).ok(); + ).await.ok(); } CODE_SESSION_LIST => match PacketSessionList::decode(&data, &mut index) { @@ -102,7 +102,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionList(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -112,7 +112,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionCreate(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } }*/ @@ -122,7 +122,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionView(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -131,7 +131,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionLeave) - ).ok(); + ).await.ok(); } CODE_SESSION_RETIRE => match PacketSessionRetire::decode(&data, &mut index) { @@ -139,7 +139,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionResign(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -149,7 +149,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QGameState(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -159,7 +159,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::GameMessage(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -169,7 +169,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QChallenge(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -179,7 +179,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QChallengeAnswer(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -188,14 +188,14 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QChallengeList) - ).ok(); + ).await.ok(); } CODE_USER_LIST => { args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QUserList) - ).ok(); + ).await.ok(); } CODE_USER_PROFILE => match PacketUserProfile::decode(&data, &mut index) { @@ -203,7 +203,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QUserProfile(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -212,14 +212,14 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QInviteList) - ).ok(); + ).await.ok(); } CODE_INVITE_ACQUIRE => { args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QInviteAcquire) - ).ok(); + ).await.ok(); } _ => { } @@ -242,6 +242,6 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr None => false, } { } - args.bus.send(bus_ds, QRPacket::new(conn_id, QRPacketData::QDisconn)).ok(); + args.bus.send(bus_ds, QRPacket::new(conn_id, QRPacketData::QDisconn)).await.ok(); Ok(()) } From 9c0c4005466299f518904f0975389dbd17de1f24 Mon Sep 17 00:00:00 2001 From: yukirij Date: Tue, 17 Dec 2024 12:52:41 -0800 Subject: [PATCH 2/3] Replace bus with tokio mpsc. --- server/Cargo.toml | 1 - server/src/app/connection.rs | 1 - server/src/app/mod.rs | 191 +++++++++++++------------- server/src/main.rs | 30 ++-- server/src/manager/data.rs | 18 +-- server/src/manager/ws.rs | 91 +++++------- server/src/protocol/mod.rs | 12 +- server/src/protocol/packet/connect.rs | 6 +- server/src/protocol/packet/mod.rs | 2 +- 9 files changed, 174 insertions(+), 178 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 38241f1..538e0df 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -26,7 +26,6 @@ regex = "1.11.0" game = { path = "../game" } -bus = { git = "https://git.tsukiyo.org/Utility/bus", branch = "tokio" } sparse = { git = "https://git.tsukiyo.org/Utility/sparse" } trie = { git = "https://git.tsukiyo.org/Utility/trie" } pool = { git = "https://git.tsukiyo.org/Utility/pool" } diff --git a/server/src/app/connection.rs b/server/src/app/connection.rs index bc6b94b..80887bb 100644 --- a/server/src/app/connection.rs +++ b/server/src/app/connection.rs @@ -14,7 +14,6 @@ type StreamType = Arc>, Messa #[derive(Clone)] pub struct Connection { - pub bus:u32, pub stream:StreamType, pub auth:Option, diff --git a/server/src/app/mod.rs b/server/src/app/mod.rs index 360d556..c70128d 100644 --- a/server/src/app/mod.rs +++ b/server/src/app/mod.rs @@ -160,102 +160,105 @@ impl App { use tokio_tungstenite::tungstenite::Message; use futures::SinkExt; - if match response.data { QRPacketData::None => false, _ => true } { - if let Some(conn) = self.connections.get(response.id as usize) { - let mut socket = conn.stream.write().await; + match response.data { + QRPacketData::None => { } + _ => { + if let Some(conn) = self.connections.get(response.id as usize) { + let mut socket = conn.stream.write().await; - match response.data { - QRPacketData::RHello(response) => { - socket.send(Message::Binary( - encode_response(CODE_HELLO, response.encode()) - )).await.ok(); + match response.data { + QRPacketData::RHello(response) => { + socket.send(Message::Binary( + encode_response(CODE_HELLO, response.encode()) + )).await.ok(); + } + + QRPacketData::RRegister(response) => { + socket.send(Message::Binary( + encode_response(CODE_REGISTER, response.encode()) + )).await.ok(); + } + + QRPacketData::RAuth(response) => { + socket.send(Message::Binary( + encode_response(CODE_AUTH, response.encode()) + )).await.ok(); + } + + QRPacketData::RAuthResume(response) => { + socket.send(Message::Binary( + encode_response(CODE_AUTH_RESUME, response.encode()) + )).await.ok(); + } + + QRPacketData::RSessionList(response) => { + socket.send(Message::Binary( + encode_response(CODE_SESSION_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RSessionView(response) => { + socket.send(Message::Binary( + encode_response(CODE_SESSION_VIEW, response.encode()) + )).await.ok(); + } + + QRPacketData::RGameState(response) => { + socket.send(Message::Binary( + encode_response(CODE_GAME_STATE, response.encode()) + )).await.ok(); + } + + QRPacketData::GameMessage(response) => { + socket.send(Message::Binary( + encode_response(CODE_GAME_MESSAGE, response.encode()) + )).await.ok(); + } + + QRPacketData::RChallengeAnswer(response) => { + socket.send(Message::Binary( + encode_response(CODE_CHALLENGE_ANSWER, response.encode()) + )).await.ok(); + } + + QRPacketData::RChallengeList(response) => { + socket.send(Message::Binary( + encode_response(CODE_CHALLENGE_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RUserList(response) => { + socket.send(Message::Binary( + encode_response(CODE_USER_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RUserProfile(response) => { + socket.send(Message::Binary( + encode_response(CODE_USER_PROFILE, response.encode()) + )).await.ok(); + } + + QRPacketData::RInviteList(response) => { + socket.send(Message::Binary( + encode_response(CODE_INVITE_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RInviteAcquire(response) => { + socket.send(Message::Binary( + encode_response(CODE_INVITE_ACQUIRE, response.encode()) + )).await.ok(); + } + + QRPacketData::TestResult(response) => { + socket.send(Message::Binary( + encode_response(CODE_TEST_RESULT, response.encode()) + )).await.ok(); + } + + _ => { } } - - QRPacketData::RRegister(response) => { - socket.send(Message::Binary( - encode_response(CODE_REGISTER, response.encode()) - )).await.ok(); - } - - QRPacketData::RAuth(response) => { - socket.send(Message::Binary( - encode_response(CODE_AUTH, response.encode()) - )).await.ok(); - } - - QRPacketData::RAuthResume(response) => { - socket.send(Message::Binary( - encode_response(CODE_AUTH_RESUME, response.encode()) - )).await.ok(); - } - - QRPacketData::RSessionList(response) => { - socket.send(Message::Binary( - encode_response(CODE_SESSION_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RSessionView(response) => { - socket.send(Message::Binary( - encode_response(CODE_SESSION_VIEW, response.encode()) - )).await.ok(); - } - - QRPacketData::RGameState(response) => { - socket.send(Message::Binary( - encode_response(CODE_GAME_STATE, response.encode()) - )).await.ok(); - } - - QRPacketData::GameMessage(response) => { - socket.send(Message::Binary( - encode_response(CODE_GAME_MESSAGE, response.encode()) - )).await.ok(); - } - - QRPacketData::RChallengeAnswer(response) => { - socket.send(Message::Binary( - encode_response(CODE_CHALLENGE_ANSWER, response.encode()) - )).await.ok(); - } - - QRPacketData::RChallengeList(response) => { - socket.send(Message::Binary( - encode_response(CODE_CHALLENGE_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RUserList(response) => { - socket.send(Message::Binary( - encode_response(CODE_USER_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RUserProfile(response) => { - socket.send(Message::Binary( - encode_response(CODE_USER_PROFILE, response.encode()) - )).await.ok(); - } - - QRPacketData::RInviteList(response) => { - socket.send(Message::Binary( - encode_response(CODE_INVITE_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RInviteAcquire(response) => { - socket.send(Message::Binary( - encode_response(CODE_INVITE_ACQUIRE, response.encode()) - )).await.ok(); - } - - QRPacketData::TestResult(response) => { - socket.send(Message::Binary( - encode_response(CODE_TEST_RESULT, response.encode()) - )).await.ok(); - } - - _ => { } } } } diff --git a/server/src/main.rs b/server/src/main.rs index 6f14ee2..509db5c 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -13,6 +13,7 @@ mod manager; use app::App; use hyper::body::Bytes; use hyper_util::rt::TokioIo; +use tokio::sync::mpsc::{self, Sender}; use system::{ cache::WebCache, net::{ @@ -21,16 +22,17 @@ use system::{ tls::*, }, }; +use protocol::QRPacket; struct HttpServiceArgs { - bus:Bus, + tx:Sender, cache:WebCache, } impl HttpServiceArgs { pub async fn clone(&mut self) -> Self { Self { - bus:self.bus.connect().await.unwrap(), + tx:self.tx.clone(), cache:self.cache.clone(), } } @@ -175,18 +177,10 @@ async fn main() /* ** Initialize central bus and data serivce. */ - let mut b_main :Bus = Bus::new_as(bus::Mode::Transmitter); - match b_main.connect().await { - Ok(bus) => { - tokio::spawn(async move { - manager::thread_system(app, bus).await; - }); - } - Err(_) => { - println!("fatal: failed to initialize bus."); - return; - } - } + let (data_tx, data_rx) = mpsc::channel::(64); + tokio::spawn(async move { + manager::thread_system(app, data_rx).await; + }); /* @@ -287,9 +281,10 @@ async fn main() Ok(_) => { let mut b = bus.connect().await.unwrap(); let c = cache.clone(); + let data_tx = data_tx.clone(); tokio::spawn(async move { while tcp_server.accept(handle_tcp, HttpServiceArgs { - bus:b.connect().await.unwrap(), + tx:data_tx, cache:c.clone(), }).await.is_ok() { } }); @@ -310,11 +305,14 @@ async fn main() } match tls_server.bind("0.0.0.0:38612").await { Ok(_) => { + let (tx, rx) = tokio::sync::mpsc::channel::<>(24); let mut b = bus.connect().await.unwrap(); let c = cache.clone(); + + let data_tx = data_tx.clone(); tokio::spawn(async move { while tls_server.accept(handle_tls, HttpServiceArgs { - bus:b.connect().await.unwrap(), + tx:data_tx, cache:c.clone(), }).await.is_ok() { } }); diff --git a/server/src/manager/data.rs b/server/src/manager/data.rs index dd0ddf2..4f44216 100644 --- a/server/src/manager/data.rs +++ b/server/src/manager/data.rs @@ -1,4 +1,4 @@ -use bus::Bus; +use tokio::sync::mpsc::Receiver; use game::history::Play; use crate::{ config, @@ -10,13 +10,12 @@ use crate::{ session::{Session, SessionFilter}, context::{self, Context}, }, - protocol, + protocol::*, }; -pub async fn thread_system(mut app:App, mut bus:Bus) +pub async fn thread_system(mut app:App, mut rx:Receiver) { use futures::SinkExt; - use protocol::*; use ring::rand::{SecureRandom, SystemRandom}; let rng = SystemRandom::new(); @@ -24,7 +23,7 @@ pub async fn thread_system(mut app:App, mut bus:Bus) let mut send_user_status = Vec::::new(); - while let Some(packet) = bus.receive_wait().await { + while let Some(packet) = rx.recv().await { let qr = packet.data; let mut user_id = None; @@ -40,9 +39,11 @@ pub async fn thread_system(mut app:App, mut bus:Bus) } match match qr.data { - QRPacketData::QConn(request) => { + QRPacketData::QConn { + tx, + stream, + } => { let id = app.connections.add(Connection { - bus: request.bus_id, stream: request.stream, auth: None, @@ -58,8 +59,7 @@ pub async fn thread_system(mut app:App, mut bus:Bus) app.log.log(&format!("Connect: {}", id)); - bus.send( - packet.from, + tx.send( QRPacket::new(id as u32, QRPacketData::RConn) ).await.ok(); diff --git a/server/src/manager/ws.rs b/server/src/manager/ws.rs index 1f7c777..fb88bab 100644 --- a/server/src/manager/ws.rs +++ b/server/src/manager/ws.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, mpsc}; use hyper::upgrade::Upgraded; use hyper_util::rt::TokioIo; use tokio_tungstenite::WebSocketStream; @@ -21,28 +21,30 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi let conn_id :u32; let (sink, mut stream) = ws.split(); - - let bus_ds = args.bus.mailbox(1).await.unwrap_or(1); + let (tx, rx) = mpsc::channel::(1); // Perform connection handshake with data system. // - Provide system with connection/bus pairing. // - Acquire connection id. // - args.bus.send(bus_ds, QRPacket::new(0, QRPacketData::QConn(LocalPacketConnect { - bus_id:args.bus.id(), + args.tx.send(QRPacket::new(0, QRPacketData::QConn { + tx:tx.clone(), stream:Arc::new(RwLock::new(sink)), - }))).await?; - match args.bus.receive_wait().await { - Some(resp) => { - let qr = &resp.data; - match qr.data { - QRPacketData::RConn => { conn_id = qr.id; } - _ => { return Err(()); } - } + })).await?; + + if let Some(resp) = rx.blocking_recv() { + let qr = &resp.data; + match qr.data { + QRPacketData::RConn => { conn_id = qr.id; } + _ => { return Err(()); } } - None => { return Err(()); } + } else { + return Err(()); } + drop(tx); + drop(rx); + // Decode client requests from websocket, // pass requests to data system, // and return responses to client. @@ -55,15 +57,14 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi match code { CODE_HELLO => { - args.bus.send( - bus_ds, QRPacket::new(conn_id, QRPacketData::QHello) + args.tx.send( + QRPacket::new(conn_id, QRPacketData::QHello) ).await.ok(); } CODE_REGISTER => match PacketRegister::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QRegister(packet)) ).await.ok(); } @@ -72,8 +73,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_AUTH => match PacketAuth::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QAuth(packet)) ).await.ok(); } @@ -82,8 +82,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_AUTH_RESUME => match PacketAuthResume::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QAuthResume(packet)) ).await.ok(); } @@ -91,16 +90,14 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi } CODE_AUTH_REVOKE => { - args.bus.send( - bus_ds, QRPacket::new(conn_id, - QRPacketData::QAuthRevoke) + args.tx.send( + QRPacket::new(conn_id, QRPacketData::QAuthRevoke) ).await.ok(); } CODE_SESSION_LIST => match PacketSessionList::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionList(packet)) ).await.ok(); } @@ -109,7 +106,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi /*CODE_SESSION_JOIN => match PacketSessionCreate::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( + args.tx.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionCreate(packet)) ).await.ok(); @@ -119,8 +116,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_SESSION_VIEW => match PacketSessionView::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionView(packet)) ).await.ok(); } @@ -128,16 +124,14 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi } CODE_SESSION_LEAVE => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionLeave) ).await.ok(); } CODE_SESSION_RETIRE => match PacketSessionRetire::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionResign(packet)) ).await.ok(); } @@ -146,8 +140,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_GAME_STATE => match PacketGameState::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QGameState(packet)) ).await.ok(); } @@ -156,8 +149,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_GAME_MESSAGE => match PacketGameMessage::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::GameMessage(packet)) ).await.ok(); } @@ -166,8 +158,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_CHALLENGE => match PacketChallenge::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QChallenge(packet)) ).await.ok(); } @@ -176,8 +167,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi CODE_CHALLENGE_ANSWER => match PacketChallengeAnswer::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QChallengeAnswer(packet)) ).await.ok(); } @@ -185,23 +175,20 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi } CODE_CHALLENGE_LIST => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QChallengeList) ).await.ok(); } CODE_USER_LIST => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QUserList) ).await.ok(); } CODE_USER_PROFILE => match PacketUserProfile::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QUserProfile(packet)) ).await.ok(); } @@ -209,15 +196,13 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi } CODE_INVITE_LIST => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QInviteList) ).await.ok(); } CODE_INVITE_ACQUIRE => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QInviteAcquire) ).await.ok(); } @@ -242,6 +227,6 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi None => false, } { } - args.bus.send(bus_ds, QRPacket::new(conn_id, QRPacketData::QDisconn)).await.ok(); + args.tx.send(QRPacket::new(conn_id, QRPacketData::QDisconn)).await.ok(); Ok(()) } diff --git a/server/src/protocol/mod.rs b/server/src/protocol/mod.rs index 5bdcbf5..3b3532b 100644 --- a/server/src/protocol/mod.rs +++ b/server/src/protocol/mod.rs @@ -1,5 +1,12 @@ #![allow(dead_code)] +use std::sync::Arc; +use tokio::sync::{mpsc::Sender, RwLock}; +use futures::stream::SplitSink; +use hyper::upgrade::Upgraded; +use hyper_util::rt::TokioIo; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; + pub mod code; pub use code::*; pub mod packet; pub use packet::*; @@ -7,7 +14,10 @@ pub mod packet; pub use packet::*; pub enum QRPacketData { None, - QConn(LocalPacketConnect), + QConn { + tx:Sender, + stream:Arc>, Message>>>, + }, RConn, QDisconn, diff --git a/server/src/protocol/packet/connect.rs b/server/src/protocol/packet/connect.rs index 299c597..912c942 100644 --- a/server/src/protocol/packet/connect.rs +++ b/server/src/protocol/packet/connect.rs @@ -1,12 +1,14 @@ use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{mpsc::Sender, RwLock}; use futures::stream::SplitSink; use hyper::upgrade::Upgraded; use hyper_util::rt::TokioIo; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +use crate::protocol::QRPacket; + #[derive(Clone)] pub struct LocalPacketConnect { - pub bus_id:u32, + pub tx:Sender, pub stream:Arc>, Message>>>, } diff --git a/server/src/protocol/packet/mod.rs b/server/src/protocol/packet/mod.rs index 050fddc..90eefaa 100644 --- a/server/src/protocol/packet/mod.rs +++ b/server/src/protocol/packet/mod.rs @@ -1,5 +1,5 @@ mod hello; pub use hello::*; -mod connect; pub use connect::*; +//mod connect; pub use connect::*; mod register; pub use register::*; mod auth; pub use auth::*; From 614616193cc11b0b1e5e2ac8c8e223a65dcee1cf Mon Sep 17 00:00:00 2001 From: yukirij Date: Tue, 17 Dec 2024 13:31:24 -0800 Subject: [PATCH 3/3] Fix build errors. --- server/Cargo.toml | 1 - server/src/main.rs | 229 +++++++++++++++++-------------------- server/src/manager/data.rs | 9 +- server/src/manager/ws.rs | 11 +- 4 files changed, 111 insertions(+), 139 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index 538e0df..9a8a330 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -12,7 +12,6 @@ hyper-tungstenite = "0.14.0" rustls = "0.23.5" rustls-pemfile = "2.1.2" webpki-roots = "0.26" -opaque-ke = "2.0.0" hyper = { version = "1.4.1", features = ["full"] } hyper-util = { version = "0.1.7", features = ["tokio"] } http-body-util = "0.1.2" diff --git a/server/src/main.rs b/server/src/main.rs index 509db5c..0cdea50 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,8 +1,6 @@ use std::net::SocketAddr; use std::path::Path; -use bus::Bus; - mod config; mod util; mod app; @@ -24,19 +22,11 @@ use system::{ }; use protocol::QRPacket; +#[derive(Clone)] struct HttpServiceArgs { tx:Sender, cache:WebCache, } -impl HttpServiceArgs { - pub async fn clone(&mut self) -> Self - { - Self { - tx:self.tx.clone(), - cache:self.cache.clone(), - } - } -} async fn service_http(mut request:hyper::Request, args:HttpServiceArgs) -> Result>, std::convert::Infallible> // Serve cached files and upgrade websocket connections. @@ -120,7 +110,7 @@ async fn service_http(mut request:hyper::Request, args:Ht } } -async fn handle_http(stream:S, addr:SocketAddr, mut args:HttpServiceArgs) -> Result<(),()> +async fn handle_http(stream:S, addr:SocketAddr, args:HttpServiceArgs) -> Result<(),()> where S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static // Hand off socket connection to Hyper server. // @@ -132,10 +122,9 @@ where S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static let io = TokioIo::new(stream); - let args = args.clone().await; let conn = http1::Builder::new() .serve_connection(io, service_fn(move |req| { - service_http(req, args) + service_http(req, args.clone()) })); conn.with_upgrades().await.ok(); @@ -207,123 +196,111 @@ async fn main() } js_asset_data += "}};"; - match b_main.connect().await { - Ok(mut bus) => { + /* + ** Cache source files. + */ + let cache = WebCache::new(); + cache.cache("text/html", "/.html", &[ + WebCache::file("www/.html"), + ]).ok(); + cache.cache_whitespace_minimize("/.html").ok(); + cache.cache("text/css", "/.css", &[ + WebCache::file("www/css/main.css"), + WebCache::file("www/css/ui.css"), + WebCache::file("www/css/form.css"), + WebCache::file("www/css/game.css"), + WebCache::file("www/css/text.css"), + WebCache::file("www/css/profile.css"), + WebCache::file("www/css/util.css"), + ]).ok(); + cache.cache("text/javascript", "/.js", &[ + WebCache::file("www/js/const.js"), + WebCache::file("www/js/language.js"), + WebCache::file("www/js/util.js"), + WebCache::file("www/js/badge.js"), + WebCache::file("www/js/game_asset.js"), + WebCache::string(&js_asset_data), + WebCache::file("www/js/game.js"), + WebCache::file("www/js/interface.js"), + WebCache::file("www/js/ui.js"), + WebCache::file("www/js/scene.js"), + WebCache::file("www/js/system.js"), + WebCache::file("www/js/main.js"), + ]).ok(); + cache.cache("image/png", "/favicon.png", &[ + WebCache::file("www/asset/favicon.png"), + ]).ok(); + cache.cache("image/png", "/favicon_notify.png", &[ + WebCache::file("www/asset/favicon_notify.png"), + ]).ok(); - /* - ** Cache source files. - */ - let cache = WebCache::new(); - cache.cache("text/html", "/.html", &[ - WebCache::file("www/.html"), - ]).ok(); - cache.cache_whitespace_minimize("/.html").ok(); - cache.cache("text/css", "/.css", &[ - WebCache::file("www/css/main.css"), - WebCache::file("www/css/ui.css"), - WebCache::file("www/css/form.css"), - WebCache::file("www/css/game.css"), - WebCache::file("www/css/text.css"), - WebCache::file("www/css/profile.css"), - WebCache::file("www/css/util.css"), - ]).ok(); - cache.cache("text/javascript", "/.js", &[ - WebCache::file("www/js/const.js"), - WebCache::file("www/js/language.js"), - WebCache::file("www/js/util.js"), - WebCache::file("www/js/badge.js"), - WebCache::file("www/js/game_asset.js"), - WebCache::string(&js_asset_data), - WebCache::file("www/js/game.js"), - WebCache::file("www/js/interface.js"), - WebCache::file("www/js/ui.js"), - WebCache::file("www/js/scene.js"), - WebCache::file("www/js/system.js"), - WebCache::file("www/js/main.js"), - ]).ok(); - cache.cache("image/png", "/favicon.png", &[ - WebCache::file("www/asset/favicon.png"), - ]).ok(); - cache.cache("image/png", "/favicon_notify.png", &[ - WebCache::file("www/asset/favicon_notify.png"), - ]).ok(); + let about_path = std::path::Path::new("www/pages/about"); + for doc in [ + "main", + ] { + if cache.cache("text/html", &format!("/about/{}.html", doc), &[ + WebCache::markdown(about_path.join(format!("{}.md", doc))) + ]).is_err() { + println!("error: failed to load: {}", doc); + } + } - let about_path = std::path::Path::new("www/pages/about"); - for doc in [ - "main", - ] { - if cache.cache("text/html", &format!("/about/{}.html", doc), &[ - WebCache::markdown(about_path.join(format!("{}.md", doc))) - ]).is_err() { - println!("error: failed to load: {}", doc); - } - } + let guide_path = std::path::Path::new("www/pages/guide"); + for doc in [ + "game", + "pieces", + "interface", + ] { + if cache.cache("text/html", &format!("/guide/{}.html", doc), &[ + WebCache::markdown(guide_path.join(format!("{}.md", doc))), + ]).is_err() { + println!("error: failed to load: {}", doc); + } + } + - let guide_path = std::path::Path::new("www/pages/guide"); - for doc in [ - "game", - "pieces", - "interface", - ] { - if cache.cache("text/html", &format!("/guide/{}.html", doc), &[ - WebCache::markdown(guide_path.join(format!("{}.md", doc))), - ]).is_err() { - println!("error: failed to load: {}", doc); - } - } - - - /* - ** Initialize network services. - */ - let mut tcp_server = TcpServer::new(); - match tcp_server.bind("127.0.0.1:38611").await { - Ok(_) => { - let mut b = bus.connect().await.unwrap(); - let c = cache.clone(); - let data_tx = data_tx.clone(); - tokio::spawn(async move { - while tcp_server.accept(handle_tcp, HttpServiceArgs { - tx:data_tx, - cache:c.clone(), - }).await.is_ok() { } - }); - } - Err(_) => { - println!("error: failed to bind TCP port 38611."); - } - } - - let mut tls_server = TlsServer::new(); - for domain in [ - "omen.kirisame.com", - "dzura.com", - ] { - if tls_server.add_cert(domain, &format!("cert/{}/fullchain.pem", domain), &format!("cert/{}/privkey.pem", domain)).await.is_err() { - println!("error: failed to load TLS certificates for {}.", domain); - } - } - match tls_server.bind("0.0.0.0:38612").await { - Ok(_) => { - let (tx, rx) = tokio::sync::mpsc::channel::<>(24); - let mut b = bus.connect().await.unwrap(); - let c = cache.clone(); - - let data_tx = data_tx.clone(); - tokio::spawn(async move { - while tls_server.accept(handle_tls, HttpServiceArgs { - tx:data_tx, - cache:c.clone(), - }).await.is_ok() { } - }); - } - Err(_) => { - println!("error: failed to bind TLS port 38612."); - } - } + /* + ** Initialize network services. + */ + let mut tcp_server = TcpServer::new(); + match tcp_server.bind("127.0.0.1:38611").await { + Ok(_) => { + let c = cache.clone(); + let data_tx = data_tx.clone(); + tokio::spawn(async move { + while tcp_server.accept(handle_tcp, HttpServiceArgs { + tx:data_tx.clone(), + cache:c.clone(), + }).await.is_ok() { } + }); } Err(_) => { - println!("error: failed to initialize HTTPS service."); + println!("error: failed to bind TCP port 38611."); + } + } + + let mut tls_server = TlsServer::new(); + for domain in [ + "omen.kirisame.com", + "dzura.com", + ] { + if tls_server.add_cert(domain, &format!("cert/{}/fullchain.pem", domain), &format!("cert/{}/privkey.pem", domain)).await.is_err() { + println!("error: failed to load TLS certificates for {}.", domain); + } + } + match tls_server.bind("0.0.0.0:38612").await { + Ok(_) => { + let c = cache.clone(); + let data_tx = data_tx.clone(); + tokio::spawn(async move { + while tls_server.accept(handle_tls, HttpServiceArgs { + tx:data_tx.clone(), + cache:c.clone(), + }).await.is_ok() { } + }); + } + Err(_) => { + println!("error: failed to bind TLS port 38612."); } } diff --git a/server/src/manager/data.rs b/server/src/manager/data.rs index 4f44216..1789aa8 100644 --- a/server/src/manager/data.rs +++ b/server/src/manager/data.rs @@ -23,8 +23,7 @@ pub async fn thread_system(mut app:App, mut rx:Receiver) let mut send_user_status = Vec::::new(); - while let Some(packet) = rx.recv().await { - let qr = packet.data; + while let Some(qr) = rx.recv().await { let mut user_id = None; let mut context = Context::None; @@ -44,7 +43,7 @@ pub async fn thread_system(mut app:App, mut rx:Receiver) stream, } => { let id = app.connections.add(Connection { - stream: request.stream, + stream, auth: None, context:Context::None, @@ -1053,10 +1052,8 @@ pub async fn thread_system(mut app:App, mut rx:Receiver) } } -fn generate_game_state(app:&App, session:&Session) -> protocol::PacketGameStateResponse +fn generate_game_state(app:&App, session:&Session) -> PacketGameStateResponse { - use protocol::PacketGameStateResponse; - let mut response = PacketGameStateResponse::new(); response.token = session.token; diff --git a/server/src/manager/ws.rs b/server/src/manager/ws.rs index fb88bab..6c2dadd 100644 --- a/server/src/manager/ws.rs +++ b/server/src/manager/ws.rs @@ -11,7 +11,7 @@ use crate::{ HttpServiceArgs, }; -pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServiceArgs) -> Result<(),()> +pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceArgs) -> Result<(),()> // Handle websocket connection. // { @@ -21,7 +21,7 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi let conn_id :u32; let (sink, mut stream) = ws.split(); - let (tx, rx) = mpsc::channel::(1); + let (tx, mut rx) = mpsc::channel::(1); // Perform connection handshake with data system. // - Provide system with connection/bus pairing. @@ -30,11 +30,10 @@ pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServi args.tx.send(QRPacket::new(0, QRPacketData::QConn { tx:tx.clone(), stream:Arc::new(RwLock::new(sink)), - })).await?; + })).await.map_err(|_| ())?; - if let Some(resp) = rx.blocking_recv() { - let qr = &resp.data; - match qr.data { + if let Some(qr) = rx.recv().await { + match &qr.data { QRPacketData::RConn => { conn_id = qr.id; } _ => { return Err(()); } }