Skip to content

Commit

Permalink
Khepri: replace join_cluster by mnesia_to_khepri:sync_cluster_membership
Browse files Browse the repository at this point in the history
  • Loading branch information
dcorbacho committed Apr 18, 2023
1 parent 7c9ae77 commit aaa6cac
Show file tree
Hide file tree
Showing 3 changed files with 26 additions and 33 deletions.
4 changes: 2 additions & 2 deletions deps/rabbit/src/rabbit_db_cluster.erl
Original file line number Diff line number Diff line change
Expand Up @@ -76,9 +76,9 @@ join_using_khepri(RemoteNode, NodeType) ->
ok ->
case join_using_mnesia(RemoteNode, NodeType) of
ok ->
rabbit_khepri:join_cluster(RemoteNode);
rabbit_khepri:init_cluster();
{ok, already_member} ->
rabbit_khepri:join_cluster(RemoteNode);
rabbit_khepri:init_cluster();
Error ->
Error
end;
Expand Down
50 changes: 20 additions & 30 deletions deps/rabbit/src/rabbit_khepri.erl
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,6 @@
-export([do_join/1]).
%% To add the current node to an existing cluster
-export([check_join_cluster/1,
join_cluster/1,
leave_cluster/1]).
-export([is_clustered/0]).
-export([check_cluster_consistency/0,
Expand Down Expand Up @@ -435,10 +434,26 @@ cli_cluster_status() ->
init_cluster() ->
%% Ensure the local Khepri store is running before we can join it. It
%% could be stopped if RabbitMQ is not running for instance.
ok = setup(),
khepri:info(?RA_CLUSTER_NAME),
_ = application:ensure_all_started(khepri_mnesia_migration),
mnesia_to_khepri:sync_cluster_membership(?STORE_ID).
rabbit_log:debug("Khepri clustering: starting Mnesia..."),
IsRunning = rabbit_mnesia:is_running(),
try
case IsRunning of
true -> ok;
false -> rabbit_mnesia:start_mnesia(false)
end,
rabbit_log:debug("Khepri clustering: starting Khepri..."),
ok = setup(),
khepri:info(?RA_CLUSTER_NAME),
rabbit_log:debug("Khepri clustering: starting khepri_mnesia_migration..."),
_ = application:ensure_all_started(khepri_mnesia_migration),
rabbit_log:debug("Khepri clustering: syncing cluster membership"),
mnesia_to_khepri:sync_cluster_membership(?STORE_ID)
after
case IsRunning of
true -> ok;
false -> rabbit_mnesia:stop_mnesia()
end
end.

%%%%%%%%
%% TODO run_peer_discovery!!
Expand Down Expand Up @@ -468,31 +483,6 @@ check_join_cluster(DiscoveryNode) ->
end
end.

join_cluster(DiscoveryNode) ->
{ClusterNodes, _} = discover_cluster([DiscoveryNode]),
case me_in_nodes(ClusterNodes) of
false ->
case check_cluster_consistency(DiscoveryNode, false) of
{ok, _S} ->
ThisNode = node(),
retry_khepri_op(fun() -> add_member(ThisNode, [DiscoveryNode]) end, 60);
Error ->
Error
end;
true ->
%% DiscoveryNode thinks that we are part of a cluster, but
%% do we think so ourselves?
case are_we_clustered_with(DiscoveryNode) of
true ->
rabbit_log:info("Asked to join a cluster but already a member of it: ~tp", [ClusterNodes]),
{ok, already_member};
false ->
Msg = format_inconsistent_cluster_message(DiscoveryNode, node()),
rabbit_log:error(Msg),
{error, {inconsistent_cluster, Msg}}
end
end.

discover_cluster(Nodes) ->
case lists:foldl(fun (_, {ok, Res}) -> {ok, Res};
(Node, _) -> discover_cluster0(Node)
Expand Down
5 changes: 4 additions & 1 deletion deps/rabbit/src/rabbit_mnesia.erl
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,10 @@
on_node_down/1,

%% Helpers for diagnostics commands
schema_info/1
schema_info/1,

start_mnesia/1,
stop_mnesia/0
]).

%% Mnesia queries
Expand Down

0 comments on commit aaa6cac

Please sign in to comment.