-
Notifications
You must be signed in to change notification settings - Fork 1k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Add shutdown functionality to NodeStream
.
#560
Conversation
Add `NodeStream::shutdown` to allow triggering the shutdown process, and `NodeStream::poll_shutdown` as the internal way to drive any potential shutdown to completion. Also shutdown the `NodeStream` in `Node::shutdown`. Fixes libp2p#551.
}, | ||
Some(Ok(Async::Ready(Some(NodeEvent::OutboundSubstream { user_data, substream })))) => { | ||
match self.node.poll()? { | ||
Async::NotReady => (), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Getting rid of that odd | None
is nice.
core/src/nodes/handled_node.rs
Outdated
/// Handler that processes substreams. | ||
handler: THandler, | ||
|
||
is_shutdown: bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
nit: should it be is_shutting_down
? It is not a done deal right away, right?
core/src/nodes/node.rs
Outdated
@@ -552,7 +623,7 @@ mod node_stream { | |||
ns.open_substream(vec![1]).unwrap(); | |||
ns.poll().unwrap(); // poll past inbound | |||
ns.poll().unwrap(); // poll outbound | |||
assert_eq!(ns.is_outbound_closed(), false); | |||
assert_eq!(ns.is_outbound_open(), true); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This could be assert!
, but it's not an important change.
let mut out = Vec::with_capacity(self.outbound_substreams.len()); | ||
for (user_data, outbound) in self.outbound_substreams.drain() { | ||
out.push(user_data); | ||
self.muxer.destroy_outbound(outbound); | ||
} | ||
out | ||
} | ||
|
||
/// Trigger node shutdown. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This should document that poll()
will eventually produce None
once both endpoints are properly closed and all the outbound substreams are opened or errored.
core/src/nodes/node.rs
Outdated
@@ -208,21 +287,23 @@ where | |||
type Error = IoError; | |||
|
|||
fn poll(&mut self) -> Poll<Option<Self::Item>, Self::Error> { | |||
// Drive the shutdown process, if any. | |||
try_ready!(self.poll_shutdown()); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This will stop polling the outbound substreams if we are in a shut down process, and I don't think that's good.
core/src/nodes/node.rs
Outdated
} | ||
|
||
/// Destroys the node stream and returns all the pending outbound substreams. | ||
pub fn close(mut self) -> Vec<TUserData> { | ||
pub fn close(&mut self) -> Vec<TUserData> { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I see close
as a way to immediately shut down a node if necessary, without properly shutting it down, and therefore taking ownership would be correct.
It's debatable whether close
is useful though.
I think you need a cancel_all_outgoing()
, but it should be separate from close
IMO.
core/src/nodes/handled_node.rs
Outdated
/// Handler that processes substreams. | ||
handler: THandler, | ||
|
||
is_shutdown: bool |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Needs a doc comment.
self.handler.inject_substream(substream, NodeHandlerEndpoint::Listener); | ||
}, | ||
Some(Ok(Async::Ready(Some(NodeEvent::OutboundSubstream { user_data, substream })))) => { | ||
match self.node.poll()? { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think we need a Fuse
, because we're polling the node again after it returned None
.
Even though it doesn't cause any issue with Node
, it's still a violation of the Stream
API to poll again afterwards.
…e-handled_node * upstream/master: Remove the old API (libp2p#565) Change some `nat_traversal`s to consider a prefix. (libp2p#550) Add some documentation to listeners stream (libp2p#547) Add shutdown functionality to `NodeStream`. (libp2p#560) Shut down yamux and fix mplex shutdown. (libp2p#559)
…e-handled_node_tasks * dp/chore/test-core-handled_node: Fix broken test Clarify the behaviour of is_shutting_down Fixes broken test after upstream changes Remove the old API (libp2p#565) Change some `nat_traversal`s to consider a prefix. (libp2p#550) Add some documentation to listeners stream (libp2p#547) Add shutdown functionality to `NodeStream`. (libp2p#560) Shut down yamux and fix mplex shutdown. (libp2p#559)
…ref-debug-impl * upstream/master: Use paritytech/rust-secp256k1 (libp2p#598) Use websocket 0.21.0 (libp2p#597) Reexport multihash from the facade (libp2p#587) Add substrate to the list of projects using libp2p (libp2p#595) Remove spaces before semicolons (libp2p#591) Add protocol to report external address view. (libp2p#566) Add a TransportExt trait (libp2p#533) libp2p#399 remove tokio_current_thread tests (libp2p#577) Remove even more unused files (libp2p#581) Fix the polling process in handled node (libp2p#582) Fix panicking when Kad responder is destroyed (libp2p#575) Remove other unused files (libp2p#570) Fix panic in raw swarm (libp2p#571) Remove two unused files (libp2p#567) New core (libp2p#568) Remove the old API (libp2p#565) Change some `nat_traversal`s to consider a prefix. (libp2p#550) Add some documentation to listeners stream (libp2p#547) Add shutdown functionality to `NodeStream`. (libp2p#560)
* ci: unset `RUSTFLAGS` value in semver job Don't fail semver-checking if a dependency version has warnings, such as deprecation notices. Related: libp2p#4932 (comment). Related: obi1kenobi/cargo-semver-checks#589. Pull-Request: libp2p#4942. * deps(webrtc): bump alpha versions Bumps versions of `libp2p-webrtc` and `libp2p-webrtc-websys` up one minor version. Fixes: libp2p#4953. Pull-Request: libp2p#4959. * feat(request-response): derive `PartialOrd`,`Ord` for `{Out,In}RequestId` Pull-Request: libp2p#4956. * refactor(connection-limits): make `check_limit` a free-function Pull-Request: libp2p#4958. * chore(webrtc-utils): bump version to allow for new release We didn't bump this crate's version despite it depending on `libp2p_noise`. As such, we can't release `libp2p-webrtc-websys` at the moment because it needs a new release of this crate. Pull-Request: libp2p#4968. * feat(webrtc-websys): hide `libp2p_noise` from the public API Currently, `libp2p-webrtc-websys` exposes the `libp2p_noise` dependency in its public API. It should really be a private dependency of the crate. By wrapping it in a new-type, we can achieve this. Pull-Request: libp2p#4969. * fix(kad): iterator progress to be decided by any of new peers Pull-Request: libp2p#4932. * chore(quic): set `max_idle_timeout` to quinn default timeout Resolves libp2p#4917. Pull-Request: libp2p#4965. * feat(core): impl Display on ListenerId Fixes: libp2p#4935. Pull-Request: libp2p#4936. * feat(server): support websocket Pull-Request: libp2p#4937. * feat(swarm): implement `Copy` and `Clone` for `FromSwarm` We can make `FromSwarm` implement `Copy` and `Close` which makes it much easier to a) generate code in `libp2p-swarm-derive` b) manually wrap a `NetworkBehaviour` Previously, we couldn't do this because `ConnectionClosed` would have a `handler` field that cannot be cloned / copied. Related: libp2p#4076. Related: libp2p#4581. Pull-Request: libp2p#4825. * deps: bump wasm-bindgen-futures from 0.4.38 to 0.4.39 Pull-Request: libp2p#4946. * feat(connection-limit): add function to mutate `ConnectionLimits` Resolves: libp2p#4826. Pull-Request: libp2p#4964. * deps: bump web-sys from 0.3.65 to 0.3.66 Pull-Request: libp2p#4976. * deps: bump wasm-bindgen-test from 0.3.38 to 0.3.39 Pull-Request: libp2p#4975. * fix(kad): don't assume `QuerId`s are unique We mistakenly assumed that `QueryId`s are unique in that, only a single request will be emitted per `QueryId`. This is wrong. A bootstrap for example will issue multiple requests as part of the same `QueryId`. Thus, we cannot use the `QueryId` as a key for the `FuturesMap`. Instead, we use a `FuturesTupleSet` to associate the `QueryId` with the in-flight request. Related: libp2p#4901. Resolves: libp2p#4948. Pull-Request: libp2p#4971. * fix(webrtc example): clarify idle connection timeout When I ran the `example/browser-webrtc` example I discovered it would break after a ping or two. The `Ping` idle timeout needed to be extended, on both the server and the wasm client, which is what this PR fixes. I also added a small note to the README about ensuring `wasm-pack` is install for the users who are new to the ecosystem. Fixes: libp2p#4950. Pull-Request: libp2p#4966. * docs(examples/readme): fix broken link Related: libp2p#3536. Pull-Request: libp2p#4984. * feat(yamux): auto-tune (dynamic) stream receive window libp2p/rust-yamux#176 enables auto-tuning for the Yamux stream receive window. While preserving small buffers on low-latency and/or low-bandwidth connections, this change allows for high-latency and/or high-bandwidth connections to exhaust the available bandwidth on a single stream. Using the [libp2p perf](https://github.com/libp2p/test-plans/blob/master/perf/README.md) benchmark tools (60ms, 10Gbit/s) shows an **improvement from 33 Mbit/s to 1.3 Gbit/s** in single stream throughput. See libp2p/rust-yamux#176 for details. To ship the above Rust Yamux change in a libp2p patch release (non-breaking), this pull request uses `yamux` `v0.13` (new version) by default and falls back to `yamux` `v0.12` (old version) when setting any configuration options. Thus default users benefit from the increased performance, while power users with custom configurations maintain the old behavior. Pull-Request: libp2p#4970. * deps: bump actions/deploy-pages from 2 to 3 Pull-Request: libp2p#4978. * deps: bump the axum group with 2 updates Pull-Request: libp2p#4943. * chore(webrtc-websys): remove unused dependencies Pull-Request: libp2p#4973. * chore(quic): fix link to PR in changelog Pull-Request: libp2p#4993. * deps: bump tokio from 1.34.0 to 1.35.0 Pull-Request: libp2p#4995. * deps: bump syn from 2.0.39 to 2.0.40 Pull-Request: libp2p#4996. * deps: bump once_cell from 1.18.0 to 1.19.0 Pull-Request: libp2p#4998. --------- Co-authored-by: Predrag Gruevski <[email protected]> Co-authored-by: Doug A <[email protected]> Co-authored-by: Darius Clark <[email protected]> Co-authored-by: zhiqiangxu <[email protected]> Co-authored-by: Thomas Eizinger <[email protected]> Co-authored-by: maqi <[email protected]> Co-authored-by: stormshield-frb <[email protected]> Co-authored-by: Max Inden <[email protected]> Co-authored-by: dependabot[bot] <49699333+dependabot[bot]@users.noreply.github.com> Co-authored-by: NAHO <[email protected]>
Add
NodeStream::shutdown
to allow triggering the shutdown process, andNodeStream::poll_shutdown
as the internal way to drive any potential shutdown to completion.Also shutdown the
NodeStream
inNode::shutdown
.Fixes #551.