diff --git a/server/Cargo.toml b/server/Cargo.toml index 209256a..9a8a330 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -12,7 +12,6 @@ hyper-tungstenite = "0.14.0" rustls = "0.23.5" rustls-pemfile = "2.1.2" webpki-roots = "0.26" -opaque-ke = "2.0.0" hyper = { version = "1.4.1", features = ["full"] } hyper-util = { version = "0.1.7", features = ["tokio"] } http-body-util = "0.1.2" @@ -26,7 +25,6 @@ regex = "1.11.0" game = { path = "../game" } -bus = { git = "https://git.tsukiyo.org/Utility/bus" } sparse = { git = "https://git.tsukiyo.org/Utility/sparse" } trie = { git = "https://git.tsukiyo.org/Utility/trie" } pool = { git = "https://git.tsukiyo.org/Utility/pool" } diff --git a/server/src/app/connection.rs b/server/src/app/connection.rs index bc6b94b..80887bb 100644 --- a/server/src/app/connection.rs +++ b/server/src/app/connection.rs @@ -14,7 +14,6 @@ type StreamType = Arc>, Messa #[derive(Clone)] pub struct Connection { - pub bus:u32, pub stream:StreamType, pub auth:Option, diff --git a/server/src/app/mod.rs b/server/src/app/mod.rs index 360d556..c70128d 100644 --- a/server/src/app/mod.rs +++ b/server/src/app/mod.rs @@ -160,102 +160,105 @@ impl App { use tokio_tungstenite::tungstenite::Message; use futures::SinkExt; - if match response.data { QRPacketData::None => false, _ => true } { - if let Some(conn) = self.connections.get(response.id as usize) { - let mut socket = conn.stream.write().await; + match response.data { + QRPacketData::None => { } + _ => { + if let Some(conn) = self.connections.get(response.id as usize) { + let mut socket = conn.stream.write().await; - match response.data { - QRPacketData::RHello(response) => { - socket.send(Message::Binary( - encode_response(CODE_HELLO, response.encode()) - )).await.ok(); + match response.data { + QRPacketData::RHello(response) => { + socket.send(Message::Binary( + encode_response(CODE_HELLO, response.encode()) + )).await.ok(); + } + + QRPacketData::RRegister(response) => { + socket.send(Message::Binary( + encode_response(CODE_REGISTER, response.encode()) + )).await.ok(); + } + + QRPacketData::RAuth(response) => { + socket.send(Message::Binary( + encode_response(CODE_AUTH, response.encode()) + )).await.ok(); + } + + QRPacketData::RAuthResume(response) => { + socket.send(Message::Binary( + encode_response(CODE_AUTH_RESUME, response.encode()) + )).await.ok(); + } + + QRPacketData::RSessionList(response) => { + socket.send(Message::Binary( + encode_response(CODE_SESSION_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RSessionView(response) => { + socket.send(Message::Binary( + encode_response(CODE_SESSION_VIEW, response.encode()) + )).await.ok(); + } + + QRPacketData::RGameState(response) => { + socket.send(Message::Binary( + encode_response(CODE_GAME_STATE, response.encode()) + )).await.ok(); + } + + QRPacketData::GameMessage(response) => { + socket.send(Message::Binary( + encode_response(CODE_GAME_MESSAGE, response.encode()) + )).await.ok(); + } + + QRPacketData::RChallengeAnswer(response) => { + socket.send(Message::Binary( + encode_response(CODE_CHALLENGE_ANSWER, response.encode()) + )).await.ok(); + } + + QRPacketData::RChallengeList(response) => { + socket.send(Message::Binary( + encode_response(CODE_CHALLENGE_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RUserList(response) => { + socket.send(Message::Binary( + encode_response(CODE_USER_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RUserProfile(response) => { + socket.send(Message::Binary( + encode_response(CODE_USER_PROFILE, response.encode()) + )).await.ok(); + } + + QRPacketData::RInviteList(response) => { + socket.send(Message::Binary( + encode_response(CODE_INVITE_LIST, response.encode()) + )).await.ok(); + } + + QRPacketData::RInviteAcquire(response) => { + socket.send(Message::Binary( + encode_response(CODE_INVITE_ACQUIRE, response.encode()) + )).await.ok(); + } + + QRPacketData::TestResult(response) => { + socket.send(Message::Binary( + encode_response(CODE_TEST_RESULT, response.encode()) + )).await.ok(); + } + + _ => { } } - - QRPacketData::RRegister(response) => { - socket.send(Message::Binary( - encode_response(CODE_REGISTER, response.encode()) - )).await.ok(); - } - - QRPacketData::RAuth(response) => { - socket.send(Message::Binary( - encode_response(CODE_AUTH, response.encode()) - )).await.ok(); - } - - QRPacketData::RAuthResume(response) => { - socket.send(Message::Binary( - encode_response(CODE_AUTH_RESUME, response.encode()) - )).await.ok(); - } - - QRPacketData::RSessionList(response) => { - socket.send(Message::Binary( - encode_response(CODE_SESSION_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RSessionView(response) => { - socket.send(Message::Binary( - encode_response(CODE_SESSION_VIEW, response.encode()) - )).await.ok(); - } - - QRPacketData::RGameState(response) => { - socket.send(Message::Binary( - encode_response(CODE_GAME_STATE, response.encode()) - )).await.ok(); - } - - QRPacketData::GameMessage(response) => { - socket.send(Message::Binary( - encode_response(CODE_GAME_MESSAGE, response.encode()) - )).await.ok(); - } - - QRPacketData::RChallengeAnswer(response) => { - socket.send(Message::Binary( - encode_response(CODE_CHALLENGE_ANSWER, response.encode()) - )).await.ok(); - } - - QRPacketData::RChallengeList(response) => { - socket.send(Message::Binary( - encode_response(CODE_CHALLENGE_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RUserList(response) => { - socket.send(Message::Binary( - encode_response(CODE_USER_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RUserProfile(response) => { - socket.send(Message::Binary( - encode_response(CODE_USER_PROFILE, response.encode()) - )).await.ok(); - } - - QRPacketData::RInviteList(response) => { - socket.send(Message::Binary( - encode_response(CODE_INVITE_LIST, response.encode()) - )).await.ok(); - } - - QRPacketData::RInviteAcquire(response) => { - socket.send(Message::Binary( - encode_response(CODE_INVITE_ACQUIRE, response.encode()) - )).await.ok(); - } - - QRPacketData::TestResult(response) => { - socket.send(Message::Binary( - encode_response(CODE_TEST_RESULT, response.encode()) - )).await.ok(); - } - - _ => { } } } } diff --git a/server/src/main.rs b/server/src/main.rs index e38e78a..0cdea50 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -1,8 +1,6 @@ use std::net::SocketAddr; use std::path::Path; -use bus::Bus; - mod config; mod util; mod app; @@ -13,6 +11,7 @@ mod manager; use app::App; use hyper::body::Bytes; use hyper_util::rt::TokioIo; +use tokio::sync::mpsc::{self, Sender}; use system::{ cache::WebCache, net::{ @@ -21,10 +20,11 @@ use system::{ tls::*, }, }; +use protocol::QRPacket; #[derive(Clone)] struct HttpServiceArgs { - bus:Bus, + tx:Sender, cache:WebCache, } @@ -166,24 +166,16 @@ async fn main() /* ** Initialize central bus and data serivce. */ - let b_main :Bus = Bus::new_as(bus::Mode::Transmitter); - match b_main.connect() { - Ok(bus) => { - tokio::spawn(async move { - manager::thread_system(app, bus).await; - }); - } - Err(_) => { - println!("fatal: failed to initialize bus."); - return; - } - } + let (data_tx, data_rx) = mpsc::channel::(64); + tokio::spawn(async move { + manager::thread_system(app, data_rx).await; + }); /* ** Load image assets. */ - let mut js_asset_data = String::from("const GAME_ASSET = { Image: {"); + let mut js_asset_data = String::from("const GAME_ASSET={Image:{"); let asset_path = Path::new("www/asset/"); for name in [ "Promote", @@ -202,121 +194,113 @@ async fn main() println!("error: failed to load asset: {}", name); } } - js_asset_data += "} };"; + js_asset_data += "}};"; - match b_main.connect() { - Ok(bus) => { + /* + ** Cache source files. + */ + let cache = WebCache::new(); + cache.cache("text/html", "/.html", &[ + WebCache::file("www/.html"), + ]).ok(); + cache.cache_whitespace_minimize("/.html").ok(); + cache.cache("text/css", "/.css", &[ + WebCache::file("www/css/main.css"), + WebCache::file("www/css/ui.css"), + WebCache::file("www/css/form.css"), + WebCache::file("www/css/game.css"), + WebCache::file("www/css/text.css"), + WebCache::file("www/css/profile.css"), + WebCache::file("www/css/util.css"), + ]).ok(); + cache.cache("text/javascript", "/.js", &[ + WebCache::file("www/js/const.js"), + WebCache::file("www/js/language.js"), + WebCache::file("www/js/util.js"), + WebCache::file("www/js/badge.js"), + WebCache::file("www/js/game_asset.js"), + WebCache::string(&js_asset_data), + WebCache::file("www/js/game.js"), + WebCache::file("www/js/interface.js"), + WebCache::file("www/js/ui.js"), + WebCache::file("www/js/scene.js"), + WebCache::file("www/js/system.js"), + WebCache::file("www/js/main.js"), + ]).ok(); + cache.cache("image/png", "/favicon.png", &[ + WebCache::file("www/asset/favicon.png"), + ]).ok(); + cache.cache("image/png", "/favicon_notify.png", &[ + WebCache::file("www/asset/favicon_notify.png"), + ]).ok(); - /* - ** Cache source files. - */ - let cache = WebCache::new(); - cache.cache("text/html", "/.html", &[ - WebCache::file("www/.html"), - ]).ok(); - cache.cache_whitespace_minimize("/.html").ok(); - cache.cache("text/css", "/.css", &[ - WebCache::file("www/css/main.css"), - WebCache::file("www/css/ui.css"), - WebCache::file("www/css/form.css"), - WebCache::file("www/css/game.css"), - WebCache::file("www/css/text.css"), - WebCache::file("www/css/profile.css"), - WebCache::file("www/css/util.css"), - ]).ok(); - cache.cache("text/javascript", "/.js", &[ - WebCache::file("www/js/const.js"), - WebCache::file("www/js/language.js"), - WebCache::file("www/js/util.js"), - WebCache::file("www/js/badge.js"), - WebCache::file("www/js/game_asset.js"), - WebCache::string(&js_asset_data), - WebCache::file("www/js/game.js"), - WebCache::file("www/js/interface.js"), - WebCache::file("www/js/ui.js"), - WebCache::file("www/js/scene.js"), - WebCache::file("www/js/system.js"), - WebCache::file("www/js/main.js"), - ]).ok(); - cache.cache("image/png", "/favicon.png", &[ - WebCache::file("www/asset/favicon.png"), - ]).ok(); - cache.cache("image/png", "/favicon_notify.png", &[ - WebCache::file("www/asset/favicon_notify.png"), - ]).ok(); + let about_path = std::path::Path::new("www/pages/about"); + for doc in [ + "main", + ] { + if cache.cache("text/html", &format!("/about/{}.html", doc), &[ + WebCache::markdown(about_path.join(format!("{}.md", doc))) + ]).is_err() { + println!("error: failed to load: {}", doc); + } + } - let about_path = std::path::Path::new("www/pages/about"); - for doc in [ - "main", - ] { - if cache.cache("text/html", &format!("/about/{}.html", doc), &[ - WebCache::markdown(about_path.join(format!("{}.md", doc))) - ]).is_err() { - println!("error: failed to load: {}", doc); - } - } + let guide_path = std::path::Path::new("www/pages/guide"); + for doc in [ + "game", + "pieces", + "interface", + ] { + if cache.cache("text/html", &format!("/guide/{}.html", doc), &[ + WebCache::markdown(guide_path.join(format!("{}.md", doc))), + ]).is_err() { + println!("error: failed to load: {}", doc); + } + } + - let guide_path = std::path::Path::new("www/pages/guide"); - for doc in [ - "game", - "pieces", - "interface", - ] { - if cache.cache("text/html", &format!("/guide/{}.html", doc), &[ - WebCache::markdown(guide_path.join(format!("{}.md", doc))), - ]).is_err() { - println!("error: failed to load: {}", doc); - } - } - - - /* - ** Initialize network services. - */ - let mut tcp_server = TcpServer::new(); - match tcp_server.bind("127.0.0.1:38611").await { - Ok(_) => { - let b = bus.connect().unwrap(); - let c = cache.clone(); - tokio::spawn(async move { - while tcp_server.accept(handle_tcp, HttpServiceArgs { - bus:b.connect().unwrap(), - cache:c.clone(), - }).await.is_ok() { } - }); - } - Err(_) => { - println!("error: failed to bind TCP port 38611."); - } - } - - let mut tls_server = TlsServer::new(); - for domain in [ - "omen.kirisame.com", - "dzura.com", - ] { - if tls_server.add_cert(domain, &format!("cert/{}/fullchain.pem", domain), &format!("cert/{}/privkey.pem", domain)).await.is_err() { - println!("error: failed to load TLS certificates for {}.", domain); - } - } - match tls_server.bind("0.0.0.0:38612").await { - Ok(_) => { - let b = bus.connect().unwrap(); - let c = cache.clone(); - tokio::spawn(async move { - while tls_server.accept(handle_tls, HttpServiceArgs { - bus:b.connect().unwrap(), - cache:c.clone(), - }).await.is_ok() { } - }); - } - Err(_) => { - println!("error: failed to bind TLS port 38612."); - } - } + /* + ** Initialize network services. + */ + let mut tcp_server = TcpServer::new(); + match tcp_server.bind("127.0.0.1:38611").await { + Ok(_) => { + let c = cache.clone(); + let data_tx = data_tx.clone(); + tokio::spawn(async move { + while tcp_server.accept(handle_tcp, HttpServiceArgs { + tx:data_tx.clone(), + cache:c.clone(), + }).await.is_ok() { } + }); } Err(_) => { - println!("error: failed to initialize HTTPS service."); + println!("error: failed to bind TCP port 38611."); + } + } + + let mut tls_server = TlsServer::new(); + for domain in [ + "omen.kirisame.com", + "dzura.com", + ] { + if tls_server.add_cert(domain, &format!("cert/{}/fullchain.pem", domain), &format!("cert/{}/privkey.pem", domain)).await.is_err() { + println!("error: failed to load TLS certificates for {}.", domain); + } + } + match tls_server.bind("0.0.0.0:38612").await { + Ok(_) => { + let c = cache.clone(); + let data_tx = data_tx.clone(); + tokio::spawn(async move { + while tls_server.accept(handle_tls, HttpServiceArgs { + tx:data_tx.clone(), + cache:c.clone(), + }).await.is_ok() { } + }); + } + Err(_) => { + println!("error: failed to bind TLS port 38612."); } } diff --git a/server/src/manager/data.rs b/server/src/manager/data.rs index 8a6a7f5..1789aa8 100644 --- a/server/src/manager/data.rs +++ b/server/src/manager/data.rs @@ -1,4 +1,4 @@ -use bus::Bus; +use tokio::sync::mpsc::Receiver; use game::history::Play; use crate::{ config, @@ -10,13 +10,12 @@ use crate::{ session::{Session, SessionFilter}, context::{self, Context}, }, - protocol, + protocol::*, }; -pub async fn thread_system(mut app:App, bus:Bus) +pub async fn thread_system(mut app:App, mut rx:Receiver) { use futures::SinkExt; - use protocol::*; use ring::rand::{SecureRandom, SystemRandom}; let rng = SystemRandom::new(); @@ -24,8 +23,7 @@ pub async fn thread_system(mut app:App, bus:Bus) let mut send_user_status = Vec::::new(); - while let Some(packet) = bus.receive_wait() { - let qr = packet.data; + while let Some(qr) = rx.recv().await { let mut user_id = None; let mut context = Context::None; @@ -40,10 +38,12 @@ pub async fn thread_system(mut app:App, bus:Bus) } match match qr.data { - QRPacketData::QConn(request) => { + QRPacketData::QConn { + tx, + stream, + } => { let id = app.connections.add(Connection { - bus: request.bus_id, - stream: request.stream, + stream, auth: None, context:Context::None, @@ -58,10 +58,9 @@ pub async fn thread_system(mut app:App, bus:Bus) app.log.log(&format!("Connect: {}", id)); - bus.send( - packet.from, + tx.send( QRPacket::new(id as u32, QRPacketData::RConn) - ).ok(); + ).await.ok(); Some(QRPacket::new(0, QRPacketData::None)) } @@ -1053,10 +1052,8 @@ pub async fn thread_system(mut app:App, bus:Bus) } } -fn generate_game_state(app:&App, session:&Session) -> protocol::PacketGameStateResponse +fn generate_game_state(app:&App, session:&Session) -> PacketGameStateResponse { - use protocol::PacketGameStateResponse; - let mut response = PacketGameStateResponse::new(); response.token = session.token; diff --git a/server/src/manager/ws.rs b/server/src/manager/ws.rs index f519efc..6c2dadd 100644 --- a/server/src/manager/ws.rs +++ b/server/src/manager/ws.rs @@ -1,5 +1,5 @@ use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{RwLock, mpsc}; use hyper::upgrade::Upgraded; use hyper_util::rt::TokioIo; use tokio_tungstenite::WebSocketStream; @@ -21,28 +21,29 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr let conn_id :u32; let (sink, mut stream) = ws.split(); - - let bus_ds = args.bus.mailbox(1).unwrap_or(1); + let (tx, mut rx) = mpsc::channel::(1); // Perform connection handshake with data system. // - Provide system with connection/bus pairing. // - Acquire connection id. // - args.bus.send(bus_ds, QRPacket::new(0, QRPacketData::QConn(LocalPacketConnect { - bus_id:args.bus.id(), + args.tx.send(QRPacket::new(0, QRPacketData::QConn { + tx:tx.clone(), stream:Arc::new(RwLock::new(sink)), - })))?; - match args.bus.receive_wait() { - Some(resp) => { - let qr = &resp.data; - match qr.data { - QRPacketData::RConn => { conn_id = qr.id; } - _ => { return Err(()); } - } + })).await.map_err(|_| ())?; + + if let Some(qr) = rx.recv().await { + match &qr.data { + QRPacketData::RConn => { conn_id = qr.id; } + _ => { return Err(()); } } - None => { return Err(()); } + } else { + return Err(()); } + drop(tx); + drop(rx); + // Decode client requests from websocket, // pass requests to data system, // and return responses to client. @@ -55,171 +56,154 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr match code { CODE_HELLO => { - args.bus.send( - bus_ds, QRPacket::new(conn_id, QRPacketData::QHello) - ).ok(); + args.tx.send( + QRPacket::new(conn_id, QRPacketData::QHello) + ).await.ok(); } CODE_REGISTER => match PacketRegister::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QRegister(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } CODE_AUTH => match PacketAuth::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QAuth(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } CODE_AUTH_RESUME => match PacketAuthResume::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QAuthResume(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } CODE_AUTH_REVOKE => { - args.bus.send( - bus_ds, QRPacket::new(conn_id, - QRPacketData::QAuthRevoke) - ).ok(); + args.tx.send( + QRPacket::new(conn_id, QRPacketData::QAuthRevoke) + ).await.ok(); } CODE_SESSION_LIST => match PacketSessionList::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionList(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } /*CODE_SESSION_JOIN => match PacketSessionCreate::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( + args.tx.send( bus_ds, QRPacket::new(conn_id, QRPacketData::QSessionCreate(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } }*/ CODE_SESSION_VIEW => match PacketSessionView::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionView(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } CODE_SESSION_LEAVE => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionLeave) - ).ok(); + ).await.ok(); } CODE_SESSION_RETIRE => match PacketSessionRetire::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QSessionResign(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } CODE_GAME_STATE => match PacketGameState::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QGameState(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } CODE_GAME_MESSAGE => match PacketGameMessage::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::GameMessage(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } CODE_CHALLENGE => match PacketChallenge::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QChallenge(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } CODE_CHALLENGE_ANSWER => match PacketChallengeAnswer::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QChallengeAnswer(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } CODE_CHALLENGE_LIST => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QChallengeList) - ).ok(); + ).await.ok(); } CODE_USER_LIST => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QUserList) - ).ok(); + ).await.ok(); } CODE_USER_PROFILE => match PacketUserProfile::decode(&data, &mut index) { Ok(packet) => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QUserProfile(packet)) - ).ok(); + ).await.ok(); } Err(_) => { println!("error: packet decode failed."); } } CODE_INVITE_LIST => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QInviteList) - ).ok(); + ).await.ok(); } CODE_INVITE_ACQUIRE => { - args.bus.send( - bus_ds, + args.tx.send( QRPacket::new(conn_id, QRPacketData::QInviteAcquire) - ).ok(); + ).await.ok(); } _ => { } @@ -242,6 +226,6 @@ pub async fn handle_ws(ws:WebSocketStream>, args:HttpServiceAr None => false, } { } - args.bus.send(bus_ds, QRPacket::new(conn_id, QRPacketData::QDisconn)).ok(); + args.tx.send(QRPacket::new(conn_id, QRPacketData::QDisconn)).await.ok(); Ok(()) } diff --git a/server/src/protocol/mod.rs b/server/src/protocol/mod.rs index 5bdcbf5..3b3532b 100644 --- a/server/src/protocol/mod.rs +++ b/server/src/protocol/mod.rs @@ -1,5 +1,12 @@ #![allow(dead_code)] +use std::sync::Arc; +use tokio::sync::{mpsc::Sender, RwLock}; +use futures::stream::SplitSink; +use hyper::upgrade::Upgraded; +use hyper_util::rt::TokioIo; +use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; + pub mod code; pub use code::*; pub mod packet; pub use packet::*; @@ -7,7 +14,10 @@ pub mod packet; pub use packet::*; pub enum QRPacketData { None, - QConn(LocalPacketConnect), + QConn { + tx:Sender, + stream:Arc>, Message>>>, + }, RConn, QDisconn, diff --git a/server/src/protocol/packet/connect.rs b/server/src/protocol/packet/connect.rs index 299c597..912c942 100644 --- a/server/src/protocol/packet/connect.rs +++ b/server/src/protocol/packet/connect.rs @@ -1,12 +1,14 @@ use std::sync::Arc; -use tokio::sync::RwLock; +use tokio::sync::{mpsc::Sender, RwLock}; use futures::stream::SplitSink; use hyper::upgrade::Upgraded; use hyper_util::rt::TokioIo; use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; +use crate::protocol::QRPacket; + #[derive(Clone)] pub struct LocalPacketConnect { - pub bus_id:u32, + pub tx:Sender, pub stream:Arc>, Message>>>, } diff --git a/server/src/protocol/packet/mod.rs b/server/src/protocol/packet/mod.rs index 050fddc..90eefaa 100644 --- a/server/src/protocol/packet/mod.rs +++ b/server/src/protocol/packet/mod.rs @@ -1,5 +1,5 @@ mod hello; pub use hello::*; -mod connect; pub use connect::*; +//mod connect; pub use connect::*; mod register; pub use register::*; mod auth; pub use auth::*;