CQ: Merge lazy/default behavior into a unified mode #4522

Merged: 39 commits, Oct 1, 2022

Commits

341e908  CQ: Merge lazy/default behavior into a unified mode (lhoguin, Apr 4, 2022)
892db25  WIP putting back flush in reduce_memory_usage and other tweaks (lhoguin, Apr 12, 2022)
1aed8c4  WIP Remove unused callback (lhoguin, Apr 12, 2022)
397b051  Don't force writing to disk in publish_delivered1 (lhoguin, May 10, 2022)
42e5411  When DeltaCount = 0 we can look at Len directly (lhoguin, May 10, 2022)
ace4987  Don't reduce memory use (lhoguin, May 10, 2022)
a5bf555  Remove a missed distinction between default/lazy (lhoguin, May 11, 2022)
ea0f4c1  Don't use q1 q2 and q4 at all anymore (lhoguin, May 11, 2022)
d576ca9  Fetch from disk based on consume rate (lhoguin, May 12, 2022)
de93e89  Fix warning (lhoguin, May 19, 2022)
38f335e  Rework CQ stats code to be obvious and efficient (lhoguin, Jun 3, 2022)
e475f46  CQ: Use only ram_pending_acks, not qi_pending_acks (lhoguin, Jun 8, 2022)
0f8a8ba  CQ: Use maps instead of gb_trees for pending acks (lhoguin, Jun 8, 2022)
1fb4426  Tweak test suites following CQ changes (lhoguin, Jun 9, 2022)
a31be66  CQ: Fix prop suite after removal of lazy and other changes (lhoguin, Jun 9, 2022)
3683ab9  CQ: Use v2 sets instead of gb_sets for confirms (lhoguin, Jun 10, 2022)
9d5533e  CQv2: Add a simpler confirms code path to improve performance (lhoguin, Jun 16, 2022)
f4c5f51  CQ: Fix an xref error (lhoguin, Jun 16, 2022)
615e667  CQ: Enable read/write concurrency for old msg store ets (lhoguin, Jun 17, 2022)
2b291b1  CQ: Enable read/write concurrency for old msg store ets (mkuratczyk, Jun 17, 2022)
0c99efe  CQv2: Optimise the per-queue store (lhoguin, Jun 27, 2022)
3b8ee13  CQv2: Always do the CRC32 check if it was computed on write (lhoguin, Jun 27, 2022)
649ebbb  CQv2 store: Use raw for file:write_file for the file header (lhoguin, Jun 28, 2022)
432f8d2  CQv2: Read many messages at once from v2 store when possible (lhoguin, Jul 5, 2022)
962cc0a  CQv2: Fix property suite (lhoguin, Jul 18, 2022)
f3963a5  CQv2: Sync/handle confirms before conversion (lhoguin, Aug 1, 2022)
8051b00  CQv2: Small fixes of and via the property suite (lhoguin, Sep 5, 2022)
723cc54  CQ: Some cleanup (lhoguin, Sep 5, 2022)
23f1346  CQ: Some more cleanup (lhoguin, Sep 5, 2022)
f590201  CQ: Enable borken checks in backing_queue_SUITE again (lhoguin, Sep 6, 2022)
f1ae007  CQ: Fix test compilation error following rebase (lhoguin, Sep 6, 2022)
1d7ce62  CQ: Remove a couple more unneded callbacks (lhoguin, Sep 8, 2022)
0e0635f  CQ: Make the resume/1 function sync to disk (lhoguin, Sep 8, 2022)
c86254a  CQ: Don't use outgoing rate to throttle purging the queue (lhoguin, Sep 8, 2022)
ef25732  system_SUITE: wait for messages to be queued (lhoguin, Sep 9, 2022)
73dd0ac  rabbit_prometheus_http_SUITE: Update tests for new CQs (lhoguin, Sep 9, 2022)
69efad9  CQ: Update shards count for the property suite (lhoguin, Sep 9, 2022)
e09cbeb  CQ: Fix channel_operation_timeout_SUITE mixed versions (lhoguin, Sep 9, 2022)
1eb1710  CQ: Update long description at the top of the module (lhoguin, Sep 20, 2022)

2 changes: 1 addition & 1 deletion deps/rabbit/BUILD.bazel

@@ -309,7 +309,7 @@ suites = [
         PACKAGE,
         name = "classic_queue_prop_SUITE",
         size = "large",
-        shard_count = 5,
+        shard_count = 3,
         sharding_method = "case",
         deps = [
             "@proper//:erlang_app",

8 changes: 0 additions & 8 deletions deps/rabbit/src/rabbit_amqqueue_process.erl

@@ -1274,9 +1274,6 @@ prioritise_cast(Msg, _Len, State) ->
 %% stack are optimised for that) and to make things easier to reason
 %% about. Finally, we prioritise ack over resume since it should
 %% always reduce memory use.
-%% bump_reduce_memory_use is prioritised over publishes, because sending
-%% credit to self is hard to reason about. Consumers can continue while
-%% reduce_memory_use is in progress.
 
 consumer_bias(#q{backing_queue = BQ, backing_queue_state = BQS}, Low, High) ->
     case BQ:msg_rates(BQS) of
@@ -1294,7 +1291,6 @@ prioritise_info(Msg, _Len, #q{q = Q}) ->
         {drop_expired, _Version} -> 8;
         emit_stats               -> 7;
         sync_timeout             -> 6;
-        bump_reduce_memory_use   -> 1;
         _                        -> 0
     end.
 
@@ -1776,10 +1772,6 @@ handle_info({bump_credit, Msg}, State = #q{backing_queue = BQ,
     %% rabbit_variable_queue:msg_store_write/4.
     credit_flow:handle_bump_msg(Msg),
     noreply(State#q{backing_queue_state = BQ:resume(BQS)});
-handle_info(bump_reduce_memory_use, State = #q{backing_queue = BQ,
-                                               backing_queue_state = BQS0}) ->
-    BQS1 = BQ:handle_info(bump_reduce_memory_use, BQS0),
-    noreply(State#q{backing_queue_state = BQ:resume(BQS1)});
 
 handle_info(Info, State) ->
     {stop, {unhandled_info, Info}, State}.

5 changes: 0 additions & 5 deletions deps/rabbit/src/rabbit_backing_queue.erl

@@ -257,11 +257,6 @@
                             [ack()], Acc, state())
                             -> Acc.
 
-%% Called when rabbit_amqqueue_process receives a message via
-%% handle_info and it should be processed by the backing
-%% queue
--callback handle_info(term(), state()) -> state().
-
 -spec info_keys() -> rabbit_types:info_keys().
 
 info_keys() -> ?INFO_KEYS.

31 changes: 17 additions & 14 deletions deps/rabbit/src/rabbit_classic_queue_index_v2.erl

@@ -9,7 +9,7 @@
 
 -export([erase/1, init/3, reset_state/1, recover/7,
          terminate/3, delete_and_terminate/1,
-         publish/7, ack/2, read/3]).
+         publish/7, publish/8, ack/2, read/3]).
 
 %% Recovery. Unlike other functions in this module, these
 %% apply to all queues all at once.
@@ -111,7 +111,7 @@
     %% and there are outstanding unconfirmed messages.
     %% In that case the buffer is flushed to disk when
     %% the queue requests a sync (after a timeout).
-    confirms = gb_sets:new() :: gb_sets:set(),
+    confirms = sets:new([{version,2}]) :: sets:set(),
 
     %% Segments we currently know of along with the
     %% number of unacked messages remaining in the
@@ -156,7 +156,7 @@
 
 %% Types copied from rabbit_queue_index.
 
--type on_sync_fun() :: fun ((gb_sets:set()) -> ok).
+-type on_sync_fun() :: fun ((sets:set()) -> ok).
 -type contains_predicate() :: fun ((rabbit_types:msg_id()) -> boolean()).
 -type shutdown_terms() :: list() | 'non_clean_shutdown'.
 
@@ -290,7 +290,7 @@ recover_segments(State0 = #qi { queue_name = Name, dir = Dir }, Terms, IsMsgStor
             list_to_integer(filename:basename(F, ?SEGMENT_EXTENSION))
         || F <- SegmentFiles]),
     %% We use a temporary store state to check that messages do exist.
-    StoreState0 = rabbit_classic_queue_store_v2:init(Name, OnSyncMsgFun),
+    StoreState0 = rabbit_classic_queue_store_v2:init(Name),
     {State1, StoreState} = recover_segments(State0, ContainsCheckFun, StoreState0, CountersRef, Segments),
     _ = rabbit_classic_queue_store_v2:terminate(StoreState),
     State1
@@ -482,7 +482,7 @@ recover_index_v1_dirty(State0 = #qi{ queue_name = Name }, Terms, IsMsgStoreClean
 recover_index_v1_common(State0 = #qi{ queue_name = Name, dir = Dir },
                         V1State, CountersRef) ->
     %% Use a temporary per-queue store state to store embedded messages.
-    StoreState0 = rabbit_classic_queue_store_v2:init(Name, fun(_, _) -> ok end),
+    StoreState0 = rabbit_classic_queue_store_v2:init(Name),
     %% Go through the v1 index and publish messages to the v2 index.
     {LoSeqId, HiSeqId, _} = rabbit_queue_index:bounds(V1State),
     %% When resuming after a crash we need to double check the messages that are both
@@ -564,9 +564,12 @@ delete_and_terminate(State = #qi { dir = Dir,
                 rabbit_types:message_properties(), boolean(),
                 non_neg_integer() | infinity, State) -> State when State::state().
 
+publish(MsgId, SeqId, Location, Props, IsPersistent, TargetRamCount, State) ->
+    publish(MsgId, SeqId, Location, Props, IsPersistent, true, TargetRamCount, State).
+
 %% Because we always persist to the msg_store, the Msg(Or)Id argument
 %% here is always a binary, never a record.
-publish(MsgId, SeqId, Location, Props, IsPersistent, TargetRamCount,
+publish(MsgId, SeqId, Location, Props, IsPersistent, ShouldConfirm, TargetRamCount,
         State0 = #qi { write_buffer = WriteBuffer0,
                        segments = Segments }) ->
     ?DEBUG("~0p ~0p ~0p ~0p ~0p ~0p ~0p", [MsgId, SeqId, Location, Props, IsPersistent, TargetRamCount, State0]),
@@ -583,7 +586,7 @@ publish(MsgId, SeqId, Location, Props, IsPersistent, TargetRamCount,
     end,
     %% When publisher confirms have been requested for this
     %% message we mark the message as unconfirmed.
-    State = maybe_mark_unconfirmed(MsgId, Props, State2),
+    State = maybe_mark_unconfirmed(MsgId, Props, ShouldConfirm, State2),
     maybe_flush_buffer(State, SegmentEntryCount).
 
 new_segment_file(Segment, SegmentEntryCount, State = #qi{ segments = Segments }) ->
@@ -657,9 +660,9 @@ reduce_fd_usage(SegmentToOpen, State = #qi{ fds = OpenFds0 }) ->
     end.
 
 maybe_mark_unconfirmed(MsgId, #message_properties{ needs_confirming = true },
-        State = #qi { confirms = Confirms }) ->
-    State#qi{ confirms = gb_sets:add_element(MsgId, Confirms) };
-maybe_mark_unconfirmed(_, _, State) ->
+        true, State = #qi { confirms = Confirms }) ->
+    State#qi{ confirms = sets:add_element(MsgId, Confirms) };
+maybe_mark_unconfirmed(_, _, _, State) ->
    State.
 
 maybe_flush_buffer(State = #qi { write_buffer = WriteBuffer,

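The extra ShouldConfirm argument threaded through publish/8 makes the tracking condition explicit: a message lands in the confirms set only when its properties request a confirm and the caller did not opt out. The following minimal standalone sketch restates that contract; it is not code from the PR, and the record and message ids are simplified stand-ins for the real rabbit_common definitions (the sketch also operates on a bare set rather than the #qi state record).

-module(mark_unconfirmed_sketch).
-export([example/0]).

%% Local stand-in for the real message_properties record.
-record(message_properties, {needs_confirming = false :: boolean()}).

%% Same shape as the updated maybe_mark_unconfirmed/4 above: a message is
%% tracked for confirms only when the publisher asked for confirms and the
%% caller left ShouldConfirm enabled.
maybe_mark_unconfirmed(MsgId, #message_properties{ needs_confirming = true },
                       true, Confirms) ->
    sets:add_element(MsgId, Confirms);
maybe_mark_unconfirmed(_MsgId, _Props, _ShouldConfirm, Confirms) ->
    Confirms.

example() ->
    Empty = sets:new([{version, 2}]),
    Props = #message_properties{needs_confirming = true},
    1 = sets:size(maybe_mark_unconfirmed(<<"id1">>, Props, true, Empty)),
    %% A pre_publish-style caller passes false, so nothing is tracked:
    0 = sets:size(maybe_mark_unconfirmed(<<"id1">>, Props, false, Empty)),
    ok.
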
@@ -1055,19 +1058,19 @@ sync(State0 = #qi{ confirms = Confirms,
                    on_sync = OnSyncFun }) ->
     ?DEBUG("~0p", [State0]),
     State = flush_buffer(State0, full, segment_entry_count()),
-    _ = case gb_sets:is_empty(Confirms) of
+    _ = case sets:is_empty(Confirms) of
         true ->
             ok;
         false ->
             OnSyncFun(Confirms)
     end,
-    State#qi{ confirms = gb_sets:new() }.
+    State#qi{ confirms = sets:new([{version,2}]) }.
 
 -spec needs_sync(state()) -> 'false'.
 
 needs_sync(State = #qi{ confirms = Confirms }) ->
     ?DEBUG("~0p", [State]),
-    case gb_sets:is_empty(Confirms) of
+    case sets:is_empty(Confirms) of
         true -> false;
         false -> confirms
     end.

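The sync/1 and needs_sync/1 hunks above complete the switch from gb_sets to the map-backed sets introduced in OTP 24 (the "CQ: Use v2 sets instead of gb_sets for confirms" commit). The index only needs a handful of set operations, and they exist in both modules, which is why the swap is mechanical. Below is a small self-contained sketch of that sync lifecycle, again not PR code; the on_sync fun is an illustrative stand-in for the real callback.

-module(confirms_sync_sketch).
-export([example/0]).

%% Mirrors the flow of sync/1 above: skip the callback when there is nothing
%% to confirm, otherwise hand the whole set to the on-sync fun and start a
%% fresh (empty) v2 set for the next batch of unconfirmed messages.
sync(Confirms, OnSyncFun) ->
    _ = case sets:is_empty(Confirms) of
            true  -> ok;
            false -> OnSyncFun(Confirms)
        end,
    sets:new([{version, 2}]).

example() ->
    OnSyncFun = fun(Set) -> lists:sort(sets:to_list(Set)) end,
    Confirms = sets:add_element(<<"id2">>,
                   sets:add_element(<<"id1">>, sets:new([{version, 2}]))),
    false = sets:is_empty(Confirms),   %% needs_sync/1 would report 'confirms' here
    Fresh = sync(Confirms, OnSyncFun),
    true = sets:is_empty(Fresh),
    ok.
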
@@ -1183,7 +1186,7 @@ stop(VHost) ->
 
 pre_publish(MsgOrId, SeqId, Location, Props, IsPersistent, TargetRamCount, State) ->
     ?DEBUG("~0p ~0p ~0p ~0p ~0p ~0p ~0p", [MsgOrId, SeqId, Location, Props, IsPersistent, TargetRamCount, State]),
-    publish(MsgOrId, SeqId, Location, Props, IsPersistent, TargetRamCount, State).
+    publish(MsgOrId, SeqId, Location, Props, IsPersistent, false, TargetRamCount, State).
 
 flush_pre_publish_cache(TargetRamCount, State) ->
     ?DEBUG("~0p ~0p", [TargetRamCount, State]),