From e0066cac5c443464a36b3eb4f68b9fd8fb860e38 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Jean-S=C3=A9bastien=20P=C3=A9dron?= Date: Tue, 5 May 2015 22:07:50 +0200 Subject: [PATCH] File handle cache: Pay attention to memory use ... and clear read cache if necessary. This solves an issue where sync'ing a mirrored queue could take all available memory (way above the high watermark) and possibly crash the node. Fixes #134. --- src/file_handle_cache.erl | 32 ++++++++++++++++++++++++++++++++ src/rabbit_memory_monitor.erl | 21 +++++++++++++++------ 2 files changed, 47 insertions(+), 6 deletions(-) diff --git a/src/file_handle_cache.erl b/src/file_handle_cache.erl index d0fd524fb8cd..cd7ba6d38953 100644 --- a/src/file_handle_cache.erl +++ b/src/file_handle_cache.erl @@ -353,6 +353,7 @@ read(Ref, Count) -> read_buffer_rem = BufRem - Count, read_buffer_usage = BufUsg + Count }]}; ([Handle0]) -> + maybe_reduce_read_cache([Ref]), Handle = #handle{read_buffer = Buf, read_buffer_pos = BufPos, read_buffer_rem = BufRem, @@ -962,6 +963,37 @@ tune_read_buffer_limit(Handle = #handle{read_buffer = Buf, false -> Usg * 2 end, Lim)}. +maybe_reduce_read_cache(SparedRefs) -> + case rabbit_memory_monitor:memory_use(bytes) of + {_, infinity} -> ok; + {MemUse, MemLimit} when MemUse < MemLimit -> ok; + {MemUse, MemLimit} -> reduce_read_cache( + (MemUse - MemLimit) * 2, + SparedRefs) + end. + +reduce_read_cache(MemToFree, SparedRefs) -> + Handles = lists:sort( + fun({_, H1}, {_, H2}) -> H1 < H2 end, + [{R, H} || {{R, fhc_handle}, H} <- get(), + not lists:member(R, SparedRefs) + andalso size(H#handle.read_buffer) > 0]), + FreedMem = lists:foldl( + fun + (_, Freed) when Freed >= MemToFree -> + Freed; + ({Ref, #handle{read_buffer = Buf} = Handle}, Freed) -> + Handle1 = reset_read_buffer(Handle), + put({Ref, fhc_handle}, Handle1), + Freed + size(Buf) + end, 0, Handles), + if + FreedMem < MemToFree andalso SparedRefs =/= [] -> + reduce_read_cache(MemToFree - FreedMem, []); + true -> + ok + end. + infos(Items, State) -> [{Item, i(Item, State)} || Item <- Items]. i(total_limit, #fhc_state{limit = Limit}) -> Limit; diff --git a/src/rabbit_memory_monitor.erl b/src/rabbit_memory_monitor.erl index 451ee1f44354..049e65ba4c3f 100644 --- a/src/rabbit_memory_monitor.erl +++ b/src/rabbit_memory_monitor.erl @@ -25,7 +25,7 @@ -behaviour(gen_server2). -export([start_link/0, register/2, deregister/1, - report_ram_duration/2, stop/0, conserve_resources/3]). + report_ram_duration/2, stop/0, conserve_resources/3, memory_use/1]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -92,6 +92,19 @@ conserve_resources(Pid, disk, Conserve) -> conserve_resources(_Pid, _Source, _Conserve) -> ok. +memory_use(bytes) -> + MemoryLimit = vm_memory_monitor:get_memory_limit(), + {erlang:memory(total), case MemoryLimit > 0.0 of + true -> MemoryLimit; + false -> infinity + end}; +memory_use(ratio) -> + MemoryLimit = vm_memory_monitor:get_memory_limit(), + case MemoryLimit > 0.0 of + true -> erlang:memory(total) / MemoryLimit; + false -> infinity + end. + %%---------------------------------------------------------------------------- %% Gen_server callbacks %%---------------------------------------------------------------------------- @@ -223,11 +236,7 @@ desired_duration_average(#state{disk_alarm = false, queue_duration_count = Count}) -> {ok, LimitThreshold} = application:get_env(rabbit, vm_memory_high_watermark_paging_ratio), - MemoryLimit = vm_memory_monitor:get_memory_limit(), - MemoryRatio = case MemoryLimit > 0.0 of - true -> erlang:memory(total) / MemoryLimit; - false -> infinity - end, + MemoryRatio = memory_use(ratio), if MemoryRatio =:= infinity -> 0.0; MemoryRatio < LimitThreshold orelse Count == 0 ->