diff --git a/server/Cargo.toml b/server/Cargo.toml index 209256a..38241f1 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -26,7 +26,7 @@ regex = "1.11.0" game = { path = "../game" } -bus = { git = "https://git.tsukiyo.org/Utility/bus" } +bus = { git = "https://git.tsukiyo.org/Utility/bus", branch = "tokio" } sparse = { git = "https://git.tsukiyo.org/Utility/sparse" } trie = { git = "https://git.tsukiyo.org/Utility/trie" } pool = { git = "https://git.tsukiyo.org/Utility/pool" } diff --git a/server/src/main.rs b/server/src/main.rs index e38e78a..6f14ee2 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -22,11 +22,19 @@ use system::{ }, }; -#[derive(Clone)] struct HttpServiceArgs { bus:Bus, cache:WebCache, } +impl HttpServiceArgs { + pub async fn clone(&mut self) -> Self + { + Self { + bus:self.bus.connect().await.unwrap(), + cache:self.cache.clone(), + } + } +} async fn service_http(mut request:hyper::Request, args:HttpServiceArgs) -> Result>, std::convert::Infallible> // Serve cached files and upgrade websocket connections. @@ -110,7 +118,7 @@ async fn service_http(mut request:hyper::Request, args:Ht } } -async fn handle_http(stream:S, addr:SocketAddr, args:HttpServiceArgs) -> Result<(),()> +async fn handle_http(stream:S, addr:SocketAddr, mut args:HttpServiceArgs) -> Result<(),()> where S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static // Hand off socket connection to Hyper server. // @@ -122,9 +130,10 @@ where S: tokio::io::AsyncRead + tokio::io::AsyncWrite + Unpin + Send + 'static let io = TokioIo::new(stream); + let args = args.clone().await; let conn = http1::Builder::new() .serve_connection(io, service_fn(move |req| { - service_http(req, args.clone()) + service_http(req, args) })); conn.with_upgrades().await.ok(); @@ -166,8 +175,8 @@ async fn main() /* ** Initialize central bus and data serivce. */ - let b_main :Bus = Bus::new_as(bus::Mode::Transmitter); - match b_main.connect() { + let mut b_main :Bus = Bus::new_as(bus::Mode::Transmitter); + match b_main.connect().await { Ok(bus) => { tokio::spawn(async move { manager::thread_system(app, bus).await; @@ -183,7 +192,7 @@ async fn main() /* ** Load image assets. */ - let mut js_asset_data = String::from("const GAME_ASSET = { Image: {"); + let mut js_asset_data = String::from("const GAME_ASSET={Image:{"); let asset_path = Path::new("www/asset/"); for name in [ "Promote", @@ -202,10 +211,10 @@ async fn main() println!("error: failed to load asset: {}", name); } } - js_asset_data += "} };"; + js_asset_data += "}};"; - match b_main.connect() { - Ok(bus) => { + match b_main.connect().await { + Ok(mut bus) => { /* ** Cache source files. @@ -276,11 +285,11 @@ async fn main() let mut tcp_server = TcpServer::new(); match tcp_server.bind("127.0.0.1:38611").await { Ok(_) => { - let b = bus.connect().unwrap(); + let mut b = bus.connect().await.unwrap(); let c = cache.clone(); tokio::spawn(async move { while tcp_server.accept(handle_tcp, HttpServiceArgs { - bus:b.connect().unwrap(), + bus:b.connect().await.unwrap(), cache:c.clone(), }).await.is_ok() { } }); @@ -301,11 +310,11 @@ async fn main() } match tls_server.bind("0.0.0.0:38612").await { Ok(_) => { - let b = bus.connect().unwrap(); + let mut b = bus.connect().await.unwrap(); let c = cache.clone(); tokio::spawn(async move { while tls_server.accept(handle_tls, HttpServiceArgs { - bus:b.connect().unwrap(), + bus:b.connect().await.unwrap(), cache:c.clone(), }).await.is_ok() { } }); diff --git a/server/src/manager/data.rs b/server/src/manager/data.rs index 8a6a7f5..dd0ddf2 100644 --- a/server/src/manager/data.rs +++ b/server/src/manager/data.rs @@ -13,7 +13,7 @@ use crate::{ protocol, }; -pub async fn thread_system(mut app:App, bus:Bus) +pub async fn thread_system(mut app:App, mut bus:Bus) { use futures::SinkExt; use protocol::*; @@ -24,7 +24,7 @@ pub async fn thread_system(mut app:App, bus:Bus) let mut send_user_status = Vec::::new(); - while let Some(packet) = bus.receive_wait() { + while let Some(packet) = bus.receive_wait().await { let qr = packet.data; let mut user_id = None; @@ -61,7 +61,7 @@ pub async fn thread_system(mut app:App, bus:Bus) bus.send( packet.from, QRPacket::new(id as u32, QRPacketData::RConn) - ).ok(); + ).await.ok(); Some(QRPacket::new(0, QRPacketData::None)) } diff --git a/server/src/manager/ws.rs b/server/src/manager/ws.rs index f519efc..1f7c777 100644 --- a/server/src/manager/ws.rs +++ b/server/src/manager/ws.rs @@ -11,7 +11,7 @@ use crate::{ HttpServiceArgs, }; -pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceArgs) -> Result<(),()> +pub async fn handle_ws(ws:WebSocketStream>, mut args:HttpServiceArgs) -> Result<(),()> // Handle websocket connection. // { @@ -22,7 +22,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr let (sink, mut stream) = ws.split(); - let bus_ds = args.bus.mailbox(1).unwrap_or(1); + let bus_ds = args.bus.mailbox(1).await.unwrap_or(1); // Perform connection handshake with data system. // - Provide system with connection/bus pairing. @@ -31,8 +31,8 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send(bus_ds, QRPacket::new(0, QRPacketData::QConn(LocalPacketConnect { bus_id:args.bus.id(), stream:Arc::new(RwLock::new(sink)), - })))?; - match args.bus.receive_wait() { + }))).await?; + match args.bus.receive_wait().await { Some(resp) => { let qr = &resp.data; match qr.data { @@ -57,7 +57,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr CODE_HELLO => { args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QHello) - ).ok(); + ).await.ok(); } CODE_REGISTER => match PacketRegister::decode(&data, &mut index) { @@ -65,7 +65,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QRegister(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -75,7 +75,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QAuth(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -85,7 +85,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QAuthResume(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -94,7 +94,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QAuthRevoke) - ).ok(); + ).await.ok(); } CODE_SESSION_LIST => match PacketSessionList::decode(&data, &mut index) { @@ -102,7 +102,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionList(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -112,7 +112,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionCreate(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } }*/ @@ -122,7 +122,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionView(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -131,7 +131,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionLeave) - ).ok(); + ).await.ok(); } CODE_SESSION_RETIRE => match PacketSessionRetire::decode(&data, &mut index) { @@ -139,7 +139,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionResign(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -149,7 +149,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QGameState(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -159,7 +159,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::GameMessage(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -169,7 +169,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QChallenge(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -179,7 +179,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QChallengeAnswer(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -188,14 +188,14 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QChallengeList) - ).ok(); + ).await.ok(); } CODE_USER_LIST => { args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QUserList) - ).ok(); + ).await.ok(); } CODE_USER_PROFILE => match PacketUserProfile::decode(&data, &mut index) { @@ -203,7 +203,7 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QUserProfile(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } @@ -212,14 +212,14 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QInviteList) - ).ok(); + ).await.ok(); } CODE_INVITE_ACQUIRE => { args.bus.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QInviteAcquire) - ).ok(); + ).await.ok(); } _ => { } @@ -242,6 +242,6 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr None => false, } { } - args.bus.send(bus_ds, QRPacket::new(conn_id, QRPacketData::QDisconn)).ok(); + args.bus.send(bus_ds, QRPacket::new(conn_id, QRPacketData::QDisconn)).await.ok(); Ok(()) }