Merge pull request 'is5' (#6) from is5 into main

Reviewed-on: #6
This commit is contained in:
yukirij 2024-12-17 16:33:03 -05:00
commit f34f695098
9 changed files with 290 additions and 313 deletions

View File

@ -12,7 +12,6 @@ hyper-tungstenite = "0.14.0"
rustls = "0.23.5" rustls = "0.23.5"
rustls-pemfile = "2.1.2" rustls-pemfile = "2.1.2"
webpki-roots = "0.26" webpki-roots = "0.26"
opaque-ke = "2.0.0"
hyper = { version = "1.4.1", features = ["full"] } hyper = { version = "1.4.1", features = ["full"] }
hyper-util = { version = "0.1.7", features = ["tokio"] } hyper-util = { version = "0.1.7", features = ["tokio"] }
http-body-util = "0.1.2" http-body-util = "0.1.2"
@ -26,7 +25,6 @@ regex = "1.11.0"
game = { path = "../game" } game = { path = "../game" }
bus = { git = "https://git.tsukiyo.org/Utility/bus" }
sparse = { git = "https://git.tsukiyo.org/Utility/sparse" } sparse = { git = "https://git.tsukiyo.org/Utility/sparse" }
trie = { git = "https://git.tsukiyo.org/Utility/trie" } trie = { git = "https://git.tsukiyo.org/Utility/trie" }
pool = { git = "https://git.tsukiyo.org/Utility/pool" } pool = { git = "https://git.tsukiyo.org/Utility/pool" }

View File

@ -14,7 +14,6 @@ type StreamType = Arc<RwLock<SplitSink<WebSocketStream<TokioIo<Upgraded>>, Messa
#[derive(Clone)] #[derive(Clone)]
pub struct Connection { pub struct Connection {
pub bus:u32,
pub stream:StreamType, pub stream:StreamType,
pub auth:Option<AuthToken>, pub auth:Option<AuthToken>,

View File

@ -160,7 +160,9 @@ impl App {
use tokio_tungstenite::tungstenite::Message; use tokio_tungstenite::tungstenite::Message;
use futures::SinkExt; use futures::SinkExt;
if match response.data { QRPacketData::None => false, _ => true } { match response.data {
QRPacketData::None => { }
_ => {
if let Some(conn) = self.connections.get(response.id as usize) { if let Some(conn) = self.connections.get(response.id as usize) {
let mut socket = conn.stream.write().await; let mut socket = conn.stream.write().await;
@ -260,6 +262,7 @@ impl App {
} }
} }
} }
}
pub async fn send_session_spectators(&mut self, token:SessionToken) pub async fn send_session_spectators(&mut self, token:SessionToken)
{ {

View File

@ -1,8 +1,6 @@
use std::net::SocketAddr; use std::net::SocketAddr;
use std::path::Path; use std::path::Path;
use bus::Bus;
mod config; mod config;
mod util; mod util;
mod app; mod app;
@ -13,6 +11,7 @@ mod manager;
use app::App; use app::App;
use hyper::body::Bytes; use hyper::body::Bytes;
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use tokio::sync::mpsc::{self, Sender};
use system::{ use system::{
cache::WebCache, cache::WebCache,
net::{ net::{
@ -21,10 +20,11 @@ use system::{
tls::*, tls::*,
}, },
}; };
use protocol::QRPacket;
#[derive(Clone)] #[derive(Clone)]
struct HttpServiceArgs { struct HttpServiceArgs {
bus:Bus<protocol::QRPacket>, tx:Sender<QRPacket>,
cache:WebCache, cache:WebCache,
} }
@ -166,18 +166,10 @@ async fn main()
/* /*
** Initialize central bus and data serivce. ** Initialize central bus and data serivce.
*/ */
let b_main :Bus<protocol::QRPacket> = Bus::new_as(bus::Mode::Transmitter); let (data_tx, data_rx) = mpsc::channel::<QRPacket>(64);
match b_main.connect() {
Ok(bus) => {
tokio::spawn(async move { tokio::spawn(async move {
manager::thread_system(app, bus).await; manager::thread_system(app, data_rx).await;
}); });
}
Err(_) => {
println!("fatal: failed to initialize bus.");
return;
}
}
/* /*
@ -204,9 +196,6 @@ async fn main()
} }
js_asset_data += "}};"; js_asset_data += "}};";
match b_main.connect() {
Ok(bus) => {
/* /*
** Cache source files. ** Cache source files.
*/ */
@ -276,11 +265,11 @@ async fn main()
let mut tcp_server = TcpServer::new(); let mut tcp_server = TcpServer::new();
match tcp_server.bind("127.0.0.1:38611").await { match tcp_server.bind("127.0.0.1:38611").await {
Ok(_) => { Ok(_) => {
let b = bus.connect().unwrap();
let c = cache.clone(); let c = cache.clone();
let data_tx = data_tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
while tcp_server.accept(handle_tcp, HttpServiceArgs { while tcp_server.accept(handle_tcp, HttpServiceArgs {
bus:b.connect().unwrap(), tx:data_tx.clone(),
cache:c.clone(), cache:c.clone(),
}).await.is_ok() { } }).await.is_ok() { }
}); });
@ -301,11 +290,11 @@ async fn main()
} }
match tls_server.bind("0.0.0.0:38612").await { match tls_server.bind("0.0.0.0:38612").await {
Ok(_) => { Ok(_) => {
let b = bus.connect().unwrap();
let c = cache.clone(); let c = cache.clone();
let data_tx = data_tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
while tls_server.accept(handle_tls, HttpServiceArgs { while tls_server.accept(handle_tls, HttpServiceArgs {
bus:b.connect().unwrap(), tx:data_tx.clone(),
cache:c.clone(), cache:c.clone(),
}).await.is_ok() { } }).await.is_ok() { }
}); });
@ -314,11 +303,6 @@ async fn main()
println!("error: failed to bind TLS port 38612."); println!("error: failed to bind TLS port 38612.");
} }
} }
}
Err(_) => {
println!("error: failed to initialize HTTPS service.");
}
}
loop { tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; } loop { tokio::time::sleep(tokio::time::Duration::from_secs(10)).await; }
} }

View File

@ -1,4 +1,4 @@
use bus::Bus; use tokio::sync::mpsc::Receiver;
use game::history::Play; use game::history::Play;
use crate::{ use crate::{
config, config,
@ -10,13 +10,12 @@ use crate::{
session::{Session, SessionFilter}, session::{Session, SessionFilter},
context::{self, Context}, context::{self, Context},
}, },
protocol, protocol::*,
}; };
pub async fn thread_system(mut app:App, bus:Bus<protocol::QRPacket>) pub async fn thread_system(mut app:App, mut rx:Receiver<QRPacket>)
{ {
use futures::SinkExt; use futures::SinkExt;
use protocol::*;
use ring::rand::{SecureRandom, SystemRandom}; use ring::rand::{SecureRandom, SystemRandom};
let rng = SystemRandom::new(); let rng = SystemRandom::new();
@ -24,8 +23,7 @@ pub async fn thread_system(mut app:App, bus:Bus<protocol::QRPacket>)
let mut send_user_status = Vec::<u32>::new(); let mut send_user_status = Vec::<u32>::new();
while let Some(packet) = bus.receive_wait() { while let Some(qr) = rx.recv().await {
let qr = packet.data;
let mut user_id = None; let mut user_id = None;
let mut context = Context::None; let mut context = Context::None;
@ -40,10 +38,12 @@ pub async fn thread_system(mut app:App, bus:Bus<protocol::QRPacket>)
} }
match match qr.data { match match qr.data {
QRPacketData::QConn(request) => { QRPacketData::QConn {
tx,
stream,
} => {
let id = app.connections.add(Connection { let id = app.connections.add(Connection {
bus: request.bus_id, stream,
stream: request.stream,
auth: None, auth: None,
context:Context::None, context:Context::None,
@ -58,10 +58,9 @@ pub async fn thread_system(mut app:App, bus:Bus<protocol::QRPacket>)
app.log.log(&format!("Connect: {}", id)); app.log.log(&format!("Connect: {}", id));
bus.send( tx.send(
packet.from,
QRPacket::new(id as u32, QRPacketData::RConn) QRPacket::new(id as u32, QRPacketData::RConn)
).ok(); ).await.ok();
Some(QRPacket::new(0, QRPacketData::None)) Some(QRPacket::new(0, QRPacketData::None))
} }
@ -1053,10 +1052,8 @@ pub async fn thread_system(mut app:App, bus:Bus<protocol::QRPacket>)
} }
} }
fn generate_game_state(app:&App, session:&Session) -> protocol::PacketGameStateResponse fn generate_game_state(app:&App, session:&Session) -> PacketGameStateResponse
{ {
use protocol::PacketGameStateResponse;
let mut response = PacketGameStateResponse::new(); let mut response = PacketGameStateResponse::new();
response.token = session.token; response.token = session.token;

View File

@ -1,5 +1,5 @@
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::{RwLock, mpsc};
use hyper::upgrade::Upgraded; use hyper::upgrade::Upgraded;
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use tokio_tungstenite::WebSocketStream; use tokio_tungstenite::WebSocketStream;
@ -21,27 +21,28 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, args:HttpServiceAr
let conn_id :u32; let conn_id :u32;
let (sink, mut stream) = ws.split(); let (sink, mut stream) = ws.split();
let (tx, mut rx) = mpsc::channel::<QRPacket>(1);
let bus_ds = args.bus.mailbox(1).unwrap_or(1);
// Perform connection handshake with data system. // Perform connection handshake with data system.
// - Provide system with connection/bus pairing. // - Provide system with connection/bus pairing.
// - Acquire connection id. // - Acquire connection id.
// //
args.bus.send(bus_ds, QRPacket::new(0, QRPacketData::QConn(LocalPacketConnect { args.tx.send(QRPacket::new(0, QRPacketData::QConn {
bus_id:args.bus.id(), tx:tx.clone(),
stream:Arc::new(RwLock::new(sink)), stream:Arc::new(RwLock::new(sink)),
})))?; })).await.map_err(|_| ())?;
match args.bus.receive_wait() {
Some(resp) => { if let Some(qr) = rx.recv().await {
let qr = &resp.data; match &qr.data {
match qr.data {
QRPacketData::RConn => { conn_id = qr.id; } QRPacketData::RConn => { conn_id = qr.id; }
_ => { return Err(()); } _ => { return Err(()); }
} }
} else {
return Err(());
} }
None => { return Err(()); }
} drop(tx);
drop(rx);
// Decode client requests from websocket, // Decode client requests from websocket,
// pass requests to data system, // pass requests to data system,
@ -55,171 +56,154 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, args:HttpServiceAr
match code { match code {
CODE_HELLO => { CODE_HELLO => {
args.bus.send( args.tx.send(
bus_ds, QRPacket::new(conn_id, QRPacketData::QHello) QRPacket::new(conn_id, QRPacketData::QHello)
).ok(); ).await.ok();
} }
CODE_REGISTER => match PacketRegister::decode(&data, &mut index) { CODE_REGISTER => match PacketRegister::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QRegister(packet)) QRPacket::new(conn_id, QRPacketData::QRegister(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
} }
CODE_AUTH => match PacketAuth::decode(&data, &mut index) { CODE_AUTH => match PacketAuth::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QAuth(packet)) QRPacket::new(conn_id, QRPacketData::QAuth(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
} }
CODE_AUTH_RESUME => match PacketAuthResume::decode(&data, &mut index) { CODE_AUTH_RESUME => match PacketAuthResume::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QAuthResume(packet)) QRPacket::new(conn_id, QRPacketData::QAuthResume(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
} }
CODE_AUTH_REVOKE => { CODE_AUTH_REVOKE => {
args.bus.send( args.tx.send(
bus_ds, QRPacket::new(conn_id, QRPacket::new(conn_id, QRPacketData::QAuthRevoke)
QRPacketData::QAuthRevoke) ).await.ok();
).ok();
} }
CODE_SESSION_LIST => match PacketSessionList::decode(&data, &mut index) { CODE_SESSION_LIST => match PacketSessionList::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QSessionList(packet)) QRPacket::new(conn_id, QRPacketData::QSessionList(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
} }
/*CODE_SESSION_JOIN => match PacketSessionCreate::decode(&data, &mut index) { /*CODE_SESSION_JOIN => match PacketSessionCreate::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds, bus_ds,
QRPacket::new(conn_id, QRPacketData::QSessionCreate(packet)) QRPacket::new(conn_id, QRPacketData::QSessionCreate(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
}*/ }*/
CODE_SESSION_VIEW => match PacketSessionView::decode(&data, &mut index) { CODE_SESSION_VIEW => match PacketSessionView::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QSessionView(packet)) QRPacket::new(conn_id, QRPacketData::QSessionView(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
} }
CODE_SESSION_LEAVE => { CODE_SESSION_LEAVE => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QSessionLeave) QRPacket::new(conn_id, QRPacketData::QSessionLeave)
).ok(); ).await.ok();
} }
CODE_SESSION_RETIRE => match PacketSessionRetire::decode(&data, &mut index) { CODE_SESSION_RETIRE => match PacketSessionRetire::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QSessionResign(packet)) QRPacket::new(conn_id, QRPacketData::QSessionResign(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
} }
CODE_GAME_STATE => match PacketGameState::decode(&data, &mut index) { CODE_GAME_STATE => match PacketGameState::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QGameState(packet)) QRPacket::new(conn_id, QRPacketData::QGameState(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
} }
CODE_GAME_MESSAGE => match PacketGameMessage::decode(&data, &mut index) { CODE_GAME_MESSAGE => match PacketGameMessage::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::GameMessage(packet)) QRPacket::new(conn_id, QRPacketData::GameMessage(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
} }
CODE_CHALLENGE => match PacketChallenge::decode(&data, &mut index) { CODE_CHALLENGE => match PacketChallenge::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QChallenge(packet)) QRPacket::new(conn_id, QRPacketData::QChallenge(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
} }
CODE_CHALLENGE_ANSWER => match PacketChallengeAnswer::decode(&data, &mut index) { CODE_CHALLENGE_ANSWER => match PacketChallengeAnswer::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QChallengeAnswer(packet)) QRPacket::new(conn_id, QRPacketData::QChallengeAnswer(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
} }
CODE_CHALLENGE_LIST => { CODE_CHALLENGE_LIST => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QChallengeList) QRPacket::new(conn_id, QRPacketData::QChallengeList)
).ok(); ).await.ok();
} }
CODE_USER_LIST => { CODE_USER_LIST => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QUserList) QRPacket::new(conn_id, QRPacketData::QUserList)
).ok(); ).await.ok();
} }
CODE_USER_PROFILE => match PacketUserProfile::decode(&data, &mut index) { CODE_USER_PROFILE => match PacketUserProfile::decode(&data, &mut index) {
Ok(packet) => { Ok(packet) => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QUserProfile(packet)) QRPacket::new(conn_id, QRPacketData::QUserProfile(packet))
).ok(); ).await.ok();
} }
Err(_) => { println!("error: packet decode failed."); } Err(_) => { println!("error: packet decode failed."); }
} }
CODE_INVITE_LIST => { CODE_INVITE_LIST => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QInviteList) QRPacket::new(conn_id, QRPacketData::QInviteList)
).ok(); ).await.ok();
} }
CODE_INVITE_ACQUIRE => { CODE_INVITE_ACQUIRE => {
args.bus.send( args.tx.send(
bus_ds,
QRPacket::new(conn_id, QRPacketData::QInviteAcquire) QRPacket::new(conn_id, QRPacketData::QInviteAcquire)
).ok(); ).await.ok();
} }
_ => { } _ => { }
@ -242,6 +226,6 @@ pub async fn handle_ws(ws:WebSocketStream<TokioIo<Upgraded>>, args:HttpServiceAr
None => false, None => false,
} { } } { }
args.bus.send(bus_ds, QRPacket::new(conn_id, QRPacketData::QDisconn)).ok(); args.tx.send(QRPacket::new(conn_id, QRPacketData::QDisconn)).await.ok();
Ok(()) Ok(())
} }

View File

@ -1,5 +1,12 @@
#![allow(dead_code)] #![allow(dead_code)]
use std::sync::Arc;
use tokio::sync::{mpsc::Sender, RwLock};
use futures::stream::SplitSink;
use hyper::upgrade::Upgraded;
use hyper_util::rt::TokioIo;
use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
pub mod code; pub use code::*; pub mod code; pub use code::*;
pub mod packet; pub use packet::*; pub mod packet; pub use packet::*;
@ -7,7 +14,10 @@ pub mod packet; pub use packet::*;
pub enum QRPacketData { pub enum QRPacketData {
None, None,
QConn(LocalPacketConnect), QConn {
tx:Sender<QRPacket>,
stream:Arc<RwLock<SplitSink<WebSocketStream<TokioIo<Upgraded>>, Message>>>,
},
RConn, RConn,
QDisconn, QDisconn,

View File

@ -1,12 +1,14 @@
use std::sync::Arc; use std::sync::Arc;
use tokio::sync::RwLock; use tokio::sync::{mpsc::Sender, RwLock};
use futures::stream::SplitSink; use futures::stream::SplitSink;
use hyper::upgrade::Upgraded; use hyper::upgrade::Upgraded;
use hyper_util::rt::TokioIo; use hyper_util::rt::TokioIo;
use tokio_tungstenite::{tungstenite::Message, WebSocketStream}; use tokio_tungstenite::{tungstenite::Message, WebSocketStream};
use crate::protocol::QRPacket;
#[derive(Clone)] #[derive(Clone)]
pub struct LocalPacketConnect { pub struct LocalPacketConnect {
pub bus_id:u32, pub tx:Sender<QRPacket>,
pub stream:Arc<RwLock<SplitSink<WebSocketStream<TokioIo<Upgraded>>, Message>>>, pub stream:Arc<RwLock<SplitSink<WebSocketStream<TokioIo<Upgraded>>, Message>>>,
} }

View File

@ -1,5 +1,5 @@
mod hello; pub use hello::*; mod hello; pub use hello::*;
mod connect; pub use connect::*; //mod connect; pub use connect::*;
mod register; pub use register::*; mod register; pub use register::*;
mod auth; pub use auth::*; mod auth; pub use auth::*;