Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Show session and link details for AMQP 1.0 connection #12670

Merged
merged 2 commits into from
Nov 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 4 additions & 1 deletion deps/rabbit/src/rabbit_amqp_reader.erl
Original file line number Diff line number Diff line change
Expand Up @@ -967,7 +967,8 @@ silent_close_delay() ->
-spec info(rabbit_types:connection(), rabbit_types:info_keys()) ->
rabbit_types:infos().
info(Pid, InfoItems) ->
case InfoItems -- ?INFO_ITEMS of
KnownItems = [session_pids | ?INFO_ITEMS],
case InfoItems -- KnownItems of
[] ->
case gen_server:call(Pid, {info, InfoItems}, infinity) of
{ok, InfoList} ->
Expand Down Expand Up @@ -1065,6 +1066,8 @@ i(client_properties, #v1{connection = #v1_connection{properties = Props}}) ->
end;
i(channels, #v1{tracked_channels = Channels}) ->
maps:size(Channels);
i(session_pids, #v1{tracked_channels = Map}) ->
maps:values(Map);
i(channel_max, #v1{connection = #v1_connection{channel_max = Max}}) ->
Max;
i(reductions = Item, _State) ->
Expand Down
139 changes: 134 additions & 5 deletions deps/rabbit/src/rabbit_amqp_session.erl
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,8 @@
conserve_resources/3,
check_resource_access/4,
check_read_permitted_on_topic/4,
reset_authz/2
reset_authz/2,
info/1
]).

-export([init/1,
Expand Down Expand Up @@ -148,7 +149,9 @@
}).

-record(incoming_link, {
name :: binary(),
snd_settle_mode :: snd_settle_mode(),
target_address :: null | binary(),
%% The exchange is either defined in the ATTACH frame and static for
%% the life time of the link or dynamically provided in each message's
%% "to" field (address v2).
Expand Down Expand Up @@ -197,6 +200,8 @@
}).

-record(outgoing_link, {
name :: binary(),
source_address :: binary(),
%% Although the source address of a link might be an exchange name and binding key
%% or a topic filter, an outgoing link will always consume from a queue.
queue_name :: rabbit_amqqueue:name(),
Expand Down Expand Up @@ -490,6 +495,8 @@ conserve_resources(Pid, Source, {_, Conserve, _}) ->
reset_authz(Pid, User) ->
gen_server:cast(Pid, {reset_authz, User}).

handle_call(infos, _From, State) ->
reply(infos(State), State);
handle_call(Msg, _From, State) ->
Reply = {error, {not_understood, Msg}},
reply(Reply, State).
Expand Down Expand Up @@ -1262,11 +1269,11 @@ handle_attach(#'v1_0.attach'{
reply_frames([Reply], State);

handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
name = LinkName,
name = LinkName = {utf8, LinkName0},
handle = Handle = ?UINT(HandleInt),
source = Source,
snd_settle_mode = MaybeSndSettleMode,
target = Target,
target = Target = #'v1_0.target'{address = TargetAddress},
initial_delivery_count = DeliveryCount = ?UINT(DeliveryCountInt)
},
State0 = #state{incoming_links = IncomingLinks0,
Expand All @@ -1279,7 +1286,9 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
SndSettleMode = snd_settle_mode(MaybeSndSettleMode),
MaxMessageSize = persistent_term:get(max_message_size),
IncomingLink = #incoming_link{
name = LinkName0,
snd_settle_mode = SndSettleMode,
target_address = address(TargetAddress),
exchange = Exchange,
routing_key = RoutingKey,
queue_name_bin = QNameBin,
Expand Down Expand Up @@ -1316,9 +1325,10 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_SENDER,
end;

handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
name = LinkName,
name = LinkName = {utf8, LinkName0},
handle = Handle = ?UINT(HandleInt),
source = Source = #'v1_0.source'{filter = DesiredFilter},
source = Source = #'v1_0.source'{address = SourceAddress,
filter = DesiredFilter},
snd_settle_mode = SndSettleMode,
rcv_settle_mode = RcvSettleMode,
max_message_size = MaybeMaxMessageSize,
Expand Down Expand Up @@ -1431,6 +1441,8 @@ handle_attach(#'v1_0.attach'{role = ?AMQP_ROLE_RECEIVER,
offered_capabilities = OfferedCaps},
MaxMessageSize = max_message_size(MaybeMaxMessageSize),
Link = #outgoing_link{
name = LinkName0,
source_address = address(SourceAddress),
queue_name = queue_resource(Vhost, QNameBin),
queue_type = QType,
send_settled = SndSettled,
Expand Down Expand Up @@ -2672,6 +2684,11 @@ ensure_source_v1(Address,
Err
end.

address(undefined) ->
null;
address({utf8, String}) ->
String.

-spec ensure_target(#'v1_0.target'{},
rabbit_types:vhost(),
rabbit_types:user(),
Expand Down Expand Up @@ -3702,6 +3719,118 @@ format_status(
topic_permission_cache => TopicPermissionCache},
maps:update(state, State, Status).

-spec info(pid()) ->
{ok, rabbit_types:infos()} | {error, term()}.
info(Pid) ->
try gen_server:call(Pid, infos) of
Infos ->
{ok, Infos}
catch _:Reason ->
{error, Reason}
end.

infos(#state{cfg = #cfg{channel_num = ChannelNum,
max_handle = MaxHandle},
next_incoming_id = NextIncomingId,
incoming_window = IncomingWindow,
next_outgoing_id = NextOutgoingId,
remote_incoming_window = RemoteIncomingWindow,
remote_outgoing_window = RemoteOutgoingWindow,
outgoing_unsettled_map = OutgoingUnsettledMap,
incoming_links = IncomingLinks,
outgoing_links = OutgoingLinks,
incoming_management_links = IncomingManagementLinks,
outgoing_management_links = OutgoingManagementLinks
}) ->
[
{channel_number, ChannelNum},
{handle_max, MaxHandle},
{next_incoming_id, NextIncomingId},
{incoming_window, IncomingWindow},
{next_outgoing_id, NextOutgoingId},
{remote_incoming_window, RemoteIncomingWindow},
{remote_outgoing_window, RemoteOutgoingWindow},
{outgoing_unsettled_deliveries, maps:size(OutgoingUnsettledMap)},
{incoming_links,
info_incoming_management_links(IncomingManagementLinks) ++
info_incoming_links(IncomingLinks)},
{outgoing_links,
info_outgoing_management_links(OutgoingManagementLinks) ++
info_outgoing_links(OutgoingLinks)}
].

info_incoming_management_links(Links) ->
[info_incoming_link(Handle, Name, settled, ?MANAGEMENT_NODE_ADDRESS,
MaxMessageSize, DeliveryCount, Credit, 0)
|| Handle := #management_link{
name = Name,
max_message_size = MaxMessageSize,
delivery_count = DeliveryCount,
credit = Credit} <- Links].

info_incoming_links(Links) ->
[info_incoming_link(Handle, Name, SndSettleMode, TargetAddress, MaxMessageSize,
DeliveryCount, Credit, maps:size(IncomingUnconfirmedMap))
|| Handle := #incoming_link{
name = Name,
snd_settle_mode = SndSettleMode,
target_address = TargetAddress,
max_message_size = MaxMessageSize,
delivery_count = DeliveryCount,
credit = Credit,
incoming_unconfirmed_map = IncomingUnconfirmedMap} <- Links].

info_incoming_link(Handle, LinkName, SndSettleMode, TargetAddress,
MaxMessageSize, DeliveryCount, Credit, UnconfirmedMessages) ->
[{handle, Handle},
{link_name, LinkName},
{snd_settle_mode, SndSettleMode},
{target_address, TargetAddress},
{max_message_size, MaxMessageSize},
{delivery_count, DeliveryCount},
{credit, Credit},
{unconfirmed_messages, UnconfirmedMessages}].

info_outgoing_management_links(Links) ->
[info_outgoing_link(Handle, Name, ?MANAGEMENT_NODE_ADDRESS, <<>>,
true, MaxMessageSize, DeliveryCount, Credit)
|| Handle := #management_link{
name = Name,
max_message_size = MaxMessageSize,
delivery_count = DeliveryCount,
credit = Credit} <- Links].

info_outgoing_links(Links) ->
[begin
{DeliveryCount, Credit} = case ClientFlowCtl of
#client_flow_ctl{delivery_count = DC,
credit = C} ->
{DC, C};
credit_api_v1 ->
{'', ''}
end,
info_outgoing_link(Handle, Name, SourceAddress, QueueName#resource.name,
SendSettled, MaxMessageSize, DeliveryCount, Credit)

end
|| Handle := #outgoing_link{
name = Name,
source_address = SourceAddress,
queue_name = QueueName,
max_message_size = MaxMessageSize,
send_settled = SendSettled,
client_flow_ctl = ClientFlowCtl} <- Links].

info_outgoing_link(Handle, LinkName, SourceAddress, QueueNameBin, SendSettled,
MaxMessageSize, DeliveryCount, Credit) ->
[{handle, Handle},
{link_name, LinkName},
{source_address, SourceAddress},
{queue_name, QueueNameBin},
{send_settled, SendSettled},
{max_message_size, MaxMessageSize},
{delivery_count, DeliveryCount},
{credit, Credit}].

unwrap_simple_type(V = {list, _}) ->
V;
Expand Down
6 changes: 3 additions & 3 deletions deps/rabbit/src/rabbit_connection_tracking.erl
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@
list/0, list/1, list_on_node/1, list_on_node/2, list_of_user/1,
tracked_connection_from_connection_created/1,
tracked_connection_from_connection_state/1,
lookup/1, count/0]).
lookup/1, lookup/2, count/0]).

-export([count_local_tracked_items_in_vhost/1,
count_local_tracked_items_of_user/1]).
Expand Down Expand Up @@ -233,8 +233,8 @@ lookup(Name, [Node | Nodes]) when Node == node() ->
end;
lookup(Name, [Node | Nodes]) ->
case rabbit_misc:rpc_call(Node, ?MODULE, lookup, [Name, [Node]]) of
[] -> lookup(Name, Nodes);
[Row] -> Row
not_found -> lookup(Name, Nodes);
Row = #tracked_connection{} -> Row
end.

lookup_internal(Name, Node) ->
Expand Down
2 changes: 1 addition & 1 deletion deps/rabbitmq_management/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ define PROJECT_APP_EXTRA_KEYS
endef

DEPS = rabbit_common rabbit amqp_client cowboy cowlib rabbitmq_web_dispatch rabbitmq_management_agent oauth2_client
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers proper amqp10_client
TEST_DEPS = rabbitmq_ct_helpers rabbitmq_ct_client_helpers proper rabbitmq_amqp_client
LOCAL_DEPS += ranch ssl crypto public_key

# FIXME: Add Ranch as a BUILD_DEPS to be sure the correct version is picked.
Expand Down
4 changes: 4 additions & 0 deletions deps/rabbitmq_management/app.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ def all_beam_files(name = "all_beam_files"):
"src/rabbit_mgmt_wm_cluster_name.erl",
"src/rabbit_mgmt_wm_connection.erl",
"src/rabbit_mgmt_wm_connection_channels.erl",
"src/rabbit_mgmt_wm_connection_sessions.erl",
"src/rabbit_mgmt_wm_connection_user_name.erl",
"src/rabbit_mgmt_wm_connections.erl",
"src/rabbit_mgmt_wm_connections_vhost.erl",
Expand Down Expand Up @@ -182,6 +183,7 @@ def all_test_beam_files(name = "all_test_beam_files"):
"src/rabbit_mgmt_wm_cluster_name.erl",
"src/rabbit_mgmt_wm_connection.erl",
"src/rabbit_mgmt_wm_connection_channels.erl",
"src/rabbit_mgmt_wm_connection_sessions.erl",
"src/rabbit_mgmt_wm_connection_user_name.erl",
"src/rabbit_mgmt_wm_connections.erl",
"src/rabbit_mgmt_wm_connections_vhost.erl",
Expand Down Expand Up @@ -361,6 +363,7 @@ def all_srcs(name = "all_srcs"):
"priv/www/js/tmpl/queues.ejs",
"priv/www/js/tmpl/rate-options.ejs",
"priv/www/js/tmpl/registry.ejs",
"priv/www/js/tmpl/sessions-list.ejs",
"priv/www/js/tmpl/status.ejs",
"priv/www/js/tmpl/topic-permissions.ejs",
"priv/www/js/tmpl/user.ejs",
Expand Down Expand Up @@ -407,6 +410,7 @@ def all_srcs(name = "all_srcs"):
"src/rabbit_mgmt_wm_cluster_name.erl",
"src/rabbit_mgmt_wm_connection.erl",
"src/rabbit_mgmt_wm_connection_channels.erl",
"src/rabbit_mgmt_wm_connection_sessions.erl",
"src/rabbit_mgmt_wm_connection_user_name.erl",
"src/rabbit_mgmt_wm_connections.erl",
"src/rabbit_mgmt_wm_connections_vhost.erl",
Expand Down
19 changes: 15 additions & 4 deletions deps/rabbitmq_management/priv/www/js/dispatcher.js
Original file line number Diff line number Diff line change
Expand Up @@ -46,10 +46,21 @@ dispatcher_add(function(sammy) {
});
sammy.get('#/connections/:name', function() {
var name = esc(this.params['name']);
render({'connection': {path: '/connections/' + name,
options: {ranges: ['data-rates-conn']}},
'channels': '/connections/' + name + '/channels'},
'connection', '#/connections');
var connectionPath = '/connections/' + name;
var reqs = {
'connection': {
path: connectionPath,
options: { ranges: ['data-rates-conn'] }
}
};
// First, get the connection details to check the protocol
var connectionDetails = JSON.parse(sync_get(connectionPath));
if (connectionDetails.protocol === 'AMQP 1-0') {
reqs['sessions'] = connectionPath + '/sessions';
} else {
reqs['channels'] = connectionPath + '/channels';
}
render(reqs, 'connection', '#/connections');
});
sammy.del('#/connections', function() {
var options = {headers: {
Expand Down
28 changes: 27 additions & 1 deletion deps/rabbitmq_management/priv/www/js/global.js
Original file line number Diff line number Diff line change
Expand Up @@ -586,8 +586,34 @@ var HELP = {
</dl> ',

'container-id':
'Name of the client application as sent from client to RabbitMQ in the "container-id" field of the AMQP 1.0 <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-open">open</a> frame.'
'Name of the client application as sent from client to RabbitMQ in the "container-id" field of the AMQP 1.0 <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-open">open</a> frame.',

'incoming-links':
'<a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#section-links">Links</a> where the client is the sender/publisher and RabbitMQ is the receiver of messages.',

'outgoing-links':
'<a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#section-links">Links</a> where the client is the receiver/consumer and RabbitMQ is the sender of messages.',

'target-address':
'The "address" field of the link <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-target">target</a>.',

'source-address':
'The "address" field of the link <a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-messaging-v1.0-os.html#type-source">source</a>.',

'amqp-source-queue':
'The client receives messages from this queue.',

'amqp-unconfirmed-messages':
'Number of messages that have been sent to queues but have not been confirmed by all queues.',

'snd-settle-mode':
'<a target="_blank" href="https://docs.oasis-open.org/amqp/core/v1.0/os/amqp-core-transport-v1.0-os.html#type-sender-settle-mode">Sender Settle Mode</a>',

'sender-settles':
'"true" if the sender sends all deliveries settled to the receiver. "false" if the sender sends all deliveries initially unsettled to the receiver.',

'outgoing-unsettled-deliveries':
'Number of messages that have been sent to consumers but have not yet been settled/acknowledged.'
};

///////////////////////////////////////////////////////////////////////////
Expand Down
13 changes: 13 additions & 0 deletions deps/rabbitmq_management/priv/www/js/tmpl/connection.ejs
Original file line number Diff line number Diff line change
Expand Up @@ -84,13 +84,26 @@
</div>
</div>

<% if (connection.protocol === 'AMQP 1-0') { %>

<div class="section">
<h2 class="updatable" >Sessions (<%=(sessions.length)%>)</h2>
<div class="hider updatable">
<%= format('sessions-list', {'sessions': sessions}) %>
</div>
</div>

<% } else { %>

<div class="section">
<h2 class="updatable" >Channels (<%=(channels.length)%>) </h2>
<div class="hider updatable">
<%= format('channels-list', {'channels': channels, 'mode': 'connection'}) %>
</div>
</div>

<% } %>

<% if (connection.ssl) { %>
<div class="section">
<h2>SSL</h2>
Expand Down
Loading
Loading