Skip to content

Commit

Permalink
Merge pull request #611 from rabbitmq/rabbitmq-server-590
Browse files Browse the repository at this point in the history
Auto deleted queue log
  • Loading branch information
michaelklishin committed Feb 5, 2016
2 parents 831a60f + e9ee125 commit 2374ae8
Show file tree
Hide file tree
Showing 4 changed files with 56 additions and 6 deletions.
2 changes: 0 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,6 @@ PACKAGES_DIR ?= $(abspath PACKAGES)

DEPS = ranch lager $(PLUGINS)

dep_lager = git https://github.com/rabbitmq/lager.git master

define usage_xml_to_erl
$(subst __,_,$(patsubst $(DOCS_DIR)/rabbitmq%.1.xml, src/rabbit_%_usage.erl, $(subst -,_,$(1))))
endef
Expand Down
45 changes: 43 additions & 2 deletions src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,7 @@ init_it(Recover, From, State = #q{q = #amqqueue{exclusive_owner = Owner}}) ->
{_, Terms} = recovery_status(Recover),
BQS = bq_init(BQ, Q, Terms),
%% Rely on terminate to delete the queue.
log_delete_exclusive(Owner, State),
{stop, {shutdown, missing_owner},
State#q{backing_queue = BQ, backing_queue_state = BQS}}
end.
Expand Down Expand Up @@ -701,7 +702,13 @@ handle_ch_down(DownPid, State = #q{consumers = Consumers,
exclusive_consumer = Holder1},
notify_decorators(State2),
case should_auto_delete(State2) of
true -> {stop, State2};
true ->
log_auto_delete(
io_lib:format(
"because all of its consumers (~p) were on a channel that was closed",
[length(ChCTags)]),
State),
{stop, State2};
false -> {ok, requeue_and_run(ChAckTags,
ensure_expiry_timer(State2))}
end
Expand Down Expand Up @@ -939,6 +946,7 @@ prioritise_call(Msg, _From, _Len, State) ->
prioritise_cast(Msg, _Len, State) ->
case Msg of
delete_immediately -> 8;
{delete_exclusive, _Pid} -> 8;
{set_ram_duration_target, _Duration} -> 8;
{set_maximum_since_use, _Age} -> 8;
{run_backing_queue, _Mod, _Fun} -> 6;
Expand Down Expand Up @@ -1063,7 +1071,13 @@ handle_call({basic_cancel, ChPid, ConsumerTag, OkMsg}, _From,
notify_decorators(State1),
case should_auto_delete(State1) of
false -> reply(ok, ensure_expiry_timer(State1));
true -> stop(ok, State1)
true ->
log_auto_delete(
io_lib:format(
"because its last consumer with tag '~s' was cancelled",
[ConsumerTag]),
State),
stop(ok, State1)
end
end;

Expand Down Expand Up @@ -1165,6 +1179,10 @@ handle_cast({reject, false, AckTags, ChPid}, State) ->
end) end,
fun () -> ack(AckTags, ChPid, State) end));

handle_cast({delete_exclusive, ConnPid}, State) ->
log_delete_exclusive(ConnPid, State),
stop(State);

handle_cast(delete_immediately, State) ->
stop(State);

Expand Down Expand Up @@ -1284,6 +1302,7 @@ handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason},
%% match what people expect (see bug 21824). However we need this
%% monitor-and-async- delete in case the connection goes away
%% unexpectedly.
log_delete_exclusive(DownPid, State),
stop(State);

handle_info({'DOWN', _MonitorRef, process, DownPid, _Reason}, State) ->
Expand Down Expand Up @@ -1347,3 +1366,25 @@ handle_pre_hibernate(State = #q{backing_queue = BQ,
{hibernate, stop_rate_timer(State1)}.

format_message_queue(Opt, MQ) -> rabbit_misc:format_message_queue(Opt, MQ).

log_delete_exclusive({ConPid, ConRef}, State) ->
log_delete_exclusive(ConPid, State);
log_delete_exclusive(ConPid, #q{ q = #amqqueue{ name = Resource } }) ->
#resource{ name = QName, virtual_host = VHost } = Resource,
rabbit_queue:debug("Deleting exclusive queue '~s' in vhost '~s' " ++
" because its declaring connection ~p was closed",
[QName, VHost, ConPid]).

log_auto_delete(Reason, #q{ q = #amqqueue{ name = Resource } }) ->
#resource{ name = QName, virtual_host = VHost } = Resource,
rabbit_queue:debug("Deleting auto-delete queue '~s' in vhost '~s' " ++
Reason,
[QName, VHost]).








3 changes: 2 additions & 1 deletion src/rabbit_lager.erl
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,8 @@ configure_lager() ->
%% messages to the default sink. To know the list of expected extra
%% sinks, we look at the 'lager_extra_sinks' compilation option.
Sinks0 = application:get_env(lager, extra_sinks, []),
Sinks1 = configure_extra_sinks(Sinks0, list_expected_sinks()),
Sinks1 = configure_extra_sinks(Sinks0,
[error_logger | list_expected_sinks()]),
%% TODO Waiting for basho/lager#303
%% Sinks2 = lists:keystore(error_logger_lager_event, 1, Sinks1,
%% {error_logger_lager_event,
Expand Down
12 changes: 11 additions & 1 deletion src/rabbit_log.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,20 @@ log(Category, Level, Fmt) -> log(Category, Level, Fmt, []).
log(Category, Level, Fmt, Args) when is_list(Args) ->
Sink = case Category of
default -> ?LAGER_SINK;
_ -> lager_util:make_internal_sink_name(Category)
_ -> make_internal_sink_name(Category)
end,
lager:log(Sink, Level, self(), Fmt, Args).

make_internal_sink_name(Category) when Category == channel;
Category == connection;
Category == mirroring;
Category == queue;
Category == federation ->
lager_util:make_internal_sink_name(list_to_atom("rabbit_" ++
atom_to_list(Category)));
make_internal_sink_name(Category) ->
lager_util:make_internal_sink_name(Category).

debug(Format) -> debug(Format, []).
debug(Format, Args) -> debug(self(), Format, Args).
debug(Metadata, Format, Args) ->
Expand Down

0 comments on commit 2374ae8

Please sign in to comment.