diff --git a/src/rabbit_mirror_queue_sync.erl b/src/rabbit_mirror_queue_sync.erl index 2d8bdfa8600a..a97a9b50c86a 100644 --- a/src/rabbit_mirror_queue_sync.erl +++ b/src/rabbit_mirror_queue_sync.erl @@ -18,7 +18,7 @@ -include("rabbit.hrl"). --export([master_prepare/4, master_go/8, slave/7]). +-export([master_prepare/4, master_go/8, slave/7, conserve_resources/3]). -define(SYNC_PROGRESS_INTERVAL, 1000000). @@ -198,7 +198,7 @@ syncer(Ref, Log, MPid, SPids) -> [] -> Log("all slaves already synced", []); SPids1 -> MPid ! {ready, self()}, Log("mirrors ~p to sync", [[node(SPid) || SPid <- SPids1]]), - syncer_loop(Ref, MPid, SPids1) + syncer_check_resources(Ref, MPid, SPids1) end. await_slaves(Ref, SPids) -> @@ -217,12 +217,43 @@ await_slaves(Ref, SPids) -> %% 'sync_start' and so will not reply. We need to act as though they are %% down. +syncer_check_resources(Ref, MPid, SPids) -> + rabbit_alarm:register(self(), {?MODULE, conserve_resources, []}), + %% Before we ask the master node to send the first batch of messages + %% over here, we check if one node is already short on memory. If + %% that's the case, we wait for the alarm to be cleared before + %% starting the syncer loop. + AlarmedNodes = lists:any( + fun + ({{resource_limit, memory, _}, _}) -> true; + ({_, _}) -> false + end, rabbit_alarm:get_alarms()), + if + not AlarmedNodes -> + MPid ! {next, Ref}, + syncer_loop(Ref, MPid, SPids); + true -> + case wait_for_resources(Ref, SPids) of + cancel -> ok; + SPids1 -> MPid ! {next, Ref}, + syncer_loop(Ref, MPid, SPids1) + end + end. + syncer_loop(Ref, MPid, SPids) -> - MPid ! {next, Ref}, receive + {conserve_resources, memory, true} -> + case wait_for_resources(Ref, SPids) of + cancel -> ok; + SPids1 -> syncer_loop(Ref, MPid, SPids1) + end; + {conserve_resources, _, _} -> + %% Ignore other alerts. + syncer_loop(Ref, MPid, SPids); {msgs, Ref, Msgs} -> SPids1 = wait_for_credit(SPids), broadcast(SPids1, {sync_msgs, Ref, Msgs}), + MPid ! {next, Ref}, syncer_loop(Ref, MPid, SPids1); {cancel, Ref} -> %% We don't tell the slaves we will die - so when we do @@ -239,6 +270,10 @@ broadcast(SPids, Msg) -> SPid ! Msg end || SPid <- SPids]. +conserve_resources(Pid, Source, {_, Conserve, _}) -> + Pid ! {conserve_resources, Source, Conserve}, + ok. + wait_for_credit(SPids) -> case credit_flow:blocked() of true -> receive @@ -252,6 +287,24 @@ wait_for_credit(SPids) -> false -> SPids end. +wait_for_resources(Ref, SPids) -> + receive + {conserve_resources, memory, false} -> + SPids; + {conserve_resources, _, _} -> + %% Ignore other alerts. + wait_for_resources(Ref, SPids); + {cancel, Ref} -> + %% We don't tell the slaves we will die - so when we do + %% they interpret that as a failure, which is what we + %% want. + cancel; + {'DOWN', _, process, SPid, _} -> + credit_flow:peer_down(SPid), + SPids1 = wait_for_credit(lists:delete(SPid, SPids)), + wait_for_resources(Ref, SPids1) + end. + %% Syncer %% --------------------------------------------------------------------------- %% Slave