From cd6db8391b5092031bbe2678d7a9ccc314a8e7f6 Mon Sep 17 00:00:00 2001 From: yukirij Date: Thu, 8 Aug 2024 01:49:16 -0700 Subject: [PATCH] Fix server websocket. --- server/Cargo.toml | 1 + server/src/main.rs | 83 ++++++++++++++++++++++++++++++---------------- www/.js | 14 ++++---- 3 files changed, 62 insertions(+), 36 deletions(-) diff --git a/server/Cargo.toml b/server/Cargo.toml index b0eb50a..516f769 100644 --- a/server/Cargo.toml +++ b/server/Cargo.toml @@ -9,6 +9,7 @@ tokio-stream = "0.1.15" tokio-tungstenite = "0.23.1" tokio-rustls = "0.26.0" tokio-util = { version = "0.7.11", features = ["compat"] } +hyper-tungstenite = "0.14.0" rustls = "0.23.5" rustls-pemfile = "2.1.2" webpki-roots = "0.26" diff --git a/server/src/main.rs b/server/src/main.rs index 6ea2e23..ac475ac 100644 --- a/server/src/main.rs +++ b/server/src/main.rs @@ -8,13 +8,13 @@ mod system; mod protocol; use app::App; -use hyper::body::Bytes; +use hyper::{body::Bytes, upgrade::Upgraded}; use system::{cache::WebCache, net::Stream}; use tokio_stream::StreamExt; use tokio_tungstenite::WebSocketStream; use hyper_util::rt::TokioIo; -use tokio_rustls::server::TlsStream; -use tokio::net::TcpStream; +//use tokio_rustls::server::TlsStream; +//use tokio::net::TcpStream; async fn thread_datasystem(mut _app:App, bus:Bus) { @@ -51,7 +51,7 @@ struct HttpServiceArgs { cache:WebCache, } -async fn handle_ws(mut ws_stream:WebSocketStream>, args:HttpServiceArgs) +async fn handle_ws(mut ws_stream:WebSocketStream>, args:HttpServiceArgs) { use tokio_tungstenite::tungstenite::protocol::Message; use game::util::unpack_u16; @@ -105,12 +105,12 @@ async fn handle_ws(mut ws_stream:WebSocketStream>, args:Htt ws_stream.close(None).await.ok(); } -async fn service_http(request:hyper::Request, args:HttpServiceArgs) -> Result>, std::convert::Infallible> +async fn service_http(mut request:hyper::Request, args:HttpServiceArgs) -> Result>, std::convert::Infallible> { - use hyper::{Response, body::Bytes, header::{CONTENT_TYPE, CACHE_CONTROL, UPGRADE}}; + use hyper::{Response, body::Bytes, header::{CONTENT_TYPE, CACHE_CONTROL}}; //SEC_WEBSOCKET_ACCEPT, SEC_WEBSOCKET_KEY, UPGRADE use http_body_util::Full; - use tokio_tungstenite::accept_async; - use tokio_tungstenite::tungstenite::handshake::server::create_response_with_body; + //use tokio_tungstenite::accept_async; + //use tokio_tungstenite::tungstenite::handshake::derive_accept_key; println!("Serving: {}", request.uri().path()); @@ -130,30 +130,55 @@ async fn service_http(request:hyper::Request, args:HttpSe .body(Full::new(Bytes::from(args.cache.favicon()))).unwrap()), _ => { - if request.headers().get(UPGRADE).map(|h| h == "websocket").unwrap_or(false) { - let response = create_response_with_body(&request, || Full::new(Bytes::new())).unwrap(); + if hyper_tungstenite::is_upgrade_request(&request) { + //if request.headers().get(UPGRADE).map(|h| h == "websocket").unwrap_or(false) { + //let response = create_response_with_body(&request, || Full::new(Bytes::new())).unwrap(); - tokio::task::spawn(async move { - match hyper::upgrade::on(request).await { - Ok(upgraded) => { - match upgraded.downcast::>>() { - Ok(parts) => { - match accept_async(parts.io.into_inner()).await { - Ok(ws_stream) => { - println!("here"); - handle_ws(ws_stream, args).await - } - Err(e) => { println!("ws not accepted: {}", e.to_string()); } - } - } - Err(_) => { println!("transfer error"); } - } + //let key = request.headers().get(SEC_WEBSOCKET_KEY) + // .and_then(|v| v.to_str().ok()) + // .map(|k| derive_accept_key(k.as_bytes())) + // .unwrap_or_default(); + + if let Ok((response, websocket)) = hyper_tungstenite::upgrade(&mut request, None) { + tokio::task::spawn(async move { + match websocket.await { + Ok(websocket) => handle_ws(websocket, args).await, + Err(_) => { } } - Err(e) => { println!("upgrade error: {}", e.to_string()); } - } - }); + //match hyper::upgrade::on(request).await { + /*Ok(upgraded) => { + match upgraded.downcast::>>() { + Ok(parts) => { + match accept_async(parts.io.into_inner()).await { + Ok(ws_stream) => { + println!("here"); + handle_ws(ws_stream, args).await + } + Err(e) => { println!("ws not accepted: {}", e.to_string()); } + } + } + Err(_) => { println!("transfer error"); } + } + } + Er(e) => { println!("upgrade error: {}", e.to_string()); }*/ + //} + }); - Ok(response) + Ok(response) + } else { + Ok(Response::builder() + .status(401) + .body(Full::new(Bytes::new())) + .unwrap()) + } + + //Ok(Response::builder() + // .status(101) + // .header(CONNECTION, "Upgrade") + // .header(UPGRADE, "websocket") + // .header(SEC_WEBSOCKET_ACCEPT, key) + // .body(Full::new(Bytes::new())) + // .unwrap()) } else { Ok(Response::builder() .header(CONTENT_TYPE, "text/html") diff --git a/www/.js b/www/.js index f938ef9..08de2b4 100644 --- a/www/.js +++ b/www/.js @@ -85,20 +85,20 @@ function RECONNECT() { console.log("Connecting.."); SOCKET = new WebSocket("wss://omen.kirisame.com:38612"); SOCKET.binaryType = 'blob'; - SOCKET.addEventListener("error", function(e) { - console.log("Failed"); + SOCKET.addEventListener("error", (event) => { + console.log("Failed: " + event.reason); SOCKET = null; }); - SOCKET.addEventListener("open", function(e) { + SOCKET.addEventListener("open", (event) => { if(SOCKET.readyState === WebSocket.OPEN) { console.log("Connected."); - SOCKET.addEventListener("message", function(e) { - MESSAGE(e.data); + SOCKET.addEventListener("message", (event) => { + MESSAGE(event.data); }); - SOCKET.addEventListener("close", function(e) { - console.log("Closed."); + SOCKET.addEventListener("close", (event) => { + console.log("Closed (" + event.wasClean + ":" + event.code + "): " + event.reason); SOCKET = null; RECONNECT(); });