Skip to content

Commit

Permalink
Inject DQT into GET /api/definitions and /api/vhosts
Browse files Browse the repository at this point in the history
References #12776

(cherry picked from commit 51e6004)
  • Loading branch information
michaelklishin authored and mergify[bot] committed Nov 27, 2024
1 parent 1468489 commit 6746fbf
Show file tree
Hide file tree
Showing 5 changed files with 82 additions and 11 deletions.
35 changes: 35 additions & 0 deletions deps/rabbit/src/rabbit_queue_type.erl
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@
-behaviour(rabbit_registry_class).

-include("amqqueue.hrl").
-include("vhost.hrl").
-include_lib("rabbit_common/include/rabbit.hrl").
-include_lib("amqp10_common/include/amqp10_types.hrl").

Expand All @@ -23,6 +24,8 @@
to_binary/1,
default/0,
fallback/0,
inject_dqt/1,
vhosts_with_dqt/1,
is_enabled/1,
is_compatible/4,
declare/2,
Expand Down Expand Up @@ -317,6 +320,15 @@ short_alias_of(rabbit_stream_queue) ->
%% AMQP 1.0 management client
short_alias_of({utf8, <<"stream">>}) ->
<<"stream">>;
%% for cases where this function is used for
%% formatting of values that already might use these
%% short aliases
short_alias_of(<<"quorum">>) ->
<<"quorum">>;
short_alias_of(<<"classic">>) ->
<<"classic">>;
short_alias_of(<<"stream">>) ->
<<"stream">>;
short_alias_of(_Other) ->
undefined.

Expand Down Expand Up @@ -833,6 +845,29 @@ known_queue_type_names() ->
QTypeBins = lists:map(fun(X) -> atom_to_binary(X) end, QueueTypes),
?KNOWN_QUEUE_TYPES ++ QTypeBins.

inject_dqt(VHost) when ?is_vhost(VHost) ->
inject_dqt(vhost:to_map(VHost));
inject_dqt(VHost) when is_list(VHost) ->
inject_dqt(rabbit_data_coercion:to_map(VHost));
inject_dqt(M = #{default_queue_type := undefined}) ->
NQT = short_alias_of(default()),
Meta0 = maps:get(metadata, M, #{}),
Meta = Meta0#{default_queue_type => NQT},

M#{default_queue_type => NQT, metadata => Meta};
inject_dqt(M = #{default_queue_type := DQT}) ->
NQT = short_alias_of(DQT),
Meta0 = maps:get(metadata, M, #{}),
Meta = Meta0#{default_queue_type => NQT},

M#{default_queue_type => NQT, metadata => Meta}.

-spec vhosts_with_dqt([any()]) -> [map()].
vhosts_with_dqt(List) when is_list(List) ->
%% inject DQT (default queue type) at the top level and
%% its metadata
lists:map(fun inject_dqt/1, List).

-spec check_queue_limits(amqqueue:amqqueue()) ->
ok |
{error, queue_limit_exceeded, Reason :: string(), Args :: term()}.
Expand Down
22 changes: 17 additions & 5 deletions deps/rabbit/src/rabbit_vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -511,13 +511,14 @@ default_queue_type(VirtualHost) ->
default_queue_type(VirtualHost, rabbit_queue_type:fallback()).
-spec default_queue_type(VirtualHost :: vhost:name(), Fallback :: rabbit_queue_type:queue_type()) -> rabbit_queue_type:queue_type().
default_queue_type(VirtualHost, FallbackQueueType) ->
NodeDefault = application:get_env(rabbit, default_queue_type, FallbackQueueType),
case exists(VirtualHost) of
false -> FallbackQueueType;
false -> NodeDefault;
true ->
Record = lookup(VirtualHost),
case vhost:get_default_queue_type(Record) of
undefined -> FallbackQueueType;
<<"undefined">> -> FallbackQueueType;
undefined -> NodeDefault;
<<"undefined">> -> NodeDefault;
Type -> Type
end
end.
Expand Down Expand Up @@ -622,8 +623,19 @@ i(tracing, VHost) -> rabbit_trace:enabled(vhost:get_name(VHost));
i(cluster_state, VHost) -> vhost_cluster_state(vhost:get_name(VHost));
i(description, VHost) -> vhost:get_description(VHost);
i(tags, VHost) -> vhost:get_tags(VHost);
i(default_queue_type, VHost) -> vhost:get_default_queue_type(VHost);
i(metadata, VHost) -> vhost:get_metadata(VHost);
i(default_queue_type, VHost) -> rabbit_queue_type:short_alias_of(default_queue_type(vhost:get_name(VHost)));
i(metadata, VHost) ->
DQT = rabbit_queue_type:short_alias_of(default_queue_type(vhost:get_name(VHost))),
case vhost:get_metadata(VHost) of
undefined ->
#{default_queue_type => DQT};
M = #{default_queue_type := undefined} ->
M#{default_queue_type => DQT};
M = #{default_queue_type := QT} ->
M#{default_queue_type => rabbit_queue_type:short_alias_of(QT)};
M when is_map(M) ->
M#{default_queue_type => DQT}
end;
i(Item, VHost) ->
rabbit_log:error("Don't know how to compute a virtual host info item '~ts' for virtual host '~tp'", [Item, VHost]),
throw({bad_argument, Item}).
Expand Down
15 changes: 14 additions & 1 deletion deps/rabbit/src/vhost.erl
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,14 @@
get_description/1,
get_tags/1,
get_default_queue_type/1,

set_limits/2,
set_metadata/2,
merge_metadata/2,
new_metadata/3,
is_tagged_with/2
is_tagged_with/2,

to_map/1
]).

-define(record_version, vhost_v2).
Expand Down Expand Up @@ -196,3 +199,13 @@ new_metadata(Description, Tags, DefaultQueueType) ->
-spec is_tagged_with(vhost(), tag()) -> boolean().
is_tagged_with(VHost, Tag) ->
lists:member(Tag, get_tags(VHost)).

-spec to_map(vhost()) -> map().
to_map(VHost) ->
#{
name => get_name(VHost),
description => get_description(VHost),
tags => get_tags(VHost),
default_queue_type => get_default_queue_type(VHost),
metadata => get_metadata(VHost)
}.
7 changes: 6 additions & 1 deletion deps/rabbitmq_management/src/rabbit_mgmt_wm_definitions.erl
Original file line number Diff line number Diff line change
Expand Up @@ -271,7 +271,12 @@ retain_whitelisted(Items) ->
retain_whitelisted_items(Name, List, Allowed) ->
{Name, [only_whitelisted_for_item(I, Allowed) || I <- List]}.

only_whitelisted_for_item(Item, Allowed) ->
only_whitelisted_for_item(Item, Allowed) when is_map(Item) ->
Map1 = maps:with(Allowed, Item),
maps:filter(fun(_Key, Val) ->
Val =/= undefined
end, Map1);
only_whitelisted_for_item(Item, Allowed) when is_list(Item) ->
[{K, Fact} || {K, Fact} <- Item, lists:member(K, Allowed), Fact =/= undefined].

strip_vhost(Item) ->
Expand Down
14 changes: 10 additions & 4 deletions deps/rabbitmq_management/src/rabbit_mgmt_wm_vhosts.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,9 +31,12 @@ to_json(ReqData, Context = #context{user = User}) ->
try
Basic = [rabbit_vhost:info(V)
|| V <- rabbit_mgmt_util:list_visible_vhosts(User)],
Data = rabbit_mgmt_util:augment_resources(Basic, ?DEFAULT_SORT,
?BASIC_COLUMNS, ReqData,
Context, fun augment/2),
Augmented = rabbit_mgmt_util:augment_resources(Basic, ?DEFAULT_SORT,
?BASIC_COLUMNS, ReqData,
Context, fun augment/2),
%% inject default DQT into virtual host metadata,
%% where necessary
Data = rabbit_queue_type:vhosts_with_dqt(Augmented),
rabbit_mgmt_util:reply(Data, ReqData, Context)
catch
{error, invalid_range_parameters, Reason} ->
Expand Down Expand Up @@ -64,4 +67,7 @@ augmented(ReqData, #context{user = User}) ->
end.

basic() ->
rabbit_vhost:info_all([name, description, tags, default_queue_type, metadata]).
Maps = lists:map(
fun maps:from_list/1,
rabbit_vhost:info_all([name, description, tags, default_queue_type, metadata])),
rabbit_queue_type:vhosts_with_dqt(Maps).

0 comments on commit 6746fbf

Please sign in to comment.