Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Per vhost message store #766

Merged
merged 55 commits into from
Dec 24, 2016
Merged
Show file tree
Hide file tree
Changes from 20 commits
Commits
Show all changes
55 commits
Select commit Hold shift + click to select a range
abaa770
message store supervisor
Mar 10, 2016
80436df
Starting and restarting app
Mar 10, 2016
ec15e97
Migration algorithm
Apr 6, 2016
d1bffd0
Work in progress: Migration to vhost based message store
Apr 19, 2016
b3fc71a
wip
Apr 19, 2016
492e23b
wip: migrating to vhost base messge store
Apr 20, 2016
325e2fc
New upgrade scope `queues` started by boot step before queue recovery
Apr 21, 2016
d1f4a9d
Remove duplicate migration. Match no_ack message
Apr 21, 2016
93ba24b
Moved directory selection to vhost_dir function
Apr 21, 2016
460abf2
Update tests
Jun 2, 2016
fdd7b87
Updated tests to support per-vhost message store
Jun 2, 2016
e6c76fd
Queue index per vhost location
Oct 19, 2016
4f12ebe
Tests for per-vhost message store. Stop vhost message store when dele…
Oct 19, 2016
fd4db66
Terminate message store ignoring file save errors
Oct 19, 2016
8369950
Allow restart of a vhost message store. purge_messages function to cl…
Oct 19, 2016
1c81c50
Move queues to vhost storage on upgrade
Oct 20, 2016
8a45f40
Migration to per-vhost store message
Oct 20, 2016
36f3e67
Purge vhost storage in transaction
Oct 20, 2016
008c4eb
Rename an upgrade function
michaelklishin Nov 9, 2016
4837af6
Merge branch 'master' into rabbitmq-server-567
michaelklishin Nov 10, 2016
05e9dbe
Naming and cosmetics
michaelklishin Nov 11, 2016
0b37027
Merge branch 'master' into rabbitmq-server-567
Nov 11, 2016
cc75e56
Avoid atom exhaustion in vhost message stores
Nov 11, 2016
eea4fc3
Use a fixed size encoding fn for vhost directory names
michaelklishin Nov 11, 2016
1643c34
Renames
michaelklishin Nov 11, 2016
460d413
We get a tuple from ETS here, not just a pid
michaelklishin Nov 11, 2016
245fd08
Fix test expectations, more renames, more logging
michaelklishin Nov 12, 2016
7a84a24
Towards functional message store migration fn
michaelklishin Nov 12, 2016
f6feb6b
Cosmetics
michaelklishin Nov 12, 2016
c135b70
New dir path for queue index data
Nov 14, 2016
3542d80
Merge branch 'master' into rabbitmq-server-567
Nov 23, 2016
c3ae581
Rollback to sequential queue migration. Move queue directories before…
Nov 24, 2016
944e0f4
Fix vhost dir location in test
Nov 24, 2016
03bdc78
Keep backup between mnesia upgrade and message store upgrade
Nov 28, 2016
5545152
Migrate queues in batches of 100 in parallel. Write recovery terms fo…
Nov 28, 2016
475e86b
Merge branch 'master' into rabbitmq-server-567
Nov 29, 2016
cfc9a88
Write per queue upgrade log to a separate file
Nov 30, 2016
f43990d
Make queue migration batch size configurable
Nov 30, 2016
321abd4
Merge branch 'master' into rabbitmq-server-567
dcorbacho Dec 13, 2016
806cd80
Replace dying_client_index and dying_clients set with a map
Dec 16, 2016
8a98bdb
Merge branch 'master' into rabbitmq-server-567
michaelklishin Dec 16, 2016
3d56870
fix typo
Dec 20, 2016
5f7553c
Merge branch 'master' into rabbitmq-server-567
Dec 20, 2016
308a2b5
Group recovery client refs by vhost
Dec 20, 2016
1ec2ac8
Pass queues and recovery terms to fold
Dec 20, 2016
eafad5a
Refs can be undefined
Dec 20, 2016
569d114
change error function with rabbit_log:error
Dec 21, 2016
e5557ad
Fix msg_store test
Dec 21, 2016
1403596
Force recover client references to be maps. Start empty message store…
Dec 21, 2016
5282534
Recover vhosts message stores in parallel after a crash
Dec 22, 2016
79685d0
Recover queues after non-clean shutdown
Dec 22, 2016
83d3222
Fix logs. Expect multiple logs during startup
Dec 23, 2016
12901fb
Revert to syncronous vhost recovery since there are concurency limita…
Dec 23, 2016
bcff954
Merge branch 'master' into rabbitmq-server-567
michaelklishin Dec 23, 2016
1d4e939
Don't log #resource records
michaelklishin Dec 23, 2016
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions src/rabbit.erl
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,14 @@
{requires, core_initialized},
{enables, routing_ready}]}).

-rabbit_boot_step({upgrade_queues,
[{description, "per-vhost message store migration"},
{mfa, {rabbit_upgrade,
maybe_migrate_queues_to_per_vhost_storage,
[]}},
{requires, [core_initialized]},
{enables, recovery}]}).

-rabbit_boot_step({recovery,
[{description, "exchange, queue and binding recovery"},
{mfa, {rabbit, recover, []}},
Expand Down
26 changes: 20 additions & 6 deletions src/rabbit_msg_store.erl
Original file line number Diff line number Diff line change
Expand Up @@ -718,7 +718,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->

Dir = filename:join(BaseDir, atom_to_list(Server)),

{ok, IndexModule} = application:get_env(msg_store_index_module),
{ok, IndexModule} = application:get_env(rabbit,msg_store_index_module),
rabbit_log:info("~w: using ~p to provide index~n", [Server, IndexModule]),

AttemptFileSummaryRecovery =
Expand Down Expand Up @@ -758,7 +758,7 @@ init([Server, BaseDir, ClientRefs, StartupFunState]) ->
DyingIndex = ets:new(rabbit_msg_store_dying_client_index,
[set, public, {keypos, #dying_client.client_ref}]),

{ok, FileSizeLimit} = application:get_env(msg_store_file_size_limit),
{ok, FileSizeLimit} = application:get_env(rabbit,msg_store_file_size_limit),

{ok, GCPid} = rabbit_msg_store_gc:start_link(
#gc_state { dir = Dir,
Expand Down Expand Up @@ -995,12 +995,26 @@ terminate(_Reason, State = #msstate { index_state = IndexState,
State2
end,
State3 = close_all_handles(State1),
ok = store_file_summary(FileSummaryEts, Dir),
%% Let file summary saving fail.
case store_file_summary(FileSummaryEts, Dir) of
ok -> ok;
{error, FSErr} ->
rabbit_log:error("Unable to store file summary"
" for vhost message store for directory ~p~n"
" Error: ~p~n",
[Dir, FSErr])
end,
[true = ets:delete(T) || T <- [FileSummaryEts, FileHandlesEts,
CurFileCacheEts, FlyingEts]],
IndexModule:terminate(IndexState),
ok = store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir),
case store_recovery_terms([{client_refs, dict:fetch_keys(Clients)},
{index_module, IndexModule}], Dir) of
ok -> ok;
{error, RTErr} ->
rabbit_log:error("Unable to save message store recovery terms"
"for directory ~p~n Error: ~p~n",
[Dir, RTErr])
end,
State3 #msstate { index_state = undefined,
current_file_handle = undefined }.

Expand Down Expand Up @@ -1582,7 +1596,7 @@ read_recovery_terms(Dir) ->
end.

store_file_summary(Tid, Dir) ->
ok = ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
ets:tab2file(Tid, filename:join(Dir, ?FILE_SUMMARY_FILENAME),
[{extended_info, [object_count]}]).

recover_file_summary(false, _Dir) ->
Expand Down
10 changes: 8 additions & 2 deletions src/rabbit_msg_store_ets_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,12 @@ delete_by_file(File, State) ->
ok.

terminate(#state { table = MsgLocations, dir = Dir }) ->
ok = ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
[{extended_info, [object_count]}]),
case ets:tab2file(MsgLocations, filename:join(Dir, ?FILENAME),
[{extended_info, [object_count]}]) of
ok -> ok;
{error, Err} ->
rabbit_log:error("Unable to save message store index"
" for directory ~p~n Error: ~p~n",
[Dir, Err])
end,
ets:delete(MsgLocations).
68 changes: 68 additions & 0 deletions src/rabbit_msg_store_vhost_sup.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
-module(rabbit_msg_store_vhost_sup).

-behaviour(supervisor2).

-export([start_link/3, init/1, add_vhost/2, delete_vhost/2,
client_init/5, successfully_recovered_state/2]).

%% Internal
-export([start_vhost/4]).

start_link(Name, ClientRefs, StartupFunState) ->
supervisor2:start_link({local, Name}, ?MODULE,
[Name, ClientRefs, StartupFunState]).

init([Name, ClientRefs, StartupFunState]) ->
{ok, {{simple_one_for_one, 1, 1},
[{rabbit_msg_store_vhost, {rabbit_msg_store_vhost_sup, start_vhost,
[Name, ClientRefs, StartupFunState]},
transient, infinity, supervisor, [rabbit_msg_store]}]}}.


add_vhost(Name, VHost) ->
supervisor2:start_child(Name, [VHost]).

start_vhost(Name, ClientRefs, StartupFunState, VHost) ->
VHostName = vhost_store_name(Name, VHost),
VHostDir = vhost_store_dir(VHost),
ok = rabbit_file:ensure_dir(VHostDir),
rabbit_msg_store:start_link(VHostName, VHostDir,
ClientRefs, StartupFunState).

delete_vhost(Name, VHost) ->
VHostName = vhost_store_name(Name, VHost),
case whereis(VHostName) of
undefined -> ok;
Pid -> supervisor2:terminate_child(Name, Pid)
end,
ok.

client_init(Server, Ref, MsgOnDiskFun, CloseFDsFun, VHost) ->
VHostName = maybe_start_vhost(Server, VHost),
rabbit_msg_store:client_init(VHostName, Ref, MsgOnDiskFun, CloseFDsFun).

maybe_start_vhost(Server, VHost) ->
VHostName = vhost_store_name(Server, VHost),
case whereis(VHostName) of
undefined -> add_vhost(Server, VHost);
_ -> ok
end,
VHostName.

vhost_store_name(Name, VHost) ->
VhostEncoded = rabbit_vhost:dir(VHost),
binary_to_atom(<<(atom_to_binary(Name, utf8))/binary, "_",
VhostEncoded/binary>>,
utf8).

vhost_store_dir(VHost) ->
Dir = rabbit_mnesia:dir(),
VhostEncoded = rabbit_vhost:dir(VHost),
binary_to_list(filename:join([Dir, VhostEncoded])).

successfully_recovered_state(Name, VHost) ->
VHostName = vhost_store_name(Name, VHost),
rabbit_msg_store:successfully_recovered_state(VHostName).

% force_recovery
% transform_dir
67 changes: 41 additions & 26 deletions src/rabbit_queue_index.erl
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,10 @@
read/3, next_segment_boundary/1, bounds/1, start/1, stop/0]).

-export([add_queue_ttl/0, avoid_zeroes/0, store_msg_size/0, store_msg/0]).
-export([scan_queue_segments/3]).

%% Migration to per-vhost message store
-export([move_to_vhost_store/1]).

-define(CLEAN_FILENAME, "clean.dot").

Expand Down Expand Up @@ -475,11 +479,10 @@ start(DurableQueueNames) ->
end, {[], sets:new()}, DurableQueueNames),

%% Any queue directory we've not been asked to recover is considered garbage
QueuesDir = queues_dir(),
rabbit_file:recursive_delete(
[filename:join(QueuesDir, DirName) ||
DirName <- all_queue_directory_names(QueuesDir),
not sets:is_element(DirName, DurableDirectories)]),
[DirName ||
DirName <- all_queue_directory_names(),
not sets:is_element(filename:basename(DirName), DurableDirectories)]),

rabbit_recovery_terms:clear(),

Expand All @@ -490,12 +493,9 @@ start(DurableQueueNames) ->

stop() -> rabbit_recovery_terms:stop().

all_queue_directory_names(Dir) ->
case rabbit_file:list_dir(Dir) of
{ok, Entries} -> [E || E <- Entries,
rabbit_file:is_dir(filename:join(Dir, E))];
{error, enoent} -> []
end.
all_queue_directory_names() ->
QueuesBaseDir = queues_base_dir(),
filelib:wildcard(filename:join([QueuesBaseDir, "*", "queues", "*"])).
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

See below. It would be less risky if we used {node_database_dir}/vhosts/{vhost}/queues/{queue} for path naming.


%%----------------------------------------------------------------------------
%% startup and shutdown
Expand All @@ -508,14 +508,18 @@ erase_index_dir(Dir) ->
end.

blank_state(QueueName) ->
blank_state_dir(
filename:join(queues_dir(), queue_name_to_dir_name(QueueName))).
blank_state_dir(queue_dir(QueueName)).

blank_state_dir(Dir) ->
blank_state_dir_funs(Dir,
fun (_) -> ok end,
fun (_) -> ok end).

queue_dir(#resource{ virtual_host = VHost } = QueueName) ->
%% Queue directory is rabbit_mnesia_dir/:vhost/queues/:queue_id
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It would be a bit clearer if we used {node_database_dir}/vhosts/{vhost}/queues/{queue}.

filename:join([queues_base_dir(), rabbit_vhost:dir(VHost),
"queues", queue_name_to_dir_name(QueueName)]).

blank_state_dir_funs(Dir, OnSyncFun, OnSyncMsgFun) ->
{ok, MaxJournal} =
application:get_env(rabbit, queue_index_max_journal_entries),
Expand Down Expand Up @@ -629,8 +633,8 @@ queue_name_to_dir_name(Name = #resource { kind = queue }) ->
<<Num:128>> = erlang:md5(term_to_binary(Name)),
rabbit_misc:format("~.36B", [Num]).

queues_dir() ->
filename:join(rabbit_mnesia:dir(), "queues").
queues_base_dir() ->
rabbit_mnesia:dir().

%%----------------------------------------------------------------------------
%% msg store startup delta function
Expand Down Expand Up @@ -660,20 +664,19 @@ queue_index_walker({next, Gatherer}) when is_pid(Gatherer) ->
end.

queue_index_walker_reader(QueueName, Gatherer) ->
State = blank_state(QueueName),
ok = scan_segments(
ok = scan_queue_segments(
fun (_SeqId, MsgId, _MsgProps, true, _IsDelivered, no_ack, ok)
when is_binary(MsgId) ->
gatherer:sync_in(Gatherer, {MsgId, 1});
(_SeqId, _MsgId, _MsgProps, _IsPersistent, _IsDelivered,
_IsAcked, Acc) ->
Acc
end, ok, State),
end, ok, QueueName),
ok = gatherer:finish(Gatherer).

scan_segments(Fun, Acc, State) ->
State1 = #qistate { segments = Segments, dir = Dir } =
recover_journal(State),
scan_queue_segments(Fun, Acc, QueueName) ->
State = #qistate { segments = Segments, dir = Dir } =
recover_journal(blank_state(QueueName)),
Result = lists:foldr(
fun (Seg, AccN) ->
segment_entries_foldr(
Expand All @@ -682,8 +685,8 @@ scan_segments(Fun, Acc, State) ->
Fun(reconstruct_seq_id(Seg, RelSeq), MsgOrId, MsgProps,
IsPersistent, IsDelivered, IsAcked, AccM)
end, AccN, segment_find_or_new(Seg, Dir, Segments))
end, Acc, all_segment_nums(State1)),
{_SegmentCounts, _State} = terminate(State1),
end, Acc, all_segment_nums(State)),
{_SegmentCounts, _State} = terminate(State),
Result.

%%----------------------------------------------------------------------------
Expand Down Expand Up @@ -1353,15 +1356,13 @@ store_msg_segment(_) ->
%%----------------------------------------------------------------------------

foreach_queue_index(Funs) ->
QueuesDir = queues_dir(),
QueueDirNames = all_queue_directory_names(QueuesDir),
QueueDirNames = all_queue_directory_names(),
{ok, Gatherer} = gatherer:start_link(),
[begin
ok = gatherer:fork(Gatherer),
ok = worker_pool:submit_async(
fun () ->
transform_queue(filename:join(QueuesDir, QueueDirName),
Gatherer, Funs)
transform_queue(QueueDirName, Gatherer, Funs)
end)
end || QueueDirName <- QueueDirNames],
empty = gatherer:out(Gatherer),
Expand Down Expand Up @@ -1402,3 +1403,17 @@ drive_transform_fun(Fun, Hdl, Contents) ->
{Output, Contents1} -> ok = file_handle_cache:append(Hdl, Output),
drive_transform_fun(Fun, Hdl, Contents1)
end.

move_to_vhost_store(#resource{} = QueueName) ->
OldQueueDir = filename:join([queues_base_dir(), "queues",
queue_name_to_dir_name(QueueName)]),
NewQueueDir = queue_dir(QueueName),
case rabbit_file:is_dir(OldQueueDir) of
true ->
ok = rabbit_file:ensure_dir(NewQueueDir),
ok = rabbit_file:rename(OldQueueDir, NewQueueDir);
false ->
rabbit_log:info("Queue index directoy not found for queue ~p~n",
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

directoy should be directory

[QueueName])
end,
ok.
13 changes: 13 additions & 0 deletions src/rabbit_upgrade.erl
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
-module(rabbit_upgrade).

-export([maybe_upgrade_mnesia/0, maybe_upgrade_local/0,
maybe_migrate_queues_to_per_vhost_storage/0,
nodes_running/1, secondary_upgrade/1]).

-include("rabbit.hrl").
Expand Down Expand Up @@ -252,6 +253,18 @@ maybe_upgrade_local() ->

%% -------------------------------------------------------------------

maybe_migrate_queues_to_per_vhost_storage() ->
case rabbit_version:upgrades_required(queues) of
{error, version_not_available} -> version_not_available;
{error, starting_from_scratch} -> starting_from_scratch;
{error, _} = Err -> throw(Err);
{ok, []} -> ok;
{ok, Upgrades} -> apply_upgrades(queues, Upgrades,
fun() -> ok end)
end.

%% -------------------------------------------------------------------

apply_upgrades(Scope, Upgrades, Fun) ->
ok = rabbit_file:lock_file(lock_filename()),
info("~s upgrades: ~w to apply~n", [Scope, length(Upgrades)]),
Expand Down
Loading