Skip to content

Commit

Permalink
Don't use q1 q2 and q4 at all anymore
Browse files Browse the repository at this point in the history
  • Loading branch information
lhoguin committed May 11, 2022
1 parent 00a8ca1 commit fd21f4a
Showing 1 changed file with 22 additions and 52 deletions.
74 changes: 22 additions & 52 deletions deps/rabbit/src/rabbit_variable_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -280,11 +280,11 @@
-behaviour(rabbit_backing_queue).

-record(vqstate,
{ q1,
q2,
{ q1, %% Unused.
q2, %% Unused.
delta,
q3,
q4,
q4, %% Unused.
next_seq_id,
%% seq_id() of first undelivered message
%% everything before this seq_id() was delivered at least once
Expand Down Expand Up @@ -329,12 +329,12 @@
io_batch_size,

%% default queue or lazy queue
mode,
mode, %% Unused.
version = 1,
%% number of reduce_memory_usage executions, once it
%% reaches a threshold the queue will manually trigger a runtime GC
%% see: maybe_execute_gc/1
memory_reduction_run_count,
memory_reduction_run_count, %% Unused.
%% Queue data is grouped by VHost. We need to store it
%% to work with queue index.
virtual_host,
Expand Down Expand Up @@ -964,17 +964,15 @@ info(message_bytes_paged_out, #vqstate{delta_transient_bytes = PagedOutBytes}) -
PagedOutBytes;
info(head_message_timestamp, #vqstate{
q3 = Q3,
%% @todo Drop Q4 variable.
q4 = Q4,
ram_pending_ack = RPA,
qi_pending_ack = QPA}) ->
head_message_timestamp(Q3, Q4, RPA, QPA);
head_message_timestamp(Q3, RPA, QPA);
info(disk_reads, #vqstate{disk_read_count = Count}) ->
Count;
info(disk_writes, #vqstate{disk_write_count = Count}) ->
Count;
info(backing_queue_status, #vqstate {
q1 = Q1, q2 = Q2, delta = Delta, q3 = Q3, q4 = Q4,
delta = Delta, q3 = Q3,
mode = Mode,
version = Version,
len = Len,
Expand All @@ -988,11 +986,11 @@ info(backing_queue_status, #vqstate {

[ {mode , Mode},
{version , Version},
{q1 , ?QUEUE:len(Q1)},
{q2 , ?QUEUE:len(Q2)},
{q1 , 0},
{q2 , 0},
{delta , Delta},
{q3 , ?QUEUE:len(Q3)},
{q4 , ?QUEUE:len(Q4)},
{q4 , 0},
{len , Len},
{target_ram_count , TargetRamCount},
{next_seq_id , NextSeqId},
Expand Down Expand Up @@ -1341,12 +1339,11 @@ convert_from_v2_to_v1_loop(QueueName, V1Index0, V2Index0, V2Store0,
%% are paged out - we assume some will soon be paged in rather than
%% forcing it to happen. Pending ack msgs are included as they are
%% regarded as unprocessed until acked, this also prevents the result
%% apparently oscillating during repeated rejects. Q3 is only checked
%% when Q4 is empty as any Q4 msg will be earlier.
head_message_timestamp(Q3, Q4, RPA, QPA) ->
%% apparently oscillating during repeated rejects.
head_message_timestamp(Q3, RPA, QPA) ->
HeadMsgs = [ HeadMsgStatus#msg_status.msg ||
HeadMsgStatus <-
[ get_qs_head([Q4, Q3]),
[ get_q_head(Q3),
get_pa_head(RPA),
get_pa_head(QPA) ],
HeadMsgStatus /= undefined,
Expand All @@ -1364,15 +1361,6 @@ head_message_timestamp(Q3, Q4, RPA, QPA) ->
false -> lists:min(Timestamps)
end.

get_qs_head(Qs) ->
catch lists:foldl(
fun (Q, Acc) ->
case get_q_head(Q) of
undefined -> Acc;
Val -> throw(Val)
end
end, undefined, Qs).

get_q_head(Q) ->
?QUEUE:get(Q, undefined).

Expand Down Expand Up @@ -1960,27 +1948,15 @@ purge_and_index_reset(State) ->
State1 = purge1(process_delivers_and_acks_fun(none), State),
a(reset_qi_state(State1)).

%% This function removes messages from each of {q1, q2, q3, q4}.
%%
%% With remove_queue_entries/3 q1 and q4 are emptied, while q2 and q3
%% are specially handled by purge_betas_and_deltas/2.
%% This function removes messages from each of delta and q3.
%%
%% purge_betas_and_deltas/2 loads messages from the queue index,
%% filling up q3 and in some cases moving messages form q2 to q3 while
%% resetting q2 to an empty queue (see maybe_deltas_to_betas/2). The
%% messages loaded into q3 are removed by calling
%% filling up q3. The messages loaded into q3 are removed by calling
%% remove_queue_entries/3 until there are no more messages to be read
%% from the queue index. Messages are read in batches from the queue
%% index.
purge1(AfterFun, State = #vqstate { q4 = Q4}) ->
State1 = remove_queue_entries(Q4, AfterFun, State),

State2 = #vqstate {q1 = Q1} =
purge_betas_and_deltas(AfterFun, State1#vqstate{q4 = ?QUEUE:new()}),

State3 = remove_queue_entries(Q1, AfterFun, State2),

a(State3#vqstate{q1 = ?QUEUE:new()}).
purge1(AfterFun, State) ->
a(purge_betas_and_deltas(AfterFun, State)).

reset_qi_state(State = #vqstate{ index_mod = IndexMod,
index_state = IndexState0,
Expand Down Expand Up @@ -2592,12 +2568,9 @@ qi_ack_iterator(State) ->

msg_iterator(State) -> istate(start, State).

istate(start, State) -> {q4, State#vqstate.q4, State};
istate(q4, State) -> {q3, State#vqstate.q3, State};
istate(start, State) -> {q3, State#vqstate.q3, State};
istate(q3, State) -> {delta, State#vqstate.delta, State};
istate(delta, State) -> {q2, State#vqstate.q2, State};
istate(q2, State) -> {q1, State#vqstate.q1, State};
istate(q1, _State) -> done.
istate(delta, _State) -> done.

next({ack, It}, IndexState) ->
case gb_trees:next(It) of
Expand Down Expand Up @@ -2694,7 +2667,6 @@ maybe_deltas_to_betas(_DelsAndAcksFun,
State;
maybe_deltas_to_betas(DelsAndAcksFun,
State = #vqstate {
q2 = Q2,
delta = Delta,
q3 = Q3,
index_mod = IndexMod,
Expand Down Expand Up @@ -2736,11 +2708,9 @@ maybe_deltas_to_betas(DelsAndAcksFun,
Q3b = ?QUEUE:join(Q3, Q3a),
case DeltaCount - Q3aLen of
0 ->
%% delta is now empty, but it wasn't before, so
%% can now join q2 onto q3
State2 #vqstate { q2 = ?QUEUE:new(),
delta = ?BLANK_DELTA,
q3 = ?QUEUE:join(Q3b, Q2),
%% delta is now empty
State2 #vqstate { delta = ?BLANK_DELTA,
q3 = Q3b,
delta_transient_bytes = 0};
N when N > 0 ->
Delta1 = d(#delta { start_seq_id = DeltaSeqId1,
Expand Down

0 comments on commit fd21f4a

Please sign in to comment.