Per vhost message store #766

Merged: 55 commits (Dec 24, 2016)

Commits (55)

abaa770  message store supervisor (Mar 10, 2016)
80436df  Starting and restarting app (Mar 10, 2016)
ec15e97  Migration algorithm (Apr 6, 2016)
d1bffd0  Work in progress: Migration to vhost based message store (Apr 19, 2016)
b3fc71a  wip (Apr 19, 2016)
492e23b  wip: migrating to vhost based message store (Apr 20, 2016)
325e2fc  New upgrade scope `queues` started by boot step before queue recovery (Apr 21, 2016)
d1f4a9d  Remove duplicate migration. Match no_ack message (Apr 21, 2016)
93ba24b  Moved directory selection to vhost_dir function (Apr 21, 2016)
460abf2  Update tests (Jun 2, 2016)
fdd7b87  Updated tests to support per-vhost message store (Jun 2, 2016)
e6c76fd  Queue index per vhost location (Oct 19, 2016)
4f12ebe  Tests for per-vhost message store. Stop vhost message store when dele… (Oct 19, 2016)
fd4db66  Terminate message store ignoring file save errors (Oct 19, 2016)
8369950  Allow restart of a vhost message store. purge_messages function to cl… (Oct 19, 2016)
1c81c50  Move queues to vhost storage on upgrade (Oct 20, 2016)
8a45f40  Migration to per-vhost message store (Oct 20, 2016)
36f3e67  Purge vhost storage in transaction (Oct 20, 2016)
008c4eb  Rename an upgrade function (michaelklishin, Nov 9, 2016)
4837af6  Merge branch 'master' into rabbitmq-server-567 (michaelklishin, Nov 10, 2016)
05e9dbe  Naming and cosmetics (michaelklishin, Nov 11, 2016)
0b37027  Merge branch 'master' into rabbitmq-server-567 (Nov 11, 2016)
cc75e56  Avoid atom exhaustion in vhost message stores (Nov 11, 2016)
eea4fc3  Use a fixed size encoding fn for vhost directory names (michaelklishin, Nov 11, 2016)
1643c34  Renames (michaelklishin, Nov 11, 2016)
460d413  We get a tuple from ETS here, not just a pid (michaelklishin, Nov 11, 2016)
245fd08  Fix test expectations, more renames, more logging (michaelklishin, Nov 12, 2016)
7a84a24  Towards functional message store migration fn (michaelklishin, Nov 12, 2016)
f6feb6b  Cosmetics (michaelklishin, Nov 12, 2016)
c135b70  New dir path for queue index data (Nov 14, 2016)
3542d80  Merge branch 'master' into rabbitmq-server-567 (Nov 23, 2016)
c3ae581  Rollback to sequential queue migration. Move queue directories before… (Nov 24, 2016)
944e0f4  Fix vhost dir location in test (Nov 24, 2016)
03bdc78  Keep backup between mnesia upgrade and message store upgrade (Nov 28, 2016)
5545152  Migrate queues in batches of 100 in parallel. Write recovery terms fo… (Nov 28, 2016)
475e86b  Merge branch 'master' into rabbitmq-server-567 (Nov 29, 2016)
cfc9a88  Write per queue upgrade log to a separate file (Nov 30, 2016)
f43990d  Make queue migration batch size configurable (Nov 30, 2016)
321abd4  Merge branch 'master' into rabbitmq-server-567 (dcorbacho, Dec 13, 2016)
806cd80  Replace dying_client_index and dying_clients set with a map (Dec 16, 2016)
8a98bdb  Merge branch 'master' into rabbitmq-server-567 (michaelklishin, Dec 16, 2016)
3d56870  fix typo (Dec 20, 2016)
5f7553c  Merge branch 'master' into rabbitmq-server-567 (Dec 20, 2016)
308a2b5  Group recovery client refs by vhost (Dec 20, 2016)
1ec2ac8  Pass queues and recovery terms to fold (Dec 20, 2016)
eafad5a  Refs can be undefined (Dec 20, 2016)
569d114  Replace error function with rabbit_log:error (Dec 21, 2016)
e5557ad  Fix msg_store test (Dec 21, 2016)
1403596  Force recover client references to be maps. Start empty message store… (Dec 21, 2016)
5282534  Recover vhost message stores in parallel after a crash (Dec 22, 2016)
79685d0  Recover queues after non-clean shutdown (Dec 22, 2016)
83d3222  Fix logs. Expect multiple logs during startup (Dec 23, 2016)
12901fb  Revert to synchronous vhost recovery since there are concurrency limita… (Dec 23, 2016)
bcff954  Merge branch 'master' into rabbitmq-server-567 (michaelklishin, Dec 23, 2016)
1d4e939  Don't log #resource records (michaelklishin, Dec 23, 2016)

scripts/rabbitmq-env (9 changes: 6 additions & 3 deletions)

@@ -236,9 +236,11 @@ rmq_normalize_path_var RABBITMQ_PLUGINS_DIR
 [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS=${LOGS}
 [ "x" != "x$RABBITMQ_LOGS" ] && export RABBITMQ_LOGS_source=environment
 [ "x" = "x$RABBITMQ_LOGS" ] && RABBITMQ_LOGS="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}.log"
+[ "x" = "x$RABBITMQ_UPGRADE_LOG" ] && RABBITMQ_UPGRADE_LOG="${RABBITMQ_LOG_BASE}/${RABBITMQ_NODENAME}_upgrade.log"
 
-rmq_normalize_path_var \
-    RABBITMQ_LOGS
+rmq_normalize_path_var RABBITMQ_LOGS
+
+rmq_normalize_path_var RABBITMQ_UPGRADE_LOG
 
 [ "x" = "x$RABBITMQ_CTL_ERL_ARGS" ] && RABBITMQ_CTL_ERL_ARGS=${CTL_ERL_ARGS}
 

@@ -254,7 +256,8 @@ rmq_check_if_shared_with_mnesia \
     RABBITMQ_PLUGINS_EXPAND_DIR \
     RABBITMQ_ENABLED_PLUGINS_FILE \
     RABBITMQ_PLUGINS_DIR \
-    RABBITMQ_LOGS
+    RABBITMQ_LOGS \
+    RABBITMQ_UPGRADE_LOG
 
 ##--- End of overridden <var_name> variables
 
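
Together these default and normalize a new RABBITMQ_UPGRADE_LOG variable alongside RABBITMQ_LOGS. A hedged usage sketch (the path and node name are assumptions, not from the PR):

    # Override the upgrade log location before starting the node:
    RABBITMQ_UPGRADE_LOG="/var/log/rabbitmq/rabbit@myhost_upgrade.log" rabbitmq-server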

scripts/rabbitmq-server (3 changes: 3 additions & 0 deletions)

@@ -160,9 +160,11 @@ RABBITMQ_LISTEN_ARG=
 if [ "$RABBITMQ_LOGS" = '-' ]; then
     SASL_ERROR_LOGGER=tty
     RABBIT_LAGER_HANDLER=tty
+    RABBITMQ_LAGER_HANDLER_UPGRADE=tty
 else
     SASL_ERROR_LOGGER=false
     RABBIT_LAGER_HANDLER='"'${RABBITMQ_LOGS}'"'
+    RABBITMQ_LAGER_HANDLER_UPGRADE='"'${RABBITMQ_UPGRADE_LOG}'"'
 fi
 
 # Bump ETS table limit to 50000

@@ -216,6 +218,7 @@ start_rabbitmq_server() {
         -sasl sasl_error_logger "$SASL_ERROR_LOGGER" \
         -rabbit lager_log_root "\"$RABBITMQ_LOG_BASE\"" \
         -rabbit lager_handler "$RABBIT_LAGER_HANDLER" \
+        -rabbit lager_handler_upgrade "$RABBITMQ_LAGER_HANDLER_UPGRADE" \
         -rabbit enabled_plugins_file "\"$RABBITMQ_ENABLED_PLUGINS_FILE\"" \
         -rabbit plugins_dir "\"$RABBITMQ_PLUGINS_DIR\"" \
         -rabbit plugins_expand_dir "\"$RABBITMQ_PLUGINS_EXPAND_DIR\"" \
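
The new -rabbit lager_handler_upgrade flag arrives in the node as an ordinary application environment value, which is what src/rabbit_lager.erl reads below. A minimal check from an attached Erlang shell (illustrative only):

    %% Returns {ok, tty} or {ok, "/path/to/node_upgrade.log"}, depending on
    %% whether RABBITMQ_LOGS was '-' when the script computed the handler:
    application:get_env(rabbit, lager_handler_upgrade).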

src/rabbit.erl (8 changes: 8 additions & 0 deletions)

@@ -153,6 +153,14 @@
                     {requires, core_initialized},
                     {enables, routing_ready}]}).
 
+-rabbit_boot_step({upgrade_queues,
+                   [{description, "per-vhost message store migration"},
+                    {mfa, {rabbit_upgrade,
+                           maybe_migrate_queues_to_per_vhost_storage,
+                           []}},
+                    {requires, [core_initialized]},
+                    {enables, recovery}]}).
+
 -rabbit_boot_step({recovery,
                    [{description, "exchange, queue and binding recovery"},
                     {mfa, {rabbit, recover, []}},
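
Boot steps are module attributes that the broker orders by their requires/enables edges, so upgrade_queues runs after core_initialized and before recovery; queues are migrated to per-vhost storage before anything tries to recover them. A hedged sketch of listing the steps at runtime (illustrative, not part of the PR):

    %% Every -rabbit_boot_step attribute is visible via module_info/1:
    Steps = [S || {rabbit_boot_step, [S]} <- rabbit:module_info(attributes)],
    true = lists:keymember(upgrade_queues, 1, Steps).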

src/rabbit_lager.erl (19 changes: 13 additions & 6 deletions)

@@ -210,7 +210,7 @@ configure_lager() ->
     %% messages to the default sink. To know the list of expected extra
     %% sinks, we look at the 'lager_extra_sinks' compilation option.
     Sinks0 = application:get_env(lager, extra_sinks, []),
-    Sinks1 = configure_extra_sinks(Sinks0, 
+    Sinks1 = configure_extra_sinks(Sinks0,
                                    [error_logger | list_expected_sinks()]),
     %% TODO Waiting for basho/lager#303
     %% Sinks2 = lists:keystore(error_logger_lager_event, 1, Sinks1,

@@ -231,18 +231,25 @@
 configure_extra_sinks(Sinks, [SinkName | Rest]) ->
     Sink0 = proplists:get_value(SinkName, Sinks, []),
     Sink1 = case proplists:is_defined(handlers, Sink0) of
-                false -> lists:keystore(handlers, 1, Sink0,
-                             {handlers,
-                                 [{lager_forwarder_backend,
-                                     lager_util:make_internal_sink_name(lager)
-                                 }]});
+                false -> default_sink_config(SinkName, Sink0);
                 true -> Sink0
             end,
     Sinks1 = lists:keystore(SinkName, 1, Sinks, {SinkName, Sink1}),
     configure_extra_sinks(Sinks1, Rest);
 configure_extra_sinks(Sinks, []) ->
     Sinks.
 
+default_sink_config(rabbit_log_upgrade_lager_event, Sink) ->
+    Handlers = lager_handlers(application:get_env(rabbit,
+                                                  lager_handler_upgrade,
+                                                  tty)),
+    lists:keystore(handlers, 1, Sink, {handlers, Handlers});
+default_sink_config(_, Sink) ->
+    lists:keystore(handlers, 1, Sink,
+                   {handlers,
+                    [{lager_forwarder_backend,
+                      lager_util:make_internal_sink_name(lager)}]}).
+
 list_expected_sinks() ->
     case application:get_env(rabbit, lager_extra_sinks) of
         {ok, List} ->
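
With a dedicated default_sink_config/2 clause, the upgrade sink gets a real handler (tty or a file) rather than a forwarder to the default sink, so migration progress ends up in its own log. A hedged advanced.config sketch; the key matches the get_env call above, the path is an assumption:

    [{rabbit, [
        %% Either the atom tty or a log file path as a string:
        {lager_handler_upgrade, "/var/log/rabbitmq/rabbit_upgrade.log"}
    ]}].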

src/rabbit_log.erl (1 change: 1 addition & 0 deletions)

@@ -78,6 +78,7 @@ make_internal_sink_name(rabbit_log_channel) -> rabbit_log_channel_lager_event;
 make_internal_sink_name(rabbit_log_mirroring) -> rabbit_log_mirroring_lager_event;
 make_internal_sink_name(rabbit_log_queue) -> rabbit_log_queue_lager_event;
 make_internal_sink_name(rabbit_log_federation) -> rabbit_log_federation_lager_event;
+make_internal_sink_name(rabbit_log_upgrade) -> rabbit_log_upgrade_lager_event;
 make_internal_sink_name(Category) ->
     lager_util:make_internal_sink_name(Category).
 
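
The added clause pins a sink name for the new upgrade log category. Evaluated in the module's context it behaves like this (a sketch, not a test from the PR):

    %% The upgrade category resolves to its own lager sink:
    rabbit_log_upgrade_lager_event = make_internal_sink_name(rabbit_log_upgrade).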

src/rabbit_msg_store.erl (105 changes: 57 additions & 48 deletions)

@@ -18,7 +18,7 @@
 
 -behaviour(gen_server2).
 
--export([start_link/4, successfully_recovered_state/1,
+-export([start_link/4, start_global_store_link/4, successfully_recovered_state/1,
          client_init/4, client_terminate/1, client_delete_and_terminate/1,
          client_ref/1, close_all_indicated/1,
          write/3, write_flow/3, read/2, contains/2, remove/2]).

@@ -63,7 +63,7 @@
           %% the module for index ops,
           %% rabbit_msg_store_ets_index by default
           index_module,
-          %% %% where are messages?
+          %% where are messages?
           index_state,
           %% current file name as number
           current_file,

@@ -91,8 +91,6 @@
           flying_ets,
           %% set of dying clients
           dying_clients,
-          %% index of file positions for client death messages
-          dying_client_index,
           %% map of references of all registered clients
           %% to callbacks
           clients,

@@ -474,15 +472,20 @@
 %% public API
 %%----------------------------------------------------------------------------
 
-start_link(Server, Dir, ClientRefs, StartupFunState) ->
-    gen_server2:start_link({local, Server}, ?MODULE,
-                           [Server, Dir, ClientRefs, StartupFunState],
+start_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) ->
+    gen_server2:start_link(?MODULE,
+                           [Name, Dir, ClientRefs, StartupFunState],
                            [{timeout, infinity}]).
 
+start_global_store_link(Name, Dir, ClientRefs, StartupFunState) when is_atom(Name) ->
+    gen_server2:start_link({local, Name}, ?MODULE,
+                           [Name, Dir, ClientRefs, StartupFunState],
+                           [{timeout, infinity}]).
+
 successfully_recovered_state(Server) ->
     gen_server2:call(Server, successfully_recovered_state, infinity).
 
-client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) ->
+client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun) when is_pid(Server); is_atom(Server) ->
     {IState, IModule, Dir, GCPid,
      FileHandlesEts, FileSummaryEts, CurFileCacheEts, FlyingEts} =
         gen_server2:call(
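
This export split is the crux of the change: global stores keep a registered name, while each vhost's stores are unnamed processes addressed by pid. A hedged sketch of the two calling conventions (directories and arguments are placeholders):

    %% Global store, registered under a well-known name as before:
    {ok, _} = rabbit_msg_store:start_global_store_link(
                msg_store_persistent, Dir, ClientRefs, StartupFunState),

    %% Per-vhost store: no registered name; the caller keeps the pid,
    %% e.g. under that vhost's supervision tree:
    {ok, StorePid} = rabbit_msg_store:start_link(
                       msg_store_persistent, VhostDir, ClientRefs, StartupFunState).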

@@ -522,7 +525,7 @@ write_flow(MsgId, Msg,
     %% rabbit_amqqueue_process process via the
     %% rabbit_variable_queue. We are accessing the
     %% rabbit_amqqueue_process process dictionary.
-    credit_flow:send(whereis(Server), CreditDiscBound),
+    credit_flow:send(Server, CreditDiscBound),
     client_write(MsgId, Msg, flow, CState).
 
 write(MsgId, Msg, CState) -> client_write(MsgId, Msg, noflow, CState).

@@ -548,7 +551,7 @@ remove(MsgIds, CState = #client_msstate { client_ref = CRef }) ->
     [client_update_flying(-1, MsgId, CState) || MsgId <- MsgIds],
     server_cast(CState, {remove, CRef, MsgIds}).
 
-set_maximum_since_use(Server, Age) ->
+set_maximum_since_use(Server, Age) when is_pid(Server); is_atom(Server) ->
     gen_server2:cast(Server, {set_maximum_since_use, Age}).
 
 %%----------------------------------------------------------------------------

@@ -699,27 +702,25 @@ client_update_flying(Diff, MsgId, #client_msstate { flying_ets = FlyingEts,
     end.
 
 clear_client(CRef, State = #msstate { cref_to_msg_ids = CTM,
-                                      dying_clients = DyingClients,
-                                      dying_client_index = DyingIndex }) ->
-    ets:delete(DyingIndex, CRef),
+                                      dying_clients = DyingClients }) ->
     State #msstate { cref_to_msg_ids = dict:erase(CRef, CTM),
-                     dying_clients = sets:del_element(CRef, DyingClients) }.
+                     dying_clients = maps:remove(CRef, DyingClients) }.
 
 
 %%----------------------------------------------------------------------------
 %% gen_server callbacks
 %%----------------------------------------------------------------------------
 
-init([Server, BaseDir, ClientRefs, StartupFunState]) ->
+init([Name, BaseDir, ClientRefs, StartupFunState]) ->
     process_flag(trap_exit, true),
 
     ok = file_handle_cache:register_callback(?MODULE, set_maximum_since_use,
                                              [self()]),
 
-    Dir = filename:join(BaseDir, atom_to_list(Server)),
+    Dir = filename:join(BaseDir, atom_to_list(Name)),
 
-    {ok, IndexModule} = application:get_env(msg_store_index_module),
-    rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),
+    {ok, IndexModule} = application:get_env(rabbit, msg_store_index_module),
+    rabbit_log:info("~tp: using ~p to provide index~n", [Dir, IndexModule]),
 
     AttemptFileSummaryRecovery =
         case ClientRefs of

@@ -738,7 +739,7 @@
 
     {CleanShutdown, IndexState, ClientRefs1} =
         recover_index_and_client_refs(IndexModule, FileSummaryRecovered,
-                                      ClientRefs, Dir, Server),
+                                      ClientRefs, Dir),
     Clients = dict:from_list(
                 [{CRef, {undefined, undefined, undefined}} ||
                     CRef <- ClientRefs1]),

@@ -755,10 +756,8 @@
                                  [ordered_set, public]),
     CurFileCacheEts = ets:new(rabbit_msg_store_cur_file, [set, public]),
     FlyingEts       = ets:new(rabbit_msg_store_flying, [set, public]),
-    DyingIndex      = ets:new(rabbit_msg_store_dying_client_index,
-                              [set, public, {keypos, #dying_client.client_ref}]),
 
-    {ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
+    {ok, FileSizeLimit} = application:get_env(rabbit, msg_store_file_size_limit),
 
     {ok, GCPid} = rabbit_msg_store_gc:start_link(
                     #gc_state { dir = Dir,

@@ -787,8 +786,7 @@
                       file_summary_ets       = FileSummaryEts,
                       cur_file_cache_ets     = CurFileCacheEts,
                       flying_ets             = FlyingEts,
-                      dying_clients          = sets:new(),
-                      dying_client_index     = DyingIndex,
+                      dying_clients          = #{},
                       clients                = Clients,
                       successfully_recovered = CleanShutdown,
                       file_size_limit        = FileSizeLimit,

@@ -866,14 +864,14 @@ handle_call({contains, MsgId}, From, State) ->
 
 handle_cast({client_dying, CRef},
             State = #msstate { dying_clients       = DyingClients,
-                               dying_client_index  = DyingIndex,
                                current_file_handle = CurHdl,
                                current_file        = CurFile }) ->
-    DyingClients1 = sets:add_element(CRef, DyingClients),
     {ok, CurOffset} = file_handle_cache:current_virtual_offset(CurHdl),
-    true = ets:insert_new(DyingIndex, #dying_client{client_ref = CRef,
-                                                    file = CurFile,
-                                                    offset = CurOffset}),
+    DyingClients1 = maps:put(CRef,
+                             #dying_client{client_ref = CRef,
+                                           file = CurFile,
+                                           offset = CurOffset},
+                             DyingClients),
    noreply(State #msstate { dying_clients = DyingClients1 });
 
 handle_cast({client_delete, CRef},
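
One map keyed by client ref now carries what previously took a set plus a separate keyed ETS table (commit 806cd80), removing a lookup structure and a class of cleanup bugs. A self-contained sketch of the same pattern; the names are illustrative, mirroring the record in the diff:

    -record(dying_client, {client_ref, file, offset}).

    %% Record a client's death position in a single structure:
    mark_dying(CRef, File, Offset, Dying) ->
        maps:put(CRef, #dying_client{client_ref = CRef,
                                     file       = File,
                                     offset     = Offset}, Dying).

    %% "Is it dying?" and "where did it die?" become one lookup:
    death_position(CRef, Dying) ->
        case maps:find(CRef, Dying) of
            {ok, #dying_client{file = F, offset = O}} -> {F, O};
            error                                     -> not_dying
        end.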

@@ -995,12 +993,25 @@
                      State2
              end,
     State3 = close_all_handles(State1),
-    ok = store_file_summary(FileSummaryEts, Dir),
+    case store_file_summary(FileSummaryEts, Dir) of
+        ok -> ok;
+        {error, FSErr} ->
+            rabbit_log:error("Unable to store file summary"
+                             " for vhost message store for directory ~p~n"
+                             "Error: ~p~n",
+                             [Dir, FSErr])
+    end,
     [true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
                                    CurFileCacheEts, FlyingEts]],
     IndexModule:terminate(IndexState),
-    ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
-                               {index_module, IndexModule}], Dir),
+    case store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
+                               {index_module, IndexModule}], Dir) of
+        ok -> ok;
+        {error, RTErr} ->
+            rabbit_log:error("Unable to save message store recovery terms"
+                             "for directory ~p~nError: ~p~n",
+                             [Dir, RTErr])
+    end,
     State3 #msstate { index_state         = undefined,
                       current_file_handle = undefined }.
 

@@ -1357,17 +1368,15 @@ blind_confirm(CRef, MsgIds, ActionTaken, State) ->
 %% msg and thus should be ignored. Note that this (correctly) returns
 %% false when testing to remove the death msg itself.
 should_mask_action(CRef, MsgId,
-                   State = #msstate { dying_clients = DyingClients,
-                                      dying_client_index = DyingIndex }) ->
-    case {sets:is_element(CRef, DyingClients), index_lookup(MsgId, State)} of
-        {false, Location} ->
+                   State = #msstate{dying_clients = DyingClients}) ->
+    case {maps:find(CRef, DyingClients), index_lookup(MsgId, State)} of
+        {error, Location} ->
            {false, Location};
-        {true, not_found} ->
+        {{ok, _}, not_found} ->
            {true, not_found};
-        {true, #msg_location { file = File, offset = Offset,
-                               ref_count = RefCount } = Location} ->
-            [#dying_client { file = DeathFile, offset = DeathOffset }] =
-                ets:lookup(DyingIndex, CRef),
+        {{ok, Client}, #msg_location { file = File, offset = Offset,
+                                       ref_count = RefCount } = Location} ->
+            #dying_client{file = DeathFile, offset = DeathOffset} = Client,
            {case {{DeathFile, DeathOffset} < {File, Offset}, RefCount} of
                 {true,  _} -> true;
                 {false, 0} -> false_if_increment;
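
The masking decision leans on Erlang term ordering: tuples compare element by element, so {File, Offset} pairs order store positions first by file, then by offset within a file, and a message located after the client's recorded death position gets masked. A quick illustration (evaluated in an Erlang shell, not PR code):

    %% An earlier file wins regardless of offset; the same file falls
    %% back to comparing offsets:
    true  = {3, 999} < {4, 0},
    false = {4, 10}  < {4, 10}.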

@@ -1538,16 +1547,16 @@
 %% shutdown and recovery
 %%----------------------------------------------------------------------------
 
-recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir, _Server) ->
+recover_index_and_client_refs(IndexModule, _Recover, undefined, Dir) ->
     {false, IndexModule:new(Dir), []};
-recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir, Server) ->
-    rabbit_log:warning("~w: rebuilding indices from scratch~n", [Server]),
+recover_index_and_client_refs(IndexModule, false, _ClientRefs, Dir) ->
+    rabbit_log:warning("~tp : rebuilding indices from scratch~n", [Dir]),
     {false, IndexModule:new(Dir), []};
-recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir, Server) ->
+recover_index_and_client_refs(IndexModule, true, ClientRefs, Dir) ->
     Fresh = fun (ErrorMsg, ErrorArgs) ->
-                    rabbit_log:warning("~w: " ++ ErrorMsg ++ "~n"
+                    rabbit_log:warning("~tp : " ++ ErrorMsg ++ "~n"
                                        "rebuilding indices from scratch~n",
-                                       [Server | ErrorArgs]),
+                                       [Dir | ErrorArgs]),
                     {false, IndexModule:new(Dir), []}
             end,
     case read_recovery_terms(Dir) of

@@ -1582,7 +1591,7 @@ read_recovery_terms(Dir) ->
     end.
 
 store_file_summary(Tid, Dir) ->
-    ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
+    ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
                       [{extended_info, [object_count]}]).
 
 recover_file_summary(false, _Dir) ->

src/rabbit_msg_store_ets_index.erl (10 changes: 8 additions & 2 deletions)

@@ -74,6 +74,12 @@ delete_by_file(File, State) ->
     ok.
 
 terminate(#state { table = MsgLocations, dir = Dir }) ->
-    ok = ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
-                      [{extended_info, [object_count]}]),
+    case ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
+                      [{extended_info, [object_count]}]) of
+        ok -> ok;
+        {error, Err} ->
+            rabbit_log:error("Unable to save message store index"
+                             " for directory ~p.~nError: ~p~n",
+                             [Dir, Err])
+    end,
     ets:delete(MsgLocations).
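
Index termination now logs the failure and carries on instead of crashing the store, matching the "Terminate message store ignoring file save errors" commit; a failed save presumably just forces an index rebuild on the next start. For reference, the tab2file/file2tab round trip the index file relies on looks like this (a standalone sketch; the table name and path are made up):

    T = ets:new(example_index, [set, public]),
    true = ets:insert(T, {msg_id_1, some_location}),
    ok = ets:tab2file(T, "/tmp/example_index.ets",
                      [{extended_info, [object_count]}]),
    {ok, T2} = ets:file2tab("/tmp/example_index.ets").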