Skip to content

Commit

Permalink
Reset any queued stream on receipt of remote reset (#258)
Browse files Browse the repository at this point in the history
Fixes #256.

This PR changes `state::recv_reset` so any closed stream with queued send is immediately reset (and thus, the queue is cleared) on receipt of a `RST_STREAM` frame from the remote. 

This fixes the panic encountered by the test @goffrie posted in #256, where the stream is scheduled to close, receives a `RST_STREAM` frame, and sets the buffered capacity to 0, but isn't removed from the send queue, so we hit an assertion (or wrap, if debug assertions are disabled) when subtracting a sent frame's size from the buffered size.

Signed-off-by: Eliza Weisman <[email protected]>
  • Loading branch information
hawkw authored Apr 16, 2018
1 parent fabae35 commit 040f391
Show file tree
Hide file tree
Showing 2 changed files with 32 additions and 12 deletions.
7 changes: 5 additions & 2 deletions src/proto/streams/prioritize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -593,6 +593,8 @@ impl Prioritize {
// To be safe, we just always ask the stream.
let is_counted = stream.is_counted();
let is_pending_reset = stream.is_pending_reset_expiration();
trace!(" --> stream={:?}; is_pending_reset={:?};",
stream.id, is_pending_reset);

let frame = match stream.pending_send.pop_front(buffer) {
Some(Frame::Data(mut frame)) => {
Expand All @@ -603,13 +605,14 @@ impl Prioritize {

trace!(
" --> data frame; stream={:?}; sz={}; eos={:?}; window={}; \
available={}; requested={}",
available={}; requested={}; buffered={};",
frame.stream_id(),
sz,
frame.is_end_stream(),
stream_capacity,
stream.send_flow.available(),
stream.requested_send_capacity
stream.requested_send_capacity,
stream.buffered_send_data,
);

// Zero length data frames always have capacity to
Expand Down
37 changes: 27 additions & 10 deletions src/proto/streams/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,20 +214,37 @@ impl State {
}

/// The remote explicitly sent a RST_STREAM.
///
/// # Arguments
/// - `reason`: the reason field of the received RST_STREAM frame.
/// - `queued`: true if this stream has frames in the pending send queue.
pub fn recv_reset(&mut self, reason: Reason, queued: bool) {

match self.inner {
Closed(Cause::EndStream) if queued => {
// If the stream has a queued EOS frame, transition to peer
// reset.
trace!("recv_reset: reason={:?}; queued=true", reason);
self.inner = Closed(Cause::Proto(reason));
},
Closed(..) => {},
_ => {
trace!("recv_reset; reason={:?}", reason);
// If the stream is already in a `Closed` state, do nothing,
// provided that there are no frames still in the send queue.
Closed(..) if !queued => {},
// A notionally `Closed` stream may still have queued frames in
// the following cases:
//
// - if the cause is `Cause::Scheduled(..)` (i.e. we have not
// actually closed the stream yet).
// - if the cause is `Cause::EndStream`: we transition to this
// state when an EOS frame is *enqueued* (so that it's invalid
// to enqueue more frames), not when the EOS frame is *sent*;
// therefore, there may still be frames ahead of the EOS frame
// in the send queue.
//
// In either of these cases, we want to overwrite the stream's
// previous state with the received RST_STREAM, so that the queue
// will be cleared by `Prioritize::pop_frame`.
state => {
trace!(
"recv_reset; reason={:?}; state={:?}; queued={:?}",
reason, state, queued
);
self.inner = Closed(Cause::Proto(reason));
},

}
}

Expand Down

0 comments on commit 040f391

Please sign in to comment.