Replies: 1 comment
-
Through debugging, I found that the HTTP/2 CONNECT request is treated as an HTTP/1.1 CONNECT request, and the request path is processed accordingly. If this part of the code is commented out, the handshake completes normally. Here are source code examples of an axum server and two clients, which can be cloned and run directly: https://github.com/0x676e67/http2-websocket-client
//! Run with
//!
//! ```not_rust
//! cargo run --bin server
//! ```
use axum::{
extract::{
ws::{self, WebSocketUpgrade},
State,
},
http::Version,
routing::any,
Router,
};
use std::{net::SocketAddr, path::PathBuf};
use tokio::sync::broadcast;
use tower_http::services::ServeDir;
use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
/// Entry point of the example server.
///
/// Serves the crate's `assets` directory as a fallback and a WebSocket
/// endpoint at `/ws` on `127.0.0.1:3000`, with the HTTP/2 extended CONNECT
/// protocol enabled on the underlying HTTP builder.
#[tokio::main]
async fn main() {
    // Use the filter built from the environment when that succeeds; otherwise
    // fall back to logging this crate at `debug`.
    let env_filter = tracing_subscriber::EnvFilter::try_from_default_env()
        .unwrap_or_else(|_| format!("{}=debug", env!("CARGO_CRATE_NAME")).into());
    tracing_subscriber::registry()
        .with(env_filter)
        .with(tracing_subscriber::fmt::layer())
        .init();

    // Only the sending half is kept as router state; each connection
    // subscribes its own receiver in `ws_handler`.
    let (chat_tx, _) = broadcast::channel::<String>(16);

    // Static files live in `<manifest dir>/assets`.
    let static_files = ServeDir::new(PathBuf::from(env!("CARGO_MANIFEST_DIR")).join("assets"))
        .append_index_html_on_directories(true);

    // build our application with some routes and a broadcast channel
    let app = Router::new()
        .fallback_service(static_files)
        .route("/ws", any(ws_handler))
        .with_state(chat_tx);

    let addr = SocketAddr::from(([127, 0, 0, 1], 3000));
    tracing::debug!("listening on {}", addr);

    let mut server = axum_server::bind(addr);
    // IMPORTANT: This is required to advertise our support for HTTP/2 websockets to the client.
    // If you use axum::serve, it is enabled by default.
    server.http_builder().http2().enable_connect_protocol();

    let make_service = app.into_make_service();
    server.serve(make_service).await.unwrap();
}
/// Upgrades the request to a WebSocket and bridges it with the shared
/// broadcast channel.
///
/// Text frames received from the client are published on `sender`; every
/// message received from the channel is forwarded to the client as a text
/// frame. The HTTP version of the request is only used for logging.
///
/// The per-connection task ends when the client closes the stream, when a
/// socket read or write fails, or when the broadcast channel is closed.
async fn ws_handler(
    ws: WebSocketUpgrade,
    version: Version,
    State(sender): State<broadcast::Sender<String>>,
) -> axum::response::Response {
    tracing::debug!("accepted a WebSocket using {version:?}");
    let mut receiver = sender.subscribe();
    ws.on_upgrade(|mut ws| async move {
        loop {
            tokio::select! {
                // Since `ws` is a `Stream`, it is by nature cancel-safe.
                res = ws.recv() => {
                    match res {
                        Some(Ok(ws::Message::Text(s))) => {
                            // A send error only means there are no receivers right now;
                            // that is not a reason to drop this connection.
                            let _ = sender.send(s.to_string());
                        }
                        // Non-text frames are ignored.
                        Some(Ok(_)) => {}
                        Some(Err(e)) => {
                            tracing::debug!("client disconnected abruptly: {e}");
                            // The socket is unusable after a read error; stop instead of
                            // polling it again.
                            break;
                        }
                        None => break,
                    }
                }
                // Tokio guarantees that `broadcast::Receiver::recv` is cancel-safe.
                res = receiver.recv() => {
                    match res {
                        Ok(msg) => {
                            if let Err(e) = ws.send(ws::Message::Text(msg.into())).await {
                                tracing::debug!("client disconnected abruptly: {e}");
                                // Nothing more can be delivered to this client.
                                break;
                            }
                        }
                        // This receiver fell behind and missed messages; keep going
                        // with the oldest message still retained.
                        Err(broadcast::error::RecvError::Lagged(_)) => continue,
                        // Every sender is gone: `recv` would fail immediately on each
                        // call, so looping would spin.
                        Err(broadcast::error::RecvError::Closed) => break,
                    }
                }
            }
        }
    })
}
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use http::{Method, Request};
use http_body_util::BodyExt as _;
use hyper_util::rt::{TokioExecutor, TokioIo};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio_tungstenite::tungstenite::protocol;
use tokio_tungstenite::{tungstenite, WebSocketStream};
/// Client that uses `hyper_util`'s legacy `Client` in HTTP/2-only mode.
///
/// Sends a CONNECT request carrying the `websocket` protocol extension to
/// `http://127.0.0.1:3000/ws`, panics with the response body when the status
/// is not 200, and otherwise runs `test_echo_app` over the upgraded stream.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    // Configure the builder in place, then build the HTTP client from it.
    let mut builder = hyper_util::client::legacy::Client::builder(TokioExecutor::new());
    builder.http2_only(true);
    let client = builder.build_http::<String>();

    // CONNECT request with the `websocket` protocol extension attached.
    let connect_request = Request::builder()
        .method(Method::CONNECT)
        .extension(hyper::ext::Protocol::from_static("websocket"))
        .uri("http://127.0.0.1:3000/ws")
        .header("sec-websocket-version", "13")
        .body(String::default())
        .unwrap();

    let response = client.request(connect_request).await.unwrap();

    // Anything but 200 means the upgrade was refused: surface the body and abort.
    let status = response.status();
    if status != 200 {
        let collected = response.into_body().collect().await.unwrap();
        let body = collected.to_bytes();
        let body = std::str::from_utf8(&body).unwrap();
        panic!("response status was {status}: {body}");
    }

    // Wrap the upgraded connection for tokio I/O and speak WebSocket over it
    // in the client role.
    let io = TokioIo::new(hyper::upgrade::on(response).await.unwrap());
    let socket = WebSocketStream::from_raw_socket(io, protocol::Role::Client, None).await;
    test_echo_app(socket).await;
}
/// Exercises a WebSocket peer over `socket`.
///
/// Sends the text frame `"foobar"` and asserts that the next received message
/// is equal to it, then sends a ping with payload `b"ping"` and asserts that
/// the next received message is a pong with the same payload. Panics if any
/// send or receive fails or if either assertion does not hold.
async fn test_echo_app<S>(mut socket: WebSocketStream<S>)
where
    S: AsyncRead + AsyncWrite + Unpin,
{
    // Text frame round trip.
    let text = tungstenite::Message::Text(tungstenite::Utf8Bytes::from_static("foobar"));
    socket.send(text.clone()).await.unwrap();
    let echoed = socket.next().await.unwrap().unwrap();
    assert_eq!(text, echoed);

    // Ping / pong round trip with a shared payload.
    let payload = Bytes::from_static(b"ping");
    let ping = tungstenite::Message::Ping(payload.clone());
    socket.send(ping).await.unwrap();
    let reply = socket.next().await.unwrap().unwrap();
    assert_eq!(reply, tungstenite::Message::Pong(payload));
}
use bytes::Bytes;
use futures::{SinkExt, StreamExt};
use http::{Method, Request};
use http_body_util::BodyExt as _;
use hyper_util::rt::{TokioExecutor, TokioIo};
use tokio::io::{AsyncRead, AsyncWrite};
use tokio::net::TcpStream;
use tokio_tungstenite::tungstenite::protocol;
use tokio_tungstenite::{tungstenite, WebSocketStream};
/// Client that drives `hyper::client::conn::http2` directly over a TCP stream.
///
/// Performs the HTTP/2 handshake against `127.0.0.1:3000`, asserts that the
/// connection reports the extended CONNECT protocol as enabled, sends a
/// CONNECT request for `/ws` with the `websocket` protocol extension, and on a
/// 200 response runs `test_echo_app` over the upgraded stream.
#[tokio::main]
async fn main() {
    tracing_subscriber::fmt()
        .with_max_level(tracing::Level::TRACE)
        .init();

    let tcp = TcpStream::connect("127.0.0.1:3000").await.unwrap();
    let http2 = hyper::client::conn::http2::Builder::new(TokioExecutor::new());
    let (mut request_sender, connection) = http2.handshake(TokioIo::new(tcp)).await.unwrap();

    // Wait a little for the SETTINGS frame to go through…
    for _ in 0..10000 {
        tokio::task::yield_now().await;
    }
    assert!(connection.is_extended_connect_protocol_enabled());

    // Hand the connection to a background task; it panics there if the
    // connection future resolves with an error.
    tokio::spawn(async move {
        connection.await.unwrap();
    });

    // CONNECT request with the `websocket` protocol extension; the URI is
    // just the path here.
    let req = Request::builder()
        .method(Method::CONNECT)
        .extension(hyper::ext::Protocol::from_static("websocket"))
        .uri("/ws")
        .header("sec-websocket-version", "13")
        .body("".to_owned())
        .unwrap();
    dbg!(&req);

    let response = request_sender.send_request(req).await.unwrap();

    // Anything but 200 means the upgrade was refused: surface the body and abort.
    let status = response.status();
    if status != 200 {
        let collected = response.into_body().collect().await.unwrap();
        let body = collected.to_bytes();
        let body = std::str::from_utf8(&body).unwrap();
        panic!("response status was {status}: {body}");
    }

    // Wrap the upgraded connection for tokio I/O and speak WebSocket over it
    // in the client role.
    let io = TokioIo::new(hyper::upgrade::on(response).await.unwrap());
    let socket = WebSocketStream::from_raw_socket(io, protocol::Role::Client, None).await;
    test_echo_app(socket).await;
}
async fn test_echo_app<S: AsyncRead + AsyncWrite + Unpin>(mut socket: WebSocketStream<S>) {
let input = tungstenite::Message::Text(tungstenite::Utf8Bytes::from_static("foobar"));
socket.send(input.clone()).await.unwrap();
let output = socket.next().await.unwrap().unwrap();
assert_eq!(input, output);
socket
.send(tungstenite::Message::Ping(Bytes::from_static(b"ping")))
.await
.unwrap();
let output = socket.next().await.unwrap().unwrap();
assert_eq!(
output,
tungstenite::Message::Pong(Bytes::from_static(b"ping"))
);
} |
Beta Was this translation helpful? Give feedback.
0 replies
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
-
This is a discussion question. I am currently studying how to use the `hyper-util` client to upgrade `HTTP/2` to `WebSocket`. Currently, the client encapsulated by `hyper_util` cannot successfully complete the handshake over `HTTP/2`. Of course, this is limited to `HTTP/2` `CONNECT`.
An HTTP/2 protocol error occurs during the handshake; the error looks roughly like this:
Beta Was this translation helpful? Give feedback.
All reactions