-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add shutdown functionality to NodeStream
.
#560
Changes from 1 commit
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Original file line number | Diff line number | Diff line change |
---|---|---|
|
@@ -124,10 +124,12 @@ where | |
TMuxer: StreamMuxer, | ||
THandler: NodeHandler<Substream<TMuxer>>, | ||
{ | ||
/// Node that handles the muxing. Can be `None` if the handled node is shutting down. | ||
node: Option<NodeStream<TMuxer, TAddrFut, THandler::OutboundOpenInfo>>, | ||
/// Node that handles the muxing. | ||
node: NodeStream<TMuxer, TAddrFut, THandler::OutboundOpenInfo>, | ||
/// Handler that processes substreams. | ||
handler: THandler, | ||
|
||
is_shutdown: bool | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Needs a doc comment. |
||
} | ||
|
||
impl<TMuxer, TAddrFut, THandler> HandledNode<TMuxer, TAddrFut, THandler> | ||
|
@@ -140,8 +142,9 @@ where | |
#[inline] | ||
pub fn new(muxer: TMuxer, multiaddr_future: TAddrFut, handler: THandler) -> Self { | ||
HandledNode { | ||
node: Some(NodeStream::new(muxer, multiaddr_future)), | ||
node: NodeStream::new(muxer, multiaddr_future), | ||
handler, | ||
is_shutdown: false | ||
} | ||
} | ||
|
||
|
@@ -151,40 +154,41 @@ where | |
self.handler.inject_event(event); | ||
} | ||
|
||
/// Returns true if the inbound channel of the muxer is closed. | ||
/// Returns true if the inbound channel of the muxer is open. | ||
/// | ||
/// If `true` is returned, then no more inbound substream will be received. | ||
/// If `true` is returned, more inbound substream will be received. | ||
#[inline] | ||
pub fn is_inbound_closed(&self) -> bool { | ||
self.node.as_ref().map(|n| n.is_inbound_closed()).unwrap_or(true) | ||
pub fn is_inbound_open(&self) -> bool { | ||
self.node.is_inbound_open() | ||
} | ||
|
||
/// Returns true if the outbound channel of the muxer is closed. | ||
/// Returns true if the outbound channel of the muxer is open. | ||
/// | ||
/// If `true` is returned, then no more outbound substream will be opened. | ||
/// If `true` is returned, more outbound substream will be opened. | ||
#[inline] | ||
pub fn is_outbound_closed(&self) -> bool { | ||
self.node.as_ref().map(|n| n.is_outbound_closed()).unwrap_or(true) | ||
pub fn is_outbound_open(&self) -> bool { | ||
self.node.is_outbound_open() | ||
} | ||
|
||
/// Returns true if the handled node is in the process of shutting down. | ||
#[inline] | ||
pub fn is_shutting_down(&self) -> bool { | ||
self.node.is_none() | ||
self.is_shutdown | ||
} | ||
|
||
/// Indicates to the handled node that it should shut down. After calling this method, the | ||
/// `Stream` will end in the not-so-distant future. | ||
/// | ||
/// After this method returns, `is_shutting_down()` should return true. | ||
pub fn shutdown(&mut self) { | ||
if let Some(node) = self.node.take() { | ||
for user_data in node.close() { | ||
self.handler.inject_outbound_closed(user_data); | ||
} | ||
self.node.shutdown_all(); | ||
self.is_shutdown = true; | ||
|
||
for user_data in self.node.close() { | ||
self.handler.inject_outbound_closed(user_data); | ||
} | ||
|
||
self.handler.shutdown(); | ||
self.handler.shutdown() | ||
} | ||
} | ||
|
||
|
@@ -201,60 +205,54 @@ where | |
loop { | ||
let mut node_not_ready = false; | ||
|
||
match self.node.as_mut().map(|n| n.poll()) { | ||
Some(Ok(Async::NotReady)) | None => {}, | ||
Some(Ok(Async::Ready(Some(NodeEvent::InboundSubstream { substream })))) => { | ||
self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener); | ||
}, | ||
Some(Ok(Async::Ready(Some(NodeEvent::OutboundSubstream { user_data, substream })))) => { | ||
match self.node.poll()? { | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I think we need a |
||
Async::NotReady => (), | ||
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Getting rid of that odd |
||
Async::Ready(Some(NodeEvent::InboundSubstream { substream })) => { | ||
self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener) | ||
} | ||
Async::Ready(Some(NodeEvent::OutboundSubstream { user_data, substream })) => { | ||
let endpoint = NodeHandlerEndpoint::Dialer(user_data); | ||
self.handler.inject_substream(substream, endpoint); | ||
}, | ||
Some(Ok(Async::Ready(None))) => { | ||
self.handler.inject_substream(substream, endpoint) | ||
} | ||
Async::Ready(None) => { | ||
node_not_ready = true; | ||
self.node = None; | ||
self.handler.shutdown(); | ||
}, | ||
Some(Ok(Async::Ready(Some(NodeEvent::Multiaddr(result))))) => { | ||
self.handler.inject_multiaddr(result); | ||
}, | ||
Some(Ok(Async::Ready(Some(NodeEvent::OutboundClosed { user_data })))) => { | ||
self.handler.inject_outbound_closed(user_data); | ||
}, | ||
Some(Ok(Async::Ready(Some(NodeEvent::InboundClosed)))) => { | ||
self.handler.inject_inbound_closed(); | ||
}, | ||
Some(Err(err)) => { | ||
self.node = None; | ||
return Err(err); | ||
}, | ||
if !self.is_shutdown { | ||
self.handler.shutdown() | ||
} | ||
} | ||
Async::Ready(Some(NodeEvent::Multiaddr(result))) => { | ||
self.handler.inject_multiaddr(result) | ||
} | ||
Async::Ready(Some(NodeEvent::OutboundClosed { user_data })) => { | ||
self.handler.inject_outbound_closed(user_data) | ||
} | ||
Async::Ready(Some(NodeEvent::InboundClosed)) => { | ||
self.handler.inject_inbound_closed() | ||
} | ||
} | ||
|
||
match self.handler.poll() { | ||
Ok(Async::NotReady) => { | ||
match self.handler.poll()? { | ||
Async::NotReady => { | ||
if node_not_ready { | ||
break; | ||
break | ||
} | ||
}, | ||
Ok(Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(user_data)))) => { | ||
if let Some(node) = self.node.as_mut() { | ||
match node.open_substream(user_data) { | ||
} | ||
Async::Ready(Some(NodeHandlerEvent::OutboundSubstreamRequest(user_data))) => { | ||
if self.node.is_outbound_open() { | ||
match self.node.open_substream(user_data) { | ||
Ok(()) => (), | ||
Err(user_data) => self.handler.inject_outbound_closed(user_data), | ||
} | ||
} else { | ||
self.handler.inject_outbound_closed(user_data); | ||
} | ||
}, | ||
Ok(Async::Ready(Some(NodeHandlerEvent::Custom(event)))) => { | ||
} | ||
Async::Ready(Some(NodeHandlerEvent::Custom(event))) => { | ||
return Ok(Async::Ready(Some(event))); | ||
}, | ||
Ok(Async::Ready(None)) => { | ||
return Ok(Async::Ready(None)); | ||
}, | ||
Err(err) => { | ||
return Err(err); | ||
}, | ||
} | ||
Async::Ready(None) => { | ||
return Ok(Async::Ready(None)) | ||
} | ||
} | ||
} | ||
|
||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should it be
is_shutting_down
? It is not a done deal right away, right?