diff --git a/src/frames.rs b/src/frames.rs index 6d5957f8..9064eb8f 100644 --- a/src/frames.rs +++ b/src/frames.rs @@ -92,6 +92,10 @@ impl Frames { pub(crate) fn clear_expected_replies(&self, channel_id: ChannelId, error: Error) { self.inner.lock().clear_expected_replies(channel_id, error); } + + pub(crate) fn poison(&self) -> Option { + self.inner.lock().poison.clone() + } } #[derive(Default)] diff --git a/src/io_loop.rs b/src/io_loop.rs index 4a56f85c..9c1e7e64 100644 --- a/src/io_loop.rs +++ b/src/io_loop.rs @@ -170,6 +170,11 @@ impl IoLoop { } } self.heartbeat.cancel(); + self.clear_serialized_frames( + self.frames.poison().unwrap_or_else(|| { + Error::InvalidConnectionState(ConnectionState::Closed) + }), + ); let internal_rpc = self.internal_rpc.clone(); if self.killswitch.killed() { internal_rpc.register_internal_future(std::future::poll_fn(move |cx| { @@ -247,12 +252,16 @@ impl IoLoop { } self.stop(); self.channels.set_connection_error(error.clone()); + self.clear_serialized_frames(error.clone()); + Err(error) + } + + fn clear_serialized_frames(&mut self, error: Error) { for (_, resolver) in std::mem::take(&mut self.serialized_frames) { if let Some(resolver) = resolver { resolver.swear(Err(error.clone())); } } - Err(error) } fn attempt_flush(&mut self, writable_context: &mut Context<'_>) -> Result<()> {