-
Notifications
You must be signed in to change notification settings - Fork 179
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat(server): add TowerService::on_session_close
#1284
Changes from 2 commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -30,10 +30,11 @@ use std::pin::Pin; | |
use std::sync::Arc; | ||
use std::task::{Context, Poll}; | ||
|
||
use futures_util::{Stream, StreamExt}; | ||
use futures_util::{Future, Stream, StreamExt}; | ||
use pin_project::pin_project; | ||
use tokio::sync::{watch, OwnedSemaphorePermit, Semaphore, TryAcquireError}; | ||
use tokio::time::Interval; | ||
use tokio_stream::wrappers::BroadcastStream; | ||
|
||
/// Create channel to determine whether | ||
/// the server shall continue to run or not. | ||
|
@@ -157,3 +158,42 @@ impl Stream for IntervalStream { | |
} | ||
} | ||
} | ||
|
||
#[derive(Debug, Clone)] | ||
pub(crate) struct SessionClose(tokio::sync::broadcast::Sender<()>); | ||
|
||
impl SessionClose { | ||
pub(crate) fn close(self) { | ||
let _ = self.0.send(()); | ||
} | ||
|
||
pub(crate) fn closed(&self) -> SessionClosedFuture { | ||
SessionClosedFuture(BroadcastStream::new(self.0.subscribe())) | ||
} | ||
} | ||
|
||
/// A future that resolves when the connection has been closed. | ||
#[derive(Debug)] | ||
pub struct SessionClosedFuture(BroadcastStream<()>); | ||
|
||
impl Future for SessionClosedFuture { | ||
type Output = (); | ||
|
||
fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> { | ||
match self.0.poll_next_unpin(cx) { | ||
Poll::Pending => Poll::Pending, | ||
// A message is only sent when | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: when the connection is closed |
||
Poll::Ready(x) => { | ||
tracing::info!("{:?}", x); | ||
Poll::Ready(()) | ||
} | ||
} | ||
} | ||
} | ||
|
||
pub(crate) fn session_close() -> (SessionClose, SessionClosedFuture) { | ||
// SessionClosedFuture is closed after one message has been recevied | ||
// and max one message is handled then it's closed. | ||
let (tx, rx) = tokio::sync::broadcast::channel(1); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Tokio's API is a bit strange I feel here; you can only create a pair, but then you can also call There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Yepp, I think the implementation is quite complicated/clever as the Receiver is not clone. I think it just clone the message(s) to other receivers which is probably quite nice to avoid having a separate state for each receiver |
||
(SessionClose(tx), SessionClosedFuture(BroadcastStream::new(rx))) | ||
} |
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -33,7 +33,7 @@ use std::sync::Arc; | |
use std::task::Poll; | ||
use std::time::Duration; | ||
|
||
use crate::future::{ConnectionGuard, ServerHandle, StopHandle}; | ||
use crate::future::{session_close, ConnectionGuard, ServerHandle, SessionClose, SessionClosedFuture, StopHandle}; | ||
use crate::middleware::rpc::{RpcService, RpcServiceBuilder, RpcServiceCfg, RpcServiceT}; | ||
use crate::transport::ws::BackgroundTaskParams; | ||
use crate::transport::{http, ws}; | ||
|
@@ -501,6 +501,7 @@ impl<RpcMiddleware, HttpMiddleware> TowerServiceBuilder<RpcMiddleware, HttpMiddl | |
conn_guard: self.conn_guard, | ||
server_cfg: self.server_cfg, | ||
}, | ||
on_session_close: None, | ||
}; | ||
|
||
TowerService { rpc_middleware, http_middleware: self.http_middleware } | ||
|
@@ -617,18 +618,18 @@ impl<HttpMiddleware, RpcMiddleware> Builder<HttpMiddleware, RpcMiddleware> { | |
/// impl<'a, S> RpcServiceT<'a> for MyMiddleware<S> | ||
/// where S: RpcServiceT<'a> + Send + Sync + Clone + 'static, | ||
/// { | ||
/// type Future = BoxFuture<'a, MethodResponse>; | ||
/// | ||
/// type Future = BoxFuture<'a, MethodResponse>; | ||
/// | ||
/// fn call(&self, req: Request<'a>) -> Self::Future { | ||
/// tracing::info!("MyMiddleware processed call {}", req.method); | ||
/// let count = self.count.clone(); | ||
/// let service = self.service.clone(); | ||
/// let service = self.service.clone(); | ||
/// | ||
/// Box::pin(async move { | ||
/// let rp = service.call(req).await; | ||
/// // Modify the state. | ||
/// count.fetch_add(1, Ordering::Relaxed); | ||
/// rp | ||
/// rp | ||
/// }) | ||
/// } | ||
/// } | ||
|
@@ -941,6 +942,25 @@ pub struct TowerService<RpcMiddleware, HttpMiddleware> { | |
http_middleware: tower::ServiceBuilder<HttpMiddleware>, | ||
} | ||
|
||
impl<RpcMiddleware, HttpMiddleware> TowerService<RpcMiddleware, HttpMiddleware> { | ||
/// A future that returns when the connection has been closed. | ||
/// | ||
/// It's possible to call this many times but internally it uses | ||
/// a bounded buffer of 4 such that if one creates more than 4 | ||
/// SessionCloseFuture's. Then any of these 4 first futures | ||
/// must be polled or dropped to make any progress. | ||
pub fn on_session_closed(&mut self) -> SessionClosedFuture { | ||
if let Some(n) = self.rpc_middleware.on_session_close.as_mut() { | ||
// If it's called more then once another listener is created. | ||
n.closed() | ||
} else { | ||
let (session_close, fut) = session_close(); | ||
self.rpc_middleware.on_session_close = Some(session_close); | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Is there any likelihood of a race where the session is closing already or something, and only then you cann I don't think it really matters though because why would you subscribe to this so late on There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Added some extra docs on it, it's possible. Also it's a little bit weird that That |
||
fut | ||
} | ||
} | ||
} | ||
|
||
impl<RpcMiddleware, HttpMiddleware> hyper::service::Service<hyper::Request<hyper::Body>> | ||
for TowerService<RpcMiddleware, HttpMiddleware> | ||
where | ||
|
@@ -979,6 +999,7 @@ where | |
pub struct TowerServiceNoHttp<L> { | ||
inner: ServiceData, | ||
rpc_middleware: RpcServiceBuilder<L>, | ||
on_session_close: Option<SessionClose>, | ||
} | ||
|
||
impl<RpcMiddleware> hyper::service::Service<hyper::Request<hyper::Body>> for TowerServiceNoHttp<RpcMiddleware> | ||
|
@@ -1004,6 +1025,7 @@ where | |
let conn_guard = &self.inner.conn_guard; | ||
let stop_handle = self.inner.stop_handle.clone(); | ||
let conn_id = self.inner.conn_id; | ||
let on_session_close = self.on_session_close.take(); | ||
|
||
tracing::trace!(target: LOG_TARGET, "{:?}", request); | ||
|
||
|
@@ -1076,6 +1098,7 @@ where | |
sink, | ||
rx, | ||
pending_calls_completed, | ||
on_session_close, | ||
}; | ||
|
||
ws::background_task(params).await; | ||
|
@@ -1176,6 +1199,7 @@ fn process_connection<'a, RpcMiddleware, HttpMiddleware, U>( | |
conn_guard: conn_guard.clone(), | ||
}, | ||
rpc_middleware, | ||
on_session_close: None, | ||
}; | ||
|
||
let service = http_middleware.service(tower_service); | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: Could this be implemented also on
Drop
? And if we already calledclose()
we'd do nothing on drop, otherwise, we'll callself.0.send()
?There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I skipped it because I'm scared of that as the tower stuff requires Clone and I'm not sure whether something is dropped at some point.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Makes sense, thanks!