Skip to content

Commit

Permalink
Allow subscribing to node down events from the node monitor.
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon MacMullen committed Mar 1, 2013
1 parent a1314b9 commit e04f3ae
Showing 1 changed file with 17 additions and 5 deletions.
22 changes: 17 additions & 5 deletions src/rabbit_node_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
write_cluster_status/1, read_cluster_status/0,
update_cluster_status/0, reset_cluster_status/0]).
-export([notify_node_up/0, notify_joined_cluster/0, notify_left_cluster/1]).
-export([partitions/0]).
-export([partitions/0, subscribe/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
Expand All @@ -33,7 +33,7 @@
-define(SERVER, ?MODULE).
-define(RABBIT_UP_RPC_TIMEOUT, 2000).

-record(state, {monitors, partitions}).
-record(state, {monitors, partitions, subscribers}).

%%----------------------------------------------------------------------------

Expand All @@ -54,6 +54,7 @@
-spec(notify_left_cluster/1 :: (node()) -> 'ok').

-spec(partitions/0 :: () -> {node(), [{atom(), node()}]}).
-spec(subscribe/1 :: (pid()) -> 'ok').

-endif.

Expand Down Expand Up @@ -179,6 +180,9 @@ notify_left_cluster(Node) ->
partitions() ->
gen_server:call(?SERVER, partitions, infinity).

subscribe(Pid) ->
gen_server:cast(?SERVER, {subscribe, Pid}).

%%----------------------------------------------------------------------------
%% gen_server callbacks
%%----------------------------------------------------------------------------
Expand All @@ -190,8 +194,9 @@ init([]) ->
%% happen.
process_flag(trap_exit, true),
{ok, _} = mnesia:subscribe(system),
{ok, #state{monitors = pmon:new(),
partitions = []}}.
{ok, #state{monitors = pmon:new(),
subscribers = pmon:new(),
partitions = []}}.

handle_call(partitions, _From, State = #state{partitions = Partitions}) ->
{reply, {node(), Partitions}, State};
Expand Down Expand Up @@ -232,17 +237,24 @@ handle_cast({left_cluster, Node}, State) ->
write_cluster_status({del_node(Node, AllNodes), del_node(Node, DiscNodes),
del_node(Node, RunningNodes)}),
{noreply, State};
handle_cast({subscribe, Pid}, State = #state{subscribers = Subscribers}) ->
{noreply, State#state{subscribers = pmon:monitor(Pid, Subscribers)}};
handle_cast(_Msg, State) ->
{noreply, State}.

handle_info({'DOWN', _MRef, process, {rabbit, Node}, _Reason},
State = #state{monitors = Monitors}) ->
State = #state{monitors = Monitors, subscribers = Subscribers}) ->
rabbit_log:info("rabbit on node ~p down~n", [Node]),
{AllNodes, DiscNodes, RunningNodes} = read_cluster_status(),
write_cluster_status({AllNodes, DiscNodes, del_node(Node, RunningNodes)}),
ok = handle_dead_rabbit(Node),
[P ! {node_down, Node} || P <- pmon:monitored(Subscribers)],
{noreply, State#state{monitors = pmon:erase({rabbit, Node}, Monitors)}};

handle_info({'DOWN', _MRef, process, Pid, _Reason},
State = #state{subscribers = Subscribers}) ->
{noreply, State#state{subscribers = pmon:erase(Pid, Subscribers)}};

handle_info({mnesia_system_event,
{inconsistent_database, running_partitioned_network, Node}},
State = #state{partitions = Partitions}) ->
Expand Down

0 comments on commit e04f3ae

Please sign in to comment.