Skip to content

Commit

Permalink
File handle cache: Pay attention to memory use
Browse files Browse the repository at this point in the history
... and clear read cache if necessary.

This solves an issue where sync'ing a mirrored queue could take all
available memory (way above the high watermark) and possibly crash the
node.

Fixes #134.
  • Loading branch information
dumbbell committed May 5, 2015
1 parent c29a33f commit 37468b3
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 6 deletions.
38 changes: 38 additions & 0 deletions src/file_handle_cache.erl
Original file line number Diff line number Diff line change
Expand Up @@ -353,6 +353,7 @@ read(Ref, Count) ->
read_buffer_rem = BufRem - Count,
read_buffer_usage = BufUsg + Count }]};
([Handle0]) ->
maybe_reduce_read_cache([Ref]),
Handle = #handle{read_buffer = Buf,
read_buffer_pos = BufPos,
read_buffer_rem = BufRem,
Expand Down Expand Up @@ -962,6 +963,43 @@ tune_read_buffer_limit(Handle = #handle{read_buffer = Buf,
false -> Usg * 2
end, Lim)}.

maybe_reduce_read_cache(SparedRefs) ->
case rabbit_memory_monitor:memory_use(bytes) of
{_, infinity} -> ok;
{MemUse, MemLimit} when MemUse < MemLimit -> ok;
{MemUse, MemLimit} -> reduce_read_cache(
(MemUse - MemLimit) * 2,
SparedRefs)
end.

reduce_read_cache(MemToFree, SparedRefs) ->
Handles = lists:sort(
fun({_, H1}, {_, H2}) -> H1 < H2 end,
[{R, H} || {{R, fhc_handle}, H} <- get(),
not lists:member(R, SparedRefs)
andalso size(H#handle.read_buffer) > 0]),
FreedMem = lists:foldl(
fun
(_, Freed) when Freed >= MemToFree ->
Freed;
({Ref, #handle{read_buffer = Buf} = Handle}, Freed) ->
Handle1 = reset_read_buffer(Handle),
put({Ref, fhc_handle}, Handle1),
Freed + size(Buf)
end, 0, Handles),
if
FreedMem > 0 -> rabbit_log:info(
"File handle cache: freed ~p / ~p bytes (~p handles)~n",
[FreedMem, MemToFree, length(Handles)]);
true -> ok
end,
if
FreedMem < MemToFree andalso SparedRefs =/= [] ->
reduce_read_cache(MemToFree - FreedMem, []);
true ->
erlang:garbage_collect() %% FIXME: Good idea?
end.

infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items].

i(total_limit, #fhc_state{limit = Limit}) -> Limit;
Expand Down
21 changes: 15 additions & 6 deletions src/rabbit_memory_monitor.erl
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
-behaviour(gen_server2).

-export([start_link/0, register/2, deregister/1,
report_ram_duration/2, stop/0, conserve_resources/3]).
report_ram_duration/2, stop/0, conserve_resources/3, memory_use/1]).

-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).
Expand Down Expand Up @@ -92,6 +92,19 @@ conserve_resources(Pid, disk, Conserve) ->
conserve_resources(_Pid, _Source, _Conserve) ->
ok.

memory_use(bytes) ->
MemoryLimit = vm_memory_monitor:get_memory_limit(),
{erlang:memory(total), case MemoryLimit > 0.0 of
true -> MemoryLimit;
false -> infinity
end};
memory_use(ratio) ->
MemoryLimit = vm_memory_monitor:get_memory_limit(),
case MemoryLimit > 0.0 of
true -> erlang:memory(total) / MemoryLimit;
false -> infinity
end.

%%----------------------------------------------------------------------------
%% Gen_server callbacks
%%----------------------------------------------------------------------------
Expand Down Expand Up @@ -223,11 +236,7 @@ desired_duration_average(#state{disk_alarm = false,
queue_duration_count = Count}) ->
{ok, LimitThreshold} =
application:get_env(rabbit, vm_memory_high_watermark_paging_ratio),
MemoryLimit = vm_memory_monitor:get_memory_limit(),
MemoryRatio = case MemoryLimit > 0.0 of
true -> erlang:memory(total) / MemoryLimit;
false -> infinity
end,
MemoryRatio = memory_use(ratio),
if MemoryRatio =:= infinity ->
0.0;
MemoryRatio < LimitThreshold orelse Count == 0 ->
Expand Down

0 comments on commit 37468b3

Please sign in to comment.