Skip to content

Commit

Permalink
HTTP/2: Fix tunneled streams bugs
Browse files Browse the repository at this point in the history
* Tunneled streams can now close the stream.

* Data received on tunneled streams now result in
  WINDOW_UPDATE frames being sent if necessary,
  and flow control is handled.

This was detected as part of writing a new Cowboy test
suite for benchmarking Websocket, but should help other
uses too.
  • Loading branch information
essen committed Jan 2, 2025
1 parent 8efcedd commit 23f96b8
Showing 1 changed file with 63 additions and 50 deletions.
113 changes: 63 additions & 50 deletions src/gun_http2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -240,6 +240,8 @@ parse(Data, State0=#http2_state{status=Status, http2_machine=HTTP2Machine, strea
case frame(State0, Frame, CookieStore0, EvHandler, EvHandlerState0) of
{Error={error, _}, CookieStore, EvHandlerState} ->
{Error, CookieStore, EvHandlerState};
{[{state, State}, close], CookieStore, EvHandlerState} ->
{[{state, State}, close], CookieStore, EvHandlerState};
{{state, State}, CookieStore, EvHandlerState} ->
parse(Rest, State, CookieStore, EvHandler, EvHandlerState)
end;
Expand Down Expand Up @@ -379,67 +381,37 @@ maybe_ack_or_notify(State=#http2_state{reply_to=ReplyTo, socket=Socket,

data_frame(State0, StreamID, IsFin, Data, CookieStore0, EvHandler, EvHandlerState0) ->
case get_stream_by_id(State0, StreamID) of
Stream=#stream{tunnel=undefined} ->
Stream=#stream{tunnel=undefined, handler_state=Handlers0} ->
{ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
{StateOrError, EvHandlerState} = data_frame1(State0,
StreamID, IsFin, Data, EvHandler, EvHandlerState0, Stream),
StreamID, IsFin, Data, EvHandler, EvHandlerState0,
Stream#stream{handler_state=Handlers}, Dec),
{StateOrError, CookieStore0, EvHandlerState};
Stream=#stream{tunnel=#tunnel{protocol=Proto, protocol_state=ProtoState0}} ->
% %% @todo What about IsFin?
{Commands, CookieStore, EvHandlerState1} = Proto:handle(Data,
ProtoState0, CookieStore0, EvHandler, EvHandlerState0),
%% The frame/parse functions only handle state or error commands.
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
Stream, State0, EvHandler, EvHandlerState1),
{ResCommands, CookieStore, EvHandlerState}
{StateOrError, EvHandlerState1} = data_frame1(State0,
StreamID, IsFin, Data, EvHandler, EvHandlerState0,
Stream, 0),
case StateOrError of
{state, State} ->
{Commands, CookieStore, EvHandlerState2} = Proto:handle(Data,
ProtoState0, CookieStore0, EvHandler, EvHandlerState1),
%% The frame/parse functions only handle state or error commands.
{ResCommands, EvHandlerState} = tunnel_commands(Commands,
Stream, State, EvHandler, EvHandlerState2),
{ResCommands, CookieStore, EvHandlerState};
Error = {error, _} ->
{Error, CookieStore0, EvHandlerState1}
end
end.

%% Send errors are returned. Other errors cause the stream to be deleted.
tunnel_commands(Command, Stream, State, EvHandler, EvHandlerState)
when not is_list(Command) ->
tunnel_commands([Command], Stream, State, EvHandler, EvHandlerState);
tunnel_commands([], Stream, State, _EvHandler, EvHandlerState) ->
{{state, store_stream(State, Stream)}, EvHandlerState};
tunnel_commands([{send, IsFin, Data}|Tail], Stream=#stream{id=StreamID},
State0, EvHandler, EvHandlerState0) ->
case maybe_send_data(State0, StreamID,
IsFin, Data, EvHandler, EvHandlerState0) of
{{state, State}, EvHandlerState} ->
tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
ErrorResult={{error, _Reason}, _EvHandlerState} ->
ErrorResult
end;
tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel},
State, EvHandler, EvHandlerState) ->
tunnel_commands(Tail, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}},
State, EvHandler, EvHandlerState);
tunnel_commands([{error, Reason}|_], #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
State, _EvHandler, EvHandlerState) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{stream_error, Reason, 'Tunnel closed unexpectedly.'}},
{{state, delete_stream(State, StreamID)}, EvHandlerState};
%% @todo Set a timeout for closing the Websocket stream.
tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
%% @todo Maybe we should stop increasing the window when not in active mode. (HTTP/2 Websocket only.)
tunnel_commands([{active, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState).

continue_stream_ref(#http2_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}, StreamRef) ->
case ContinueStreamRef of
[_|_] -> ContinueStreamRef ++ [StreamRef];
_ -> [ContinueStreamRef, StreamRef]
end;
continue_stream_ref(State, StreamRef) ->
stream_ref(State, StreamRef).

data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
Stream=#stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0, handler_state=Handlers0}) ->
{ok, Dec, Handlers} = gun_content_handler:handle(IsFin, Data, Handlers0),
Stream=#stream{ref=StreamRef, reply_to=ReplyTo, flow=Flow0}, Dec) ->
Flow = case Flow0 of
infinity -> infinity;
_ -> Flow0 - Dec
end,
State1 = store_stream(State0, Stream#stream{flow=Flow, handler_state=Handlers}),
State1 = store_stream(State0, Stream#stream{flow=Flow}),
{StateOrError, EvHandlerState} = case byte_size(Data) of
%% We do not send a WINDOW_UPDATE if the DATA frame was of size 0.
0 when IsFin =:= fin ->
Expand Down Expand Up @@ -475,6 +447,47 @@ data_frame1(State0, StreamID, IsFin, Data, EvHandler, EvHandlerState0,
{Error, EvHandlerState}
end.

%% Send errors are returned. Other errors cause the stream to be deleted.
tunnel_commands(Command, Stream, State, EvHandler, EvHandlerState)
when not is_list(Command) ->
tunnel_commands([Command], Stream, State, EvHandler, EvHandlerState);
tunnel_commands([], Stream, State, _EvHandler, EvHandlerState) ->
{{state, store_stream(State, Stream)}, EvHandlerState};
tunnel_commands([{send, IsFin, Data}|Tail], Stream=#stream{id=StreamID},
State0, EvHandler, EvHandlerState0) ->
case maybe_send_data(State0, StreamID,
IsFin, Data, EvHandler, EvHandlerState0) of
{{state, State}, EvHandlerState} ->
tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
ErrorResult={{error, _Reason}, _EvHandlerState} ->
ErrorResult
end;
tunnel_commands([{state, ProtoState}|Tail], Stream=#stream{tunnel=Tunnel},
State, EvHandler, EvHandlerState) ->
tunnel_commands(Tail, Stream#stream{tunnel=Tunnel#tunnel{protocol_state=ProtoState}},
State, EvHandler, EvHandlerState);
tunnel_commands([{error, Reason}|_], #stream{id=StreamID, ref=StreamRef, reply_to=ReplyTo},
State, _EvHandler, EvHandlerState) ->
ReplyTo ! {gun_error, self(), stream_ref(State, StreamRef),
{stream_error, Reason, 'Tunnel closed unexpectedly.'}},
{{state, delete_stream(State, StreamID)}, EvHandlerState};
%% @todo Set a timeout for closing the Websocket stream.
tunnel_commands([{closing, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState);
tunnel_commands([close|_Tail], #stream{id=StreamID}, State, _EvHandler, EvHandlerState) ->
{[{state, delete_stream(State, StreamID)}, close], EvHandlerState};
%% @todo Maybe we should stop increasing the window when not in active mode. (HTTP/2 Websocket only.)
tunnel_commands([{active, _}|Tail], Stream, State, EvHandler, EvHandlerState) ->
tunnel_commands(Tail, Stream, State, EvHandler, EvHandlerState).

continue_stream_ref(#http2_state{socket=#{handle_continue_stream_ref := ContinueStreamRef}}, StreamRef) ->
case ContinueStreamRef of
[_|_] -> ContinueStreamRef ++ [StreamRef];
_ -> [ContinueStreamRef, StreamRef]
end;
continue_stream_ref(State, StreamRef) ->
stream_ref(State, StreamRef).

headers_frame(State0=#http2_state{opts=Opts},
StreamID, IsFin, Headers, #{status := Status}, _BodyLen,
CookieStore0, EvHandler, EvHandlerState0) ->
Expand Down

0 comments on commit 23f96b8

Please sign in to comment.