Replace bus with tokio mpsc.
This commit is contained in:
parent
45366ff330
commit
9c0c400546
@ -26,7 +26,6 @@ regex = "1.11.0"
|
|||||||
|
|
||||||
game = { path = "../game" }
|
game = { path = "../game" }
|
||||||
|
|
||||||
bus = { git = "https://git.tsukiyo.org/Utility/bus", branch = "tokio" }
|
|
||||||
sparse = { git = "https://git.tsukiyo.org/Utility/sparse" }
|
sparse = { git = "https://git.tsukiyo.org/Utility/sparse" }
|
||||||
trie = { git = "https://git.tsukiyo.org/Utility/trie" }
|
trie = { git = "https://git.tsukiyo.org/Utility/trie" }
|
||||||
pool = { git = "https://git.tsukiyo.org/Utility/pool" }
|
pool = { git = "https://git.tsukiyo.org/Utility/pool" }
|
||||||
|
@ -14,7 +14,6 @@ type StreamType = Arc<RwLock<SplitSink<WebSocketStream<TokioIo<Upgraded>>, Messa
|
|||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct Connection {
|
pub struct Connection {
|
||||||
pub bus:u32,
|
|
||||||
pub stream:StreamType,
|
pub stream:StreamType,
|
||||||
pub auth:Option<AuthToken>,
|
pub auth:Option<AuthToken>,
|
||||||
|
|
||||||
|
@ -160,102 +160,105 @@ impl App {
|
|||||||
use tokio_tungstenite::tungstenite::Message;
|
use tokio_tungstenite::tungstenite::Message;
|
||||||
use futures::SinkExt;
|
use futures::SinkExt;
|
||||||
|
|
||||||
if match response.data { QRPacketData::None => false, _ => true } {
|
match response.data {
|
||||||
if let Some(conn) = self.connections.get(response.id as usize) {
|
QRPacketData::None => { }
|
||||||
let mut socket = conn.stream.write().await;
|
_ => {
|
||||||
|
if let Some(conn) = self.connections.get(response.id as usize) {
|
||||||
|
let mut socket = conn.stream.write().await;
|
||||||
|
|
||||||
match response.data {
|
match response.data {
|
||||||
QRPacketData::RHello(response) => {
|
QRPacketData::RHello(response) => {
|
||||||
socket.send(Message::Binary(
|
socket.send(Message::Binary(
|
||||||
encode_response(CODE_HELLO, response.encode())
|
encode_response(CODE_HELLO, response.encode())
|
||||||
)).await.ok();
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RRegister(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_REGISTER, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RAuth(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_AUTH, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RAuthResume(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_AUTH_RESUME, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RSessionList(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_SESSION_LIST, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RSessionView(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_SESSION_VIEW, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RGameState(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_GAME_STATE, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::GameMessage(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_GAME_MESSAGE, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RChallengeAnswer(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_CHALLENGE_ANSWER, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RChallengeList(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_CHALLENGE_LIST, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RUserList(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_USER_LIST, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RUserProfile(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_USER_PROFILE, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RInviteList(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_INVITE_LIST, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::RInviteAcquire(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_INVITE_ACQUIRE, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
QRPacketData::TestResult(response) => {
|
||||||
|
socket.send(Message::Binary(
|
||||||
|
encode_response(CODE_TEST_RESULT, response.encode())
|
||||||
|
)).await.ok();
|
||||||
|
}
|
||||||
|
|
||||||
|
_ => { }
|
||||||
}
|
}
|
||||||
|
|
||||||
QRPacketData::RRegister(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_REGISTER, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::RAuth(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_AUTH, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::RAuthResume(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_AUTH_RESUME, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::RSessionList(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_SESSION_LIST, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::RSessionView(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_SESSION_VIEW, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::RGameState(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_GAME_STATE, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::GameMessage(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_GAME_MESSAGE, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::RChallengeAnswer(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_CHALLENGE_ANSWER, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::RChallengeList(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_CHALLENGE_LIST, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::RUserList(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_USER_LIST, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::RUserProfile(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_USER_PROFILE, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::RInviteList(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_INVITE_LIST, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::RInviteAcquire(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_INVITE_ACQUIRE, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
QRPacketData::TestResult(response) => {
|
|
||||||
socket.send(Message::Binary(
|
|
||||||
encode_response(CODE_TEST_RESULT, response.encode())
|
|
||||||
)).await.ok();
|
|
||||||
}
|
|
||||||
|
|
||||||
_ => { }
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -13,6 +13,7 @@ mod manager;
|
|||||||
use app::App;
|
use app::App;
|
||||||
use hyper::body::Bytes;
|
use hyper::body::Bytes;
|
||||||
use hyper_util::rt::TokioIo;
|
use hyper_util::rt::TokioIo;
|
||||||
|
use tokio::sync::mpsc::{self, Sender};
|
||||||
use system::{
|
use system::{
|
||||||
cache::WebCache,
|
cache::WebCache,
|
||||||
net::{
|
net::{
|
||||||
@ -21,16 +22,17 @@ use system::{
|
|||||||
tls::*,
|
tls::*,
|
||||||
},
|
},
|
||||||
};
|
};
|
||||||
|
use protocol::QRPacket;
|
||||||
|
|
||||||
struct HttpServiceArgs {
|
struct HttpServiceArgs {
|
||||||
bus:Bus<protocol::QRPacket>,
|
tx:Sender<QRPacket>,
|
||||||
cache:WebCache,
|
cache:WebCache,
|
||||||
}
|
}
|
||||||
impl HttpServiceArgs {
|
impl HttpServiceArgs {
|
||||||
pub async fn clone(&mut self) -> Self
|
pub async fn clone(&mut self) -> Self
|
||||||
{
|
{
|
||||||
Self {
|
Self {
|
||||||
bus:self.bus.connect().await.unwrap(),
|
tx:self.tx.clone(),
|
||||||
cache:self.cache.clone(),
|
cache:self.cache.clone(),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -175,18 +177,10 @@ async fn main()
|
|||||||
/*
|
/*
|
||||||
** Initialize central bus and data serivce.
|
** Initialize central bus and data serivce.
|
||||||
*/
|
*/
|
||||||
let mut b_main :Bus<protocol::QRPacket> = Bus::new_as(bus::Mode::Transmitter);
|
let (data_tx, data_rx) = mpsc::channel::<QRPacket>(64);
|
||||||
match b_main.connect().await {
|
tokio::spawn(async move {
|
||||||
Ok(bus) => {
|
manager::thread_system(app, data_rx).await;
|
||||||
tokio::spawn(async move {
|
});
|
||||||
manager::thread_system(app, bus).await;
|
|
||||||
});
|
|
||||||
}
|
|
||||||
Err(_) => {
|
|
||||||
println!("fatal: failed to initialize bus.");
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
|
|
||||||
/*
|
/*
|
||||||
@ -287,9 +281,10 @@ async fn main()
|
|||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
let mut b = bus.connect().await.unwrap();
|
let mut b = bus.connect().await.unwrap();
|
||||||
let c = cache.clone();
|
let c = cache.clone();
|
||||||
|
let data_tx = data_tx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while tcp_server.accept(handle_tcp, HttpServiceArgs {
|
while tcp_server.accept(handle_tcp, HttpServiceArgs {
|
||||||
bus:b.connect().await.unwrap(),
|
tx:data_tx,
|
||||||
cache:c.clone(),
|
cache:c.clone(),
|
||||||
}).await.is_ok() { }
|
}).await.is_ok() { }
|
||||||
});
|
});
|
||||||
@ -310,11 +305,14 @@ async fn main()
|
|||||||
}
|
}
|
||||||
match tls_server.bind("0.0.0.0:38612").await {
|
match tls_server.bind("0.0.0.0:38612").await {
|
||||||
Ok(_) => {
|
Ok(_) => {
|
||||||
|
let (tx, rx) = tokio::sync::mpsc::channel::<>(24);
|
||||||
let mut b = bus.connect().await.unwrap();
|
let mut b = bus.connect().await.unwrap();
|
||||||
let c = cache.clone();
|
let c = cache.clone();
|
||||||
|
|
||||||
|
let data_tx = data_tx.clone();
|
||||||
tokio::spawn(async move {
|
tokio::spawn(async move {
|
||||||
while tls_server.accept(handle_tls, HttpServiceArgs {
|
while tls_server.accept(handle_tls, HttpServiceArgs {
|
||||||
bus:b.connect().await.unwrap(),
|
tx:data_tx,
|
||||||
cache:c.clone(),
|
cache:c.clone(),
|
||||||
}).await.is_ok() { }
|
}).await.is_ok() { }
|
||||||
});
|
});
|
||||||
|
@ -1,4 +1,4 @@
|
|||||||
use bus::Bus;
|
use tokio::sync::mpsc::Receiver;
|
||||||
use game::history::Play;
|
use game::history::Play;
|
||||||
use crate::{
|
use crate::{
|
||||||
config,
|
config,
|
||||||
@ -10,13 +10,12 @@ use crate::{
|
|||||||
session::{Session, SessionFilter},
|
session::{Session, SessionFilter},
|
||||||
context::{self, Context},
|
context::{self, Context},
|
||||||
},
|
},
|
||||||
protocol,
|
protocol::*,
|
||||||
};
|
};
|
||||||
|
|
||||||
pub async fn thread_system(mut app:App, mut bus:Bus<protocol::QRPacket>)
|
pub async fn thread_system(mut app:App, mut rx:Receiver<QRPacket>)
|
||||||
{
|
{
|
||||||
use futures::SinkExt;
|
use futures::SinkExt;
|
||||||
use protocol::*;
|
|
||||||
use ring::rand::{SecureRandom, SystemRandom};
|
use ring::rand::{SecureRandom, SystemRandom};
|
||||||
|
|
||||||
let rng = SystemRandom::new();
|
let rng = SystemRandom::new();
|
||||||
@ -24,7 +23,7 @@ pub async fn thread_system(mut app:App, mut bus:Bus<protocol::QRPacket>)
|
|||||||
|
|
||||||
let mut send_user_status = Vec::<u32>::new();
|
let mut send_user_status = Vec::<u32>::new();
|
||||||
|
|
||||||
while let Some(packet) = bus.receive_wait().await {
|
while let Some(packet) = rx.recv().await {
|
||||||
let qr = packet.data;
|
let qr = packet.data;
|
||||||
|
|
||||||
let mut user_id = None;
|
let mut user_id = None;
|
||||||
@ -40,9 +39,11 @@ pub async fn thread_system(mut app:App, mut bus:Bus<protocol::QRPacket>)
|
|||||||
}
|
}
|
||||||
|
|
||||||
match match qr.data {
|
match match qr.data {
|
||||||
QRPacketData::QConn(request) => {
|
QRPacketData::QConn {
|
||||||
|
tx,
|
||||||
|
stream,
|
||||||
|
} => {
|
||||||
let id = app.connections.add(Connection {
|
let id = app.connections.add(Connection {
|
||||||
bus: request.bus_id,
|
|
||||||
stream: request.stream,
|
stream: request.stream,
|
||||||
auth: None,
|
auth: None,
|
||||||
|
|
||||||
@ -58,8 +59,7 @@ pub async fn thread_system(mut app:App, mut bus:Bus<protocol::QRPacket>)
|
|||||||
|
|
||||||
app.log.log(&format!("Connect: {}", id));
|
app.log.log(&format!("Connect: {}", id));
|
||||||
|
|
||||||
bus.send(
|
tx.send(
|
||||||
packet.from,
|
|
||||||
QRPacket::new(id as u32, QRPacketData::RConn)
|
QRPacket::new(id as u32, QRPacketData::RConn)
|
||||||
).await.ok();
|
).await.ok();
|
||||||
|
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::{RwLock, mpsc};
|
||||||
use hyper::upgrade::Upgraded;
|
use hyper::upgrade::Upgraded;
|
||||||
use hyper_util::rt::TokioIo;
|
use hyper_util::rt::TokioIo;
|
||||||
use tokio_tungstenite::WebSocketStream;
|
use tokio_tungstenite::WebSocketStream;
|
||||||
@ -21,28 +21,30 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
let conn_id :u32;
|
let conn_id :u32;
|
||||||
|
|
||||||
let (sink, mut stream) = ws.split();
|
let (sink, mut stream) = ws.split();
|
||||||
|
let (tx, rx) = mpsc::channel::<QRPacket>(1);
|
||||||
let bus_ds = args.bus.mailbox(1).await.unwrap_or(1);
|
|
||||||
|
|
||||||
// Perform connection handshake with data system.
|
// Perform connection handshake with data system.
|
||||||
// - Provide system with connection/bus pairing.
|
// - Provide system with connection/bus pairing.
|
||||||
// - Acquire connection id.
|
// - Acquire connection id.
|
||||||
//
|
//
|
||||||
args.bus.send(bus_ds, QRPacket::new(0, QRPacketData::QConn(LocalPacketConnect {
|
args.tx.send(QRPacket::new(0, QRPacketData::QConn {
|
||||||
bus_id:args.bus.id(),
|
tx:tx.clone(),
|
||||||
stream:Arc::new(RwLock::new(sink)),
|
stream:Arc::new(RwLock::new(sink)),
|
||||||
}))).await?;
|
})).await?;
|
||||||
match args.bus.receive_wait().await {
|
|
||||||
Some(resp) => {
|
if let Some(resp) = rx.blocking_recv() {
|
||||||
let qr = &resp.data;
|
let qr = &resp.data;
|
||||||
match qr.data {
|
match qr.data {
|
||||||
QRPacketData::RConn => { conn_id = qr.id; }
|
QRPacketData::RConn => { conn_id = qr.id; }
|
||||||
_ => { return Err(()); }
|
_ => { return Err(()); }
|
||||||
}
|
|
||||||
}
|
}
|
||||||
None => { return Err(()); }
|
} else {
|
||||||
|
return Err(());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
drop(tx);
|
||||||
|
drop(rx);
|
||||||
|
|
||||||
// Decode client requests from websocket,
|
// Decode client requests from websocket,
|
||||||
// pass requests to data system,
|
// pass requests to data system,
|
||||||
// and return responses to client.
|
// and return responses to client.
|
||||||
@ -55,15 +57,14 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
|
|
||||||
match code {
|
match code {
|
||||||
CODE_HELLO => {
|
CODE_HELLO => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds, QRPacket::new(conn_id, QRPacketData::QHello)
|
QRPacket::new(conn_id, QRPacketData::QHello)
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
CODE_REGISTER => match PacketRegister::decode(&data, &mut index) {
|
CODE_REGISTER => match PacketRegister::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QRegister(packet))
|
QRPacket::new(conn_id, QRPacketData::QRegister(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -72,8 +73,7 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
|
|
||||||
CODE_AUTH => match PacketAuth::decode(&data, &mut index) {
|
CODE_AUTH => match PacketAuth::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QAuth(packet))
|
QRPacket::new(conn_id, QRPacketData::QAuth(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -82,8 +82,7 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
|
|
||||||
CODE_AUTH_RESUME => match PacketAuthResume::decode(&data, &mut index) {
|
CODE_AUTH_RESUME => match PacketAuthResume::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QAuthResume(packet))
|
QRPacket::new(conn_id, QRPacketData::QAuthResume(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -91,16 +90,14 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
}
|
}
|
||||||
|
|
||||||
CODE_AUTH_REVOKE => {
|
CODE_AUTH_REVOKE => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds, QRPacket::new(conn_id,
|
QRPacket::new(conn_id, QRPacketData::QAuthRevoke)
|
||||||
QRPacketData::QAuthRevoke)
|
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
CODE_SESSION_LIST => match PacketSessionList::decode(&data, &mut index) {
|
CODE_SESSION_LIST => match PacketSessionList::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QSessionList(packet))
|
QRPacket::new(conn_id, QRPacketData::QSessionList(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -109,7 +106,7 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
|
|
||||||
/*CODE_SESSION_JOIN => match PacketSessionCreate::decode(&data, &mut index) {
|
/*CODE_SESSION_JOIN => match PacketSessionCreate::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
bus_ds,
|
||||||
QRPacket::new(conn_id, QRPacketData::QSessionCreate(packet))
|
QRPacket::new(conn_id, QRPacketData::QSessionCreate(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
@ -119,8 +116,7 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
|
|
||||||
CODE_SESSION_VIEW => match PacketSessionView::decode(&data, &mut index) {
|
CODE_SESSION_VIEW => match PacketSessionView::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QSessionView(packet))
|
QRPacket::new(conn_id, QRPacketData::QSessionView(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -128,16 +124,14 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
}
|
}
|
||||||
|
|
||||||
CODE_SESSION_LEAVE => {
|
CODE_SESSION_LEAVE => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QSessionLeave)
|
QRPacket::new(conn_id, QRPacketData::QSessionLeave)
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
CODE_SESSION_RETIRE => match PacketSessionRetire::decode(&data, &mut index) {
|
CODE_SESSION_RETIRE => match PacketSessionRetire::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QSessionResign(packet))
|
QRPacket::new(conn_id, QRPacketData::QSessionResign(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -146,8 +140,7 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
|
|
||||||
CODE_GAME_STATE => match PacketGameState::decode(&data, &mut index) {
|
CODE_GAME_STATE => match PacketGameState::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QGameState(packet))
|
QRPacket::new(conn_id, QRPacketData::QGameState(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -156,8 +149,7 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
|
|
||||||
CODE_GAME_MESSAGE => match PacketGameMessage::decode(&data, &mut index) {
|
CODE_GAME_MESSAGE => match PacketGameMessage::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::GameMessage(packet))
|
QRPacket::new(conn_id, QRPacketData::GameMessage(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -166,8 +158,7 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
|
|
||||||
CODE_CHALLENGE => match PacketChallenge::decode(&data, &mut index) {
|
CODE_CHALLENGE => match PacketChallenge::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QChallenge(packet))
|
QRPacket::new(conn_id, QRPacketData::QChallenge(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -176,8 +167,7 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
|
|
||||||
CODE_CHALLENGE_ANSWER => match PacketChallengeAnswer::decode(&data, &mut index) {
|
CODE_CHALLENGE_ANSWER => match PacketChallengeAnswer::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QChallengeAnswer(packet))
|
QRPacket::new(conn_id, QRPacketData::QChallengeAnswer(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -185,23 +175,20 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
}
|
}
|
||||||
|
|
||||||
CODE_CHALLENGE_LIST => {
|
CODE_CHALLENGE_LIST => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QChallengeList)
|
QRPacket::new(conn_id, QRPacketData::QChallengeList)
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
CODE_USER_LIST => {
|
CODE_USER_LIST => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QUserList)
|
QRPacket::new(conn_id, QRPacketData::QUserList)
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
CODE_USER_PROFILE => match PacketUserProfile::decode(&data, &mut index) {
|
CODE_USER_PROFILE => match PacketUserProfile::decode(&data, &mut index) {
|
||||||
Ok(packet) => {
|
Ok(packet) => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QUserProfile(packet))
|
QRPacket::new(conn_id, QRPacketData::QUserProfile(packet))
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -209,15 +196,13 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
}
|
}
|
||||||
|
|
||||||
CODE_INVITE_LIST => {
|
CODE_INVITE_LIST => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QInviteList)
|
QRPacket::new(conn_id, QRPacketData::QInviteList)
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
|
|
||||||
CODE_INVITE_ACQUIRE => {
|
CODE_INVITE_ACQUIRE => {
|
||||||
args.bus.send(
|
args.tx.send(
|
||||||
bus_ds,
|
|
||||||
QRPacket::new(conn_id, QRPacketData::QInviteAcquire)
|
QRPacket::new(conn_id, QRPacketData::QInviteAcquire)
|
||||||
).await.ok();
|
).await.ok();
|
||||||
}
|
}
|
||||||
@ -242,6 +227,6 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, mut args:HttpServi
|
|||||||
None => false,
|
None => false,
|
||||||
} { }
|
} { }
|
||||||
|
|
||||||
args.bus.send(bus_ds, QRPacket::new(conn_id, QRPacketData::QDisconn)).await.ok();
|
args.tx.send(QRPacket::new(conn_id, QRPacketData::QDisconn)).await.ok();
|
||||||
Ok(())
|
Ok(())
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,12 @@
|
|||||||
#![allow(dead_code)]
|
#![allow(dead_code)]
|
||||||
|
|
||||||
|
use std::sync::Arc;
|
||||||
|
use tokio::sync::{mpsc::Sender, RwLock};
|
||||||
|
use futures::stream::SplitSink;
|
||||||
|
use hyper::upgrade::Upgraded;
|
||||||
|
use hyper_util::rt::TokioIo;
|
||||||
|
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
|
||||||
|
|
||||||
pub mod code; pub use code::*;
|
pub mod code; pub use code::*;
|
||||||
pub mod packet; pub use packet::*;
|
pub mod packet; pub use packet::*;
|
||||||
|
|
||||||
@ -7,7 +14,10 @@ pub mod packet; pub use packet::*;
|
|||||||
pub enum QRPacketData {
|
pub enum QRPacketData {
|
||||||
None,
|
None,
|
||||||
|
|
||||||
QConn(LocalPacketConnect),
|
QConn {
|
||||||
|
tx:Sender<QRPacket>,
|
||||||
|
stream:Arc<RwLock<SplitSink<WebSocketStream<TokioIo<Upgraded>>, Message>>>,
|
||||||
|
},
|
||||||
RConn,
|
RConn,
|
||||||
|
|
||||||
QDisconn,
|
QDisconn,
|
||||||
|
@ -1,12 +1,14 @@
|
|||||||
use std::sync::Arc;
|
use std::sync::Arc;
|
||||||
use tokio::sync::RwLock;
|
use tokio::sync::{mpsc::Sender, RwLock};
|
||||||
use futures::stream::SplitSink;
|
use futures::stream::SplitSink;
|
||||||
use hyper::upgrade::Upgraded;
|
use hyper::upgrade::Upgraded;
|
||||||
use hyper_util::rt::TokioIo;
|
use hyper_util::rt::TokioIo;
|
||||||
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
|
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
|
||||||
|
|
||||||
|
use crate::protocol::QRPacket;
|
||||||
|
|
||||||
#[derive(Clone)]
|
#[derive(Clone)]
|
||||||
pub struct LocalPacketConnect {
|
pub struct LocalPacketConnect {
|
||||||
pub bus_id:u32,
|
pub tx:Sender<QRPacket>,
|
||||||
pub stream:Arc<RwLock<SplitSink<WebSocketStream<TokioIo<Upgraded>>, Message>>>,
|
pub stream:Arc<RwLock<SplitSink<WebSocketStream<TokioIo<Upgraded>>, Message>>>,
|
||||||
}
|
}
|
||||||
|
@ -1,5 +1,5 @@
|
|||||||
mod hello; pub use hello::*;
|
mod hello; pub use hello::*;
|
||||||
mod connect; pub use connect::*;
|
//mod connect; pub use connect::*;
|
||||||
|
|
||||||
mod register; pub use register::*;
|
mod register; pub use register::*;
|
||||||
mod auth; pub use auth::*;
|
mod auth; pub use auth::*;
|
||||||
|
Loading…
x
Reference in New Issue
Block a user