Skip to content

Commit

Permalink
Revert dd08c9204760
Browse files Browse the repository at this point in the history
  • Loading branch information
Simon MacMullen committed Aug 14, 2013
1 parent 471692a commit 01b80ec
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 102 deletions.
89 changes: 18 additions & 71 deletions src/delegate.erl
Original file line number Diff line number Diff line change
Expand Up @@ -18,22 +18,15 @@

-behaviour(gen_server2).

-export([start_link/1, invoke_no_result/2, invoke/2, monitor/2,
demonitor/1, demonitor/2, call/2, cast/2]).
-export([start_link/1, invoke_no_result/2, invoke/2, call/2, cast/2]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-record(state, {node, monitors, name}).

%%----------------------------------------------------------------------------

-ifdef(use_specs).

-export_type([monitor_ref/0]).

-type(monitor_ref() :: reference() | {atom(), pid()}).

-spec(start_link/1 ::
(non_neg_integer()) -> {'ok', pid()} | ignore | {'error', any()}).
-spec(invoke/2 ::
Expand All @@ -42,10 +35,6 @@
[{pid(), term()}]}).
-spec(invoke_no_result/2 ::
(pid() | [pid()], fun ((pid()) -> any())) -> 'ok').
-spec(monitor/2 :: ('process', pid()) -> monitor_ref()).
-spec(demonitor/1 :: (monitor_ref()) -> 'true').
-spec(demonitor/2 :: (monitor_ref(), ['flush']) -> 'true').

-spec(call/2 ::
( pid(), any()) -> any();
([pid()], any()) -> {[{pid(), any()}], [{pid(), term()}]}).
Expand All @@ -61,8 +50,7 @@
%%----------------------------------------------------------------------------

start_link(Num) ->
Name = delegate_name(Num),
gen_server2:start_link({local, Name}, ?MODULE, [Name], []).
gen_server2:start_link({local, delegate_name(Num)}, ?MODULE, [], []).

invoke(Pid, Fun) when is_pid(Pid) andalso node(Pid) =:= node() ->
Fun(Pid);
Expand Down Expand Up @@ -90,7 +78,7 @@ invoke(Pids, Fun) when is_list(Pids) ->
case orddict:fetch_keys(Grouped) of
[] -> {[], []};
RemoteNodes -> gen_server2:multi_call(
RemoteNodes, delegate(self(), RemoteNodes),
RemoteNodes, delegate(RemoteNodes),
{invoke, Fun, Grouped}, infinity)
end,
BadPids = [{Pid, {exit, {nodedown, BadNode}, []}} ||
Expand Down Expand Up @@ -118,27 +106,12 @@ invoke_no_result(Pids, Fun) when is_list(Pids) ->
{LocalPids, Grouped} = group_pids_by_node(Pids),
case orddict:fetch_keys(Grouped) of
[] -> ok;
RemoteNodes -> gen_server2:abcast(
RemoteNodes, delegate(self(), RemoteNodes),
{invoke, Fun, Grouped})
RemoteNodes -> gen_server2:abcast(RemoteNodes, delegate(RemoteNodes),
{invoke, Fun, Grouped})
end,
safe_invoke(LocalPids, Fun), %% must not die
ok.

monitor(Type, Pid) when node(Pid) =:= node() ->
erlang:monitor(Type, Pid);
monitor(Type, Pid) ->
Name = delegate(Pid, [node(Pid)]),
gen_server2:cast(Name, {monitor, Type, self(), Pid}),
{Name, Pid}.

demonitor(Ref) -> ?MODULE:demonitor(Ref, []).

demonitor(Ref, Options) when is_reference(Ref) ->
erlang:demonitor(Ref, Options);
demonitor({Name, Pid}, Options) ->
gen_server2:cast(Name, {demonitor, Pid, Options}).

call(PidOrPids, Msg) ->
invoke(PidOrPids, fun (P) -> gen_server2:call(P, Msg, infinity) end).

Expand All @@ -161,10 +134,10 @@ group_pids_by_node(Pids) ->
delegate_name(Hash) ->
list_to_atom("delegate_" ++ integer_to_list(Hash)).

delegate(Pid, RemoteNodes) ->
delegate(RemoteNodes) ->
case get(delegate) of
undefined -> Name = delegate_name(
erlang:phash2(Pid,
erlang:phash2(self(),
delegate_sup:count(RemoteNodes))),
put(delegate, Name),
Name;
Expand All @@ -182,48 +155,22 @@ safe_invoke(Pid, Fun) when is_pid(Pid) ->

%%----------------------------------------------------------------------------

init([Name]) ->
{ok, #state{node = node(), monitors = dict:new(), name = Name}, hibernate,
init([]) ->
{ok, node(), hibernate,
{backoff, ?HIBERNATE_AFTER_MIN, ?HIBERNATE_AFTER_MIN, ?DESIRED_HIBERNATE}}.

handle_call({invoke, Fun, Grouped}, _From, State = #state{node = Node}) ->
{reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), State, hibernate}.

handle_cast({monitor, Type, WantsMonitor, Pid},
State = #state{monitors = Monitors}) ->
Ref = erlang:monitor(Type, Pid),
Monitors1 = dict:store(Pid, {WantsMonitor, Ref}, Monitors),
{noreply, State#state{monitors = Monitors1}, hibernate};

handle_cast({demonitor, Pid, Options},
State = #state{monitors = Monitors}) ->
{noreply, case dict:find(Pid, Monitors) of
{ok, {_WantsMonitor, Ref}} ->
erlang:demonitor(Ref, Options),
State#state{monitors = dict:erase(Pid, Monitors)};
error ->
State
end, hibernate};

handle_cast({invoke, Fun, Grouped}, State = #state{node = Node}) ->
safe_invoke(orddict:fetch(Node, Grouped), Fun),
{noreply, State, hibernate}.
handle_call({invoke, Fun, Grouped}, _From, Node) ->
{reply, safe_invoke(orddict:fetch(Node, Grouped), Fun), Node, hibernate}.

handle_info({'DOWN', Ref, process, Pid, Info},
State = #state{monitors = Monitors, name = Name}) ->
{noreply, case dict:find(Pid, Monitors) of
{ok, {WantsMonitor, Ref}} ->
WantsMonitor ! {'DOWN', {Name, Pid}, process, Pid, Info},
State#state{monitors = dict:erase(Pid, Monitors)};
error ->
State
end, hibernate};
handle_cast({invoke, Fun, Grouped}, Node) ->
safe_invoke(orddict:fetch(Node, Grouped), Fun),
{noreply, Node, hibernate}.

handle_info(_Info, State) ->
{noreply, State, hibernate}.
handle_info(_Info, Node) ->
{noreply, Node, hibernate}.

terminate(_Reason, _State) ->
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.
code_change(_OldVsn, Node, _Extra) ->
{ok, Node}.
42 changes: 17 additions & 25 deletions src/pmon.erl
Original file line number Diff line number Diff line change
Expand Up @@ -16,26 +16,22 @@

-module(pmon).

-export([new/0, new/1, monitor/2, monitor_all/2, demonitor/2,
is_monitored/2, erase/2, monitored/1, is_empty/1]).
-export([new/0, monitor/2, monitor_all/2, demonitor/2, is_monitored/2, erase/2,
monitored/1, is_empty/1]).

-compile({no_auto_import, [monitor/2]}).

-record(state, {dict, module}).

-ifdef(use_specs).

%%----------------------------------------------------------------------------

-export_type([?MODULE/0]).

-opaque(?MODULE() :: #state{dict :: dict(),
module :: atom()}).
-opaque(?MODULE() :: dict()).

-type(item() :: pid() | {atom(), node()}).

-spec(new/0 :: () -> ?MODULE()).
-spec(new/1 :: ('erlang' | 'delegate') -> ?MODULE()).
-spec(monitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
-spec(monitor_all/2 :: ([item()], ?MODULE()) -> ?MODULE()).
-spec(demonitor/2 :: (item(), ?MODULE()) -> ?MODULE()).
Expand All @@ -46,33 +42,29 @@

-endif.

new() -> new(erlang).

new(Module) -> #state{dict = dict:new(),
module = Module}.
new() -> dict:new().

monitor(Item, S = #state{dict = M, module = Module}) ->
monitor(Item, M) ->
case dict:is_key(Item, M) of
true -> S;
false -> S#state{dict = dict:store(
Item, Module:monitor(process, Item), M)}
true -> M;
false -> dict:store(Item, erlang:monitor(process, Item), M)
end.

monitor_all([], S) -> S; %% optimisation
monitor_all([Item], S) -> monitor(Item, S); %% optimisation
monitor_all(Items, S) -> lists:foldl(fun monitor/2, S, Items).
monitor_all([], M) -> M; %% optimisation
monitor_all([Item], M) -> monitor(Item, M); %% optimisation
monitor_all(Items, M) -> lists:foldl(fun monitor/2, M, Items).

demonitor(Item, S = #state{dict = M, module = Module}) ->
demonitor(Item, M) ->
case dict:find(Item, M) of
{ok, MRef} -> Module:demonitor(MRef),
S#state{dict = dict:erase(Item, M)};
{ok, MRef} -> erlang:demonitor(MRef),
dict:erase(Item, M);
error -> M
end.

is_monitored(Item, #state{dict = M}) -> dict:is_key(Item, M).
is_monitored(Item, M) -> dict:is_key(Item, M).

erase(Item, S = #state{dict = M}) -> S#state{dict = dict:erase(Item, M)}.
erase(Item, M) -> dict:erase(Item, M).

monitored(#state{dict = M}) -> dict:fetch_keys(M).
monitored(M) -> dict:fetch_keys(M).

is_empty(#state{dict = M}) -> dict:size(M) == 0.
is_empty(M) -> dict:size(M) == 0.
2 changes: 1 addition & 1 deletion src/rabbit_amqqueue_process.erl
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ init_state(Q) ->
exclusive_consumer = none,
has_had_consumers = false,
active_consumers = queue:new(),
senders = pmon:new(delegate),
senders = pmon:new(),
msg_id_to_channel = gb_trees:empty(),
status = running},
rabbit_event:init_stats_timer(State, #q.stats_timer).
Expand Down
10 changes: 5 additions & 5 deletions src/rabbit_mirror_queue_slave.erl
Original file line number Diff line number Diff line change
Expand Up @@ -120,7 +120,7 @@ init(Q = #amqqueue { name = QName }) ->
msg_id_ack = dict:new(),

msg_id_status = dict:new(),
known_senders = pmon:new(delegate),
known_senders = pmon:new(),

depth_delta = undefined
},
Expand Down Expand Up @@ -274,8 +274,7 @@ handle_info({'DOWN', _MonitorRef, process, MPid, _Reason},
noreply(State);

handle_info({'DOWN', _MonitorRef, process, ChPid, _Reason}, State) ->
local_sender_death(ChPid, State),
noreply(State);
noreply(local_sender_death(ChPid, State));

handle_info({'EXIT', _Pid, Reason}, State) ->
{stop, Reason, State};
Expand Down Expand Up @@ -606,15 +605,16 @@ stop_rate_timer(State) -> rabbit_misc:stop_timer(State, #state.rate_timer_ref).
ensure_monitoring(ChPid, State = #state { known_senders = KS }) ->
State #state { known_senders = pmon:monitor(ChPid, KS) }.

local_sender_death(ChPid, #state { known_senders = KS }) ->
local_sender_death(ChPid, State = #state { known_senders = KS }) ->
%% The channel will be monitored iff we have received a delivery
%% from it but not heard about its death from the master. So if it
%% is monitored we need to point the death out to the master (see
%% essay).
ok = case pmon:is_monitored(ChPid, KS) of
false -> ok;
true -> confirm_sender_death(ChPid)
end.
end,
State.

confirm_sender_death(Pid) ->
%% We have to deal with the possibility that we'll be promoted to
Expand Down

0 comments on commit 01b80ec

Please sign in to comment.