Skip to content

Commit

Permalink
Small refactor of stream tests
Browse files Browse the repository at this point in the history
  • Loading branch information
acogoluegnes committed Nov 17, 2023
1 parent 2041835 commit f118d2f
Showing 1 changed file with 62 additions and 90 deletions.
152 changes: 62 additions & 90 deletions deps/rabbitmq_stream/test/rabbit_stream_SUITE.erl
Original file line number Diff line number Diff line change
Expand Up @@ -338,27 +338,27 @@ test_super_stream_creation_deletion(Config) ->
Ss = atom_to_binary(?FUNCTION_NAME, utf8),
Partitions = [unicode:characters_to_binary([Ss, <<"-">>, integer_to_binary(N)]) || N <- lists:seq(0, 2)],
Bks = [integer_to_binary(N) || N <- lists:seq(0, 2)],
SsCreationFrame = frame({request, 1, {create_super_stream, Ss, Partitions, Bks, #{}}}),
SsCreationFrame = request({create_super_stream, Ss, Partitions, Bks, #{}}),
ok = T:send(S, SsCreationFrame),
{Cmd1, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}},
Cmd1),

PartitionsFrame = frame({request, 1, {partitions, Ss}}),
PartitionsFrame = request({partitions, Ss}),
ok = T:send(S, PartitionsFrame),
{Cmd2, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {partitions, ?RESPONSE_CODE_OK, Partitions}},
Cmd2),
[begin
RouteFrame = frame({request, 1, {route, Rk, Ss}}),
RouteFrame = request({route, Rk, Ss}),
ok = T:send(S, RouteFrame),
{Command, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {route, ?RESPONSE_CODE_OK, _}}, Command),
{response, 1, {route, ?RESPONSE_CODE_OK, [P]}} = Command,
?assertEqual(unicode:characters_to_binary([Ss, <<"-">>, Rk]), P)
end || Rk <- Bks],

SsDeletionFrame = frame({request, 1, {delete_super_stream, Ss}}),
SsDeletionFrame = request({delete_super_stream, Ss}),
ok = T:send(S, SsDeletionFrame),
{Cmd3, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}},
Expand All @@ -370,8 +370,8 @@ test_super_stream_creation_deletion(Config) ->
Cmd4),

%% not the same number of partitions and binding keys
SsCreationBadFrame = frame({request, 1, {create_super_stream, Ss,
[<<"s1">>, <<"s2">>], [<<"bk1">>], #{}}}),
SsCreationBadFrame = request({create_super_stream, Ss,
[<<"s1">>, <<"s2">>], [<<"bk1">>], #{}}),
ok = T:send(S, SsCreationBadFrame),
{Cmd5, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}},
Expand All @@ -396,8 +396,7 @@ test_metadata(Config) ->
C3 = test_create_stream(Transport, S, Stream, C2),
GetStreamNodes =
fun() ->
MetadataFrame =
rabbit_stream_core:frame({request, 1, {metadata, [Stream]}}),
MetadataFrame = request({metadata, [Stream]}),
ok = Transport:send(S, MetadataFrame),
{CmdMetadata, _} = receive_commands(Transport, S, C3),
{response, 1,
Expand Down Expand Up @@ -441,8 +440,7 @@ test_metadata(Config) ->
length(GetStreamNodes()) == 3
end),

DeleteStreamFrame =
rabbit_stream_core:frame({request, 1, {delete_stream, Stream}}),
DeleteStreamFrame = request({delete_stream, Stream}),
ok = Transport:send(S, DeleteStreamFrame),
{CmdDelete, C4} = receive_commands(Transport, S, C3),
?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}},
Expand Down Expand Up @@ -488,59 +486,51 @@ test_gc_publishers(Config) ->

unauthenticated_client_rejected_tcp_connected(Config) ->
Port = get_stream_port(Config),
{ok, S} =
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
?assertEqual(ok, gen_tcp:send(S, <<"invalid data">>)),
?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).

timeout_tcp_connected(Config) ->
Port = get_stream_port(Config),
{ok, S} =
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).

unauthenticated_client_rejected_peer_properties_exchanged(Config) ->
Port = get_stream_port(Config),
{ok, S} =
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
C0 = rabbit_stream_core:init(0),
test_peer_properties(gen_tcp, S, C0),
?assertEqual(ok, gen_tcp:send(S, <<"invalid data">>)),
?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).

timeout_peer_properties_exchanged(Config) ->
Port = get_stream_port(Config),
{ok, S} =
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
C0 = rabbit_stream_core:init(0),
test_peer_properties(gen_tcp, S, C0),
?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).

unauthenticated_client_rejected_authenticating(Config) ->
Port = get_stream_port(Config),
{ok, S} =
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
C0 = rabbit_stream_core:init(0),
test_peer_properties(gen_tcp, S, C0),
SaslHandshakeFrame =
rabbit_stream_core:frame({request, 1, sasl_handshake}),
SaslHandshakeFrame = request(sasl_handshake),
?assertEqual(ok, gen_tcp:send(S, SaslHandshakeFrame)),
?awaitMatch({error, closed}, gen_tcp:send(S, <<"invalid data">>),
?WAIT).

timeout_authenticating(Config) ->
Port = get_stream_port(Config),
{ok, S} =
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
C0 = rabbit_stream_core:init(0),
test_peer_properties(gen_tcp, S, C0),
_Frame = rabbit_stream_core:frame({request, 1, sasl_handshake}),
_Frame = request(sasl_handshake),
?assertEqual(closed, wait_for_socket_close(gen_tcp, S, 1)).

timeout_close_sent(Config) ->
Port = get_stream_port(Config),
{ok, S} =
gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
{ok, S} = gen_tcp:connect("localhost", Port, [{active, false}, {mode, binary}]),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(gen_tcp, S, C0),
C2 = test_authenticate(gen_tcp, S, C1),
Expand All @@ -560,18 +550,15 @@ timeout_close_sent(Config) ->
max_segment_size_bytes_validation(Config) ->
Transport = gen_tcp,
Port = get_stream_port(Config),
{ok, S} =
Transport:connect("localhost", Port,
[{active, false}, {mode, binary}]),
{ok, S} = Transport:connect("localhost", Port,
[{active, false}, {mode, binary}]),
C0 = rabbit_stream_core:init(0),
C1 = test_peer_properties(Transport, S, C0),
C2 = test_authenticate(Transport, S, C1),
Stream = <<"stream-max-segment-size">>,
CreateStreamFrame =
rabbit_stream_core:frame({request, 1,
{create_stream, Stream,
#{<<"stream-max-segment-size-bytes">> =>
<<"3000000001">>}}}),
CreateStreamFrame = request({create_stream, Stream,
#{<<"stream-max-segment-size-bytes">> =>
<<"3000000001">>}}),
ok = Transport:send(S, CreateStreamFrame),
{Cmd, C3} = receive_commands(Transport, S, C2),
?assertMatch({response, 1,
Expand Down Expand Up @@ -629,10 +616,8 @@ set_filter_size(Config) ->
],

C3 = lists:foldl(fun({Size, ExpectedResponseCode}, Conn0) ->
Frame = rabbit_stream_core:frame(
{request, 1,
{create_stream, Stream,
#{<<"stream-filter-size-bytes">> => integer_to_binary(Size)}}}),
Frame = request({create_stream, Stream,
#{<<"stream-filter-size-bytes">> => integer_to_binary(Size)}}),
ok = Transport:send(S, Frame),
{Cmd, Conn1} = receive_commands(Transport, S, Conn0),
?assertMatch({response, 1, {create_stream, ExpectedResponseCode}}, Cmd),
Expand Down Expand Up @@ -666,38 +651,38 @@ vhost_queue_limit(Config) ->
Name = atom_to_binary(?FUNCTION_NAME, utf8),
Partitions = [unicode:characters_to_binary([Name, <<"-">>, integer_to_binary(N)]) || N <- lists:seq(0, PartitionCount)],
Bks = [integer_to_binary(N) || N <- lists:seq(0, PartitionCount)],
SsCreationFrame = frame({request, 1, {create_super_stream, Name, Partitions, Bks, #{}}}),
SsCreationFrame = request({create_super_stream, Name, Partitions, Bks, #{}}),
ok = T:send(S, SsCreationFrame),
{Cmd1, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_OK}},
Cmd1),

SsCreationFrameKo = frame({request, 1, {create_super_stream,
<<"exceed-queue-limit">>,
[<<"s1">>, <<"s2">>, <<"s3">>],
[<<"1">>, <<"2">>, <<"3">>], #{}}}),
SsCreationFrameKo = request({create_super_stream,
<<"exceed-queue-limit">>,
[<<"s1">>, <<"s2">>, <<"s3">>],
[<<"1">>, <<"2">>, <<"3">>], #{}}),

ok = T:send(S, SsCreationFrameKo),
{Cmd2, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {create_super_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}},
Cmd2),

CreateStreamFrame = rabbit_stream_core:frame({request, 1, {create_stream, <<"exceed-queue-limit">>, #{}}}),
CreateStreamFrame = request({create_stream, <<"exceed-queue-limit">>, #{}}),
ok = T:send(S, CreateStreamFrame),
{Cmd3, C} = receive_commands(T, S, C),
?assertMatch({response, 1, {create_stream, ?RESPONSE_CODE_PRECONDITION_FAILED}}, Cmd3),

SsDeletionFrame = frame({request, 1, {delete_super_stream, Name}}),
SsDeletionFrame = request({delete_super_stream, Name}),
ok = T:send(S, SsDeletionFrame),
{Cmd4, _} = receive_commands(T, S, C),
?assertMatch({response, 1, {delete_super_stream, ?RESPONSE_CODE_OK}},
Cmd4),

ok = T:send(S, rabbit_stream_core:frame({request, 1, {create_stream, Name, #{}}})),
ok = T:send(S, request({create_stream, Name, #{}})),
{Cmd5, C} = receive_commands(T, S, C),
?assertMatch({response, 1, {create_stream, ?RESPONSE_CODE_OK}}, Cmd5),

ok = T:send(S, rabbit_stream_core:frame({request, 1, {delete_stream, Name}})),
ok = T:send(S, request({delete_stream, Name})),
{Cmd6, C} = receive_commands(T, S, C),
?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, Cmd6),

Expand Down Expand Up @@ -845,8 +830,7 @@ test_server(Transport, Stream, Config) ->
ok.

test_peer_properties(Transport, S, C0) ->
PeerPropertiesFrame =
rabbit_stream_core:frame({request, 1, {peer_properties, #{}}}),
PeerPropertiesFrame = request({peer_properties, #{}}),
ok = Transport:send(S, PeerPropertiesFrame),
{Cmd, C} = receive_commands(Transport, S, C0),
?assertMatch({response, 1, {peer_properties, ?RESPONSE_CODE_OK, _}},
Expand All @@ -857,8 +841,7 @@ test_authenticate(Transport, S, C0) ->
tune(Transport, S, test_plain_sasl_authenticate(Transport, S, sasl_handshake(Transport, S, C0))).

sasl_handshake(Transport, S, C0) ->
SaslHandshakeFrame =
rabbit_stream_core:frame({request, 1, sasl_handshake}),
SaslHandshakeFrame = request(sasl_handshake),
ok = Transport:send(S, SaslHandshakeFrame),
Plain = <<"PLAIN">>,
AmqPlain = <<"AMQPLAIN">>,
Expand All @@ -882,16 +865,16 @@ plain_sasl_authenticate(Transport, S, C1, Username, Password) ->
sasl_authenticate(Transport, S, C1, <<"PLAIN">>, <<Null:8, Username/binary, Null:8, Password/binary>>).

expect_successful_authentication({SaslAuth, C2} = _SaslReponse) ->
{response, 2, {sasl_authenticate, ?RESPONSE_CODE_OK}} = SaslAuth,
C2.
?assertEqual({response, 2, {sasl_authenticate, ?RESPONSE_CODE_OK}},
SaslAuth),
C2.
expect_unsuccessful_authentication({SaslAuth, C2} = _SaslReponse, ExpectedError) ->
{response, 2, {sasl_authenticate, ExpectedError}} = SaslAuth,
C2.
?assertEqual({response, 2, {sasl_authenticate, ExpectedError}},
SaslAuth),
C2.

sasl_authenticate(Transport, S, C1, AuthMethod, AuthBody) ->
SaslAuthenticateFrame =
rabbit_stream_core:frame({request, 2,
{sasl_authenticate, AuthMethod, AuthBody}}),
SaslAuthenticateFrame = request(2, {sasl_authenticate, AuthMethod, AuthBody}),
ok = Transport:send(S, SaslAuthenticateFrame),
receive_commands(Transport, S, C1).

Expand All @@ -905,17 +888,15 @@ tune(Transport, S, C2) ->
ok = Transport:send(S, TuneFrame),

VirtualHost = <<"/">>,
OpenFrame =
rabbit_stream_core:frame({request, 3, {open, VirtualHost}}),
OpenFrame = request(3, {open, VirtualHost}),
ok = Transport:send(S, OpenFrame),
{{response, 3, {open, ?RESPONSE_CODE_OK, _ConnectionProperties}},
C4} =
receive_commands(Transport, S, C3),
C4.

test_create_stream(Transport, S, Stream, C0) ->
CreateStreamFrame =
rabbit_stream_core:frame({request, 1, {create_stream, Stream, #{}}}),
CreateStreamFrame = request({create_stream, Stream, #{}}),
ok = Transport:send(S, CreateStreamFrame),
{Cmd, C} = receive_commands(Transport, S, C0),
?assertMatch({response, 1, {create_stream, ?RESPONSE_CODE_OK}}, Cmd),
Expand All @@ -931,8 +912,7 @@ test_delete_stream(Transport, S, Stream, C0, true) ->
test_metadata_update_stream_deleted(Transport, S, Stream, C1).

do_test_delete_stream(Transport, S, Stream, C0) ->
DeleteStreamFrame =
rabbit_stream_core:frame({request, 1, {delete_stream, Stream}}),
DeleteStreamFrame = request({delete_stream, Stream}),
ok = Transport:send(S, DeleteStreamFrame),
{Cmd, C1} = receive_commands(Transport, S, C0),
?assertMatch({response, 1, {delete_stream, ?RESPONSE_CODE_OK}}, Cmd),
Expand All @@ -947,12 +927,10 @@ test_declare_publisher(Transport, S, PublisherId, Stream, C0) ->
test_declare_publisher(Transport, S, PublisherId, <<>>, Stream, C0).

test_declare_publisher(Transport, S, PublisherId, Reference, Stream, C0) ->
DeclarePublisherFrame =
rabbit_stream_core:frame({request, 1,
{declare_publisher,
PublisherId,
Reference,
Stream}}),
DeclarePublisherFrame = request({declare_publisher,
PublisherId,
Reference,
Stream}),
ok = Transport:send(S, DeclarePublisherFrame),
{Cmd, C} = receive_commands(Transport, S, C0),
?assertMatch({response, 1, {declare_publisher, ?RESPONSE_CODE_OK}},
Expand All @@ -976,8 +954,7 @@ test_publish_confirm(Transport, S, publish = PublishCmd, PublisherId,
ExpectedConfirmCommand, C0) ->
BodySize = byte_size(Body),
Messages = [<<Sequence:64, 0:1, BodySize:31, Body:BodySize/binary>>],
PublishFrame =
rabbit_stream_core:frame({PublishCmd, PublisherId, 1, Messages}),
PublishFrame = frame({PublishCmd, PublisherId, 1, Messages}),
ok = Transport:send(S, PublishFrame),
{Cmd, C} = receive_commands(Transport, S, C0),
?assertMatch({ExpectedConfirmCommand, PublisherId, [Sequence]}, Cmd),
Expand All @@ -990,8 +967,7 @@ test_publish_confirm(Transport, S, publish_v2 = PublishCmd, PublisherId,
FilterValueSize = byte_size(FilterValue),
Messages = [<<Sequence:64, FilterValueSize:16, FilterValue:FilterValueSize/binary,
0:1, BodySize:31, Body:BodySize/binary>>],
PublishFrame =
rabbit_stream_core:frame({PublishCmd, PublisherId, 1, Messages}),
PublishFrame = frame({PublishCmd, PublisherId, 1, Messages}),
ok = Transport:send(S, PublishFrame),
{Cmd, C} = receive_commands(Transport, S, C0),
case ExpectedConfirmCommand of
Expand All @@ -1018,18 +994,14 @@ test_subscribe(Transport,
SubscriptionProperties,
ExpectedResponseCode,
C0) ->
SubCmd =
{request, 1,
{subscribe, SubscriptionId, Stream, 0, 10, SubscriptionProperties}},
SubscribeFrame = rabbit_stream_core:frame(SubCmd),
SubscribeFrame = request({subscribe, SubscriptionId, Stream, 0, 10, SubscriptionProperties}),
ok = Transport:send(S, SubscribeFrame),
{Cmd, C} = receive_commands(Transport, S, C0),
?assertMatch({response, 1, {subscribe, ExpectedResponseCode}}, Cmd),
C.

test_unsubscribe(Transport, Socket, SubscriptionId, C0) ->
UnsubCmd = {request, 1, {unsubscribe, SubscriptionId}},
UnsubscribeFrame = rabbit_stream_core:frame(UnsubCmd),
UnsubscribeFrame = request({unsubscribe, SubscriptionId}),
ok = Transport:send(Socket, UnsubscribeFrame),
{Cmd, C} = receive_commands(Transport, Socket, C0),
?assertMatch({response, 1, {unsubscribe, ?RESPONSE_CODE_OK}}, Cmd),
Expand Down Expand Up @@ -1080,10 +1052,7 @@ test_deliver_v2(Transport, S, SubscriptionId, COffset, Body, C0) ->
C.

test_exchange_command_versions(Transport, S, C0) ->
ExCmd =
{request, 1,
{exchange_command_versions, [{deliver, ?VERSION_1, ?VERSION_2}]}},
ExFrame = rabbit_stream_core:frame(ExCmd),
ExFrame = request({exchange_command_versions, [{deliver, ?VERSION_1, ?VERSION_2}]}),
ok = Transport:send(S, ExFrame),
{Cmd, C} = receive_commands(Transport, S, C0),
?assertMatch({response, 1,
Expand All @@ -1093,8 +1062,7 @@ test_exchange_command_versions(Transport, S, C0) ->
C.

test_stream_stats(Transport, S, Stream, C0) ->
SICmd = {request, 1, {stream_stats, Stream}},
SIFrame = rabbit_stream_core:frame(SICmd),
SIFrame = request({stream_stats, Stream}),
ok = Transport:send(S, SIFrame),
{Cmd, C} = receive_commands(Transport, S, C0),
?assertMatch({response, 1,
Expand All @@ -1106,9 +1074,7 @@ test_stream_stats(Transport, S, Stream, C0) ->

test_close(Transport, S, C0) ->
CloseReason = <<"OK">>,
CloseFrame =
rabbit_stream_core:frame({request, 1,
{close, ?RESPONSE_CODE_OK, CloseReason}}),
CloseFrame = request({close, ?RESPONSE_CODE_OK, CloseReason}),
ok = Transport:send(S, CloseFrame),
{{response, 1, {close, ?RESPONSE_CODE_OK}}, C} =
receive_commands(Transport, S, C0),
Expand Down Expand Up @@ -1159,3 +1125,9 @@ get_global_counters(Config) ->
rabbit_global_counters,
overview,
[])).

request(Cmd) ->
request(1, Cmd).

request(CorrId, Cmd) ->
rabbit_stream_core:frame({request, CorrId, Cmd}).

0 comments on commit f118d2f

Please sign in to comment.