From ecf46002e0bf827208dae90d4082efbbebc47b74 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lo=C3=AFc=20Hoguin?= Date: Mon, 4 Mar 2024 10:08:22 +0100 Subject: [PATCH] Remove availability of CQv1 We reject CQv1 in rabbit.schema as well. Most of the v1 code is still around as it is needed for conversion to v2. It will be removed at a later time when conversion is no longer supported. We don't shard the CQ property suite anymore: there's only 1 case remaining. --- deps/rabbit/BUILD.bazel | 2 - deps/rabbit/priv/schema/rabbit.schema | 4 +- deps/rabbit/src/rabbit_amqqueue_process.erl | 6 +- .../src/rabbit_classic_queue_index_v2.erl | 2 +- deps/rabbit/src/rabbit_queue_index.erl | 89 +--- deps/rabbit/src/rabbit_variable_queue.erl | 466 +++--------------- deps/rabbit/test/backing_queue_SUITE.erl | 42 +- deps/rabbit/test/classic_queue_prop_SUITE.erl | 298 +---------- deps/rabbit/test/priority_queue_SUITE.erl | 4 +- deps/rabbit/test/unicode_SUITE.erl | 8 - .../test/rabbit_mgmt_http_SUITE.erl | 10 +- deps/rabbitmq_mqtt/test/shared_SUITE.erl | 5 +- deps/rabbitmq_mqtt/test/v5_SUITE.erl | 3 +- 13 files changed, 116 insertions(+), 823 deletions(-) diff --git a/deps/rabbit/BUILD.bazel b/deps/rabbit/BUILD.bazel index 80891cf7382c..8571bc0b8caf 100644 --- a/deps/rabbit/BUILD.bazel +++ b/deps/rabbit/BUILD.bazel @@ -319,8 +319,6 @@ rabbitmq_integration_suite( rabbitmq_integration_suite( name = "classic_queue_prop_SUITE", size = "large", - shard_count = 6, - sharding_method = "case", deps = [ "@proper//:erlang_app", ], diff --git a/deps/rabbit/priv/schema/rabbit.schema b/deps/rabbit/priv/schema/rabbit.schema index 226a86e96531..53d4556094cf 100644 --- a/deps/rabbit/priv/schema/rabbit.schema +++ b/deps/rabbit/priv/schema/rabbit.schema @@ -2538,8 +2538,8 @@ end}. {translation, "rabbit.classic_queue_default_version", fun(Conf) -> - case cuttlefish:conf_get("classic_queue.default_version", Conf, 1) of - 1 -> 1; + case cuttlefish:conf_get("classic_queue.default_version", Conf, 2) of + 1 -> cuttlefish:invalid("Classic queues v1 are no longer supported"); 2 -> 2; _ -> cuttlefish:unset() end diff --git a/deps/rabbit/src/rabbit_amqqueue_process.erl b/deps/rabbit/src/rabbit_amqqueue_process.erl index f645faef88b9..cd74ccc393b3 100644 --- a/deps/rabbit/src/rabbit_amqqueue_process.erl +++ b/deps/rabbit/src/rabbit_amqqueue_process.erl @@ -477,12 +477,8 @@ init_queue_mode(Mode, State = #q {backing_queue = BQ, init_queue_version(Version0, State = #q {backing_queue = BQ, backing_queue_state = BQS}) -> - %% When the version is undefined we use the default version 1. - %% We want to BQ:set_queue_version in all cases because a v2 - %% policy might have been deleted, for example, and we want - %% the queue to go back to v1. Version = case Version0 of - undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1); + undefined -> 2; _ -> Version0 end, BQS1 = BQ:set_queue_version(Version, BQS), diff --git a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl index 4f8a2403e121..ef0f042c6214 100644 --- a/deps/rabbit/src/rabbit_classic_queue_index_v2.erl +++ b/deps/rabbit/src/rabbit_classic_queue_index_v2.erl @@ -1078,7 +1078,7 @@ sync(State0 = #qi{ confirms = Confirms, end, State#qi{ confirms = sets:new([{version,2}]) }. --spec needs_sync(state()) -> 'false'. +-spec needs_sync(state()) -> 'false' | 'confirms'. needs_sync(State = #qi{ confirms = Confirms }) -> ?DEBUG("~0p", [State]), diff --git a/deps/rabbit/src/rabbit_queue_index.erl b/deps/rabbit/src/rabbit_queue_index.erl index 346b39abec78..249e870af775 100644 --- a/deps/rabbit/src/rabbit_queue_index.erl +++ b/deps/rabbit/src/rabbit_queue_index.erl @@ -313,7 +313,9 @@ init_for_conversion(#resource{ virtual_host = VHost } = Name, OnSyncFun, OnSyncM 'undefined' | non_neg_integer(), qistate()}. recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun, Context) -> + ContainsCheckFun, OnSyncFun, OnSyncMsgFun, + %% We only allow using this module when converting to v2. + convert) -> #{segment_entry_count := SegmentEntryCount} = rabbit_vhost:read_config(VHost), put(segment_entry_count, SegmentEntryCount), VHostDir = rabbit_vhost:msg_store_dir_path(VHost), @@ -323,10 +325,10 @@ recover(#resource{ virtual_host = VHost } = Name, Terms, MsgStoreRecovered, CleanShutdown = Terms /= non_clean_shutdown, case CleanShutdown andalso MsgStoreRecovered of true -> case proplists:get_value(segments, Terms, non_clean_shutdown) of - non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1, Context); + non_clean_shutdown -> init_dirty(false, ContainsCheckFun, State1); RecoveredCounts -> init_clean(RecoveredCounts, State1) end; - false -> init_dirty(CleanShutdown, ContainsCheckFun, State1, Context) + false -> init_dirty(CleanShutdown, ContainsCheckFun, State1) end. -spec terminate(rabbit_types:vhost(), [any()], qistate()) -> qistate(). @@ -644,7 +646,7 @@ init_clean(RecoveredCounts, State) -> -define(RECOVER_BYTES, 2). -define(RECOVER_COUNTER_SIZE, 2). -init_dirty(CleanShutdown, ContainsCheckFun, State, Context) -> +init_dirty(CleanShutdown, ContainsCheckFun, State) -> %% Recover the journal completely. This will also load segments %% which have entries in the journal and remove duplicates. The %% counts will correctly reflect the combination of the segment @@ -679,84 +681,7 @@ init_dirty(CleanShutdown, ContainsCheckFun, State, Context) -> %% recovery fails with a crash. State2 = flush_journal(State1 #qistate { segments = Segments1, dirty_count = DirtyCount }), - case Context of - convert -> - {Count, Bytes, State2}; - main -> - %% We try to see if there are segment files from the v2 index. - case rabbit_file:wildcard(".*\\.qi", Dir) of - %% We are recovering a dirty queue that was using the v2 index or in - %% the process of converting from v2 to v1. - [_|_] -> - #resource{virtual_host = VHost, name = QName} = State2#qistate.queue_name, - rabbit_log:info("Queue ~ts in vhost ~ts recovered ~b total messages before resuming convert", - [QName, VHost, Count]), - CountersRef = counters:new(?RECOVER_COUNTER_SIZE, []), - State3 = recover_index_v2_dirty(State2, ContainsCheckFun, CountersRef), - {Count + counters:get(CountersRef, ?RECOVER_COUNT), - Bytes + counters:get(CountersRef, ?RECOVER_BYTES), - State3}; - %% Otherwise keep default values. - [] -> - {Count, Bytes, State2} - end - end. - -recover_index_v2_dirty(State0 = #qistate { queue_name = Name, - on_sync = OnSyncFun, - on_sync_msg = OnSyncMsgFun }, - ContainsCheckFun, CountersRef) -> - #resource{virtual_host = VHost, name = QName} = Name, - rabbit_log:info("Converting queue ~ts in vhost ~ts from v2 to v1 after unclean shutdown", [QName, VHost]), - %% We cannot use the counts/bytes because some messages may be in both - %% the v1 and v2 indexes after a crash. - {_, _, V2State} = rabbit_classic_queue_index_v2:recover(Name, non_clean_shutdown, true, - ContainsCheckFun, OnSyncFun, OnSyncMsgFun, - convert), - State = recover_index_v2_common(State0, V2State, CountersRef), - rabbit_log:info("Queue ~ts in vhost ~ts converted ~b total messages from v2 to v1", - [QName, VHost, counters:get(CountersRef, ?RECOVER_COUNT)]), - State. - -%% At this point all messages are persistent because transient messages -%% were dropped during the v2 index recovery. -recover_index_v2_common(State0 = #qistate { queue_name = Name, dir = Dir }, - V2State, CountersRef) -> - %% Use a temporary per-queue store state to read embedded messages. - StoreState0 = rabbit_classic_queue_store_v2:init(Name), - %% Go through the v2 index and publish messages to v1 index. - {LoSeqId, HiSeqId, _} = rabbit_classic_queue_index_v2:bounds(V2State), - %% When resuming after a crash we need to double check the messages that are both - %% in the v1 and v2 index (effectively the messages below the upper bound of the - %% v1 index that are about to be written to it). - {_, V1HiSeqId, _} = bounds(State0), - SkipFun = fun - (SeqId, FunState0) when SeqId < V1HiSeqId -> - case read(SeqId, SeqId + 1, FunState0) of - %% Message already exists, skip. - {[_], FunState} -> - {skip, FunState}; - %% Message doesn't exist, write. - {[], FunState} -> - {write, FunState} - end; - %% Message is out of bounds of the v1 index. - (_, FunState) -> - {write, FunState} - end, - %% We use a common function also used with conversion on policy change. - {State1, _StoreState} = rabbit_variable_queue:convert_from_v2_to_v1_loop(Name, State0, V2State, StoreState0, - {CountersRef, ?RECOVER_COUNT, ?RECOVER_BYTES}, - LoSeqId, HiSeqId, SkipFun), - %% Delete any remaining v2 index files. - OldFiles = rabbit_file:wildcard(".*\\.qi", Dir) - ++ rabbit_file:wildcard(".*\\.qs", Dir), - _ = [rabbit_file:delete(filename:join(Dir, F)) || F <- OldFiles], - %% Ensure that everything in the v1 index is written to disk. - State = flush(State1), - %% Clean up all the garbage that we have surely been creating. - garbage_collect(), - State. + {Count, Bytes, State2}. terminate(State = #qistate { journal_handle = JournalHdl, segments = Segments }) -> diff --git a/deps/rabbit/src/rabbit_variable_queue.erl b/deps/rabbit/src/rabbit_variable_queue.erl index 9e54844c1940..0c5e59b73c91 100644 --- a/deps/rabbit/src/rabbit_variable_queue.erl +++ b/deps/rabbit/src/rabbit_variable_queue.erl @@ -21,9 +21,9 @@ -export([start/2, stop/1]). -%% Used during dirty recovery to resume conversion between versions. +%% This function is used by rabbit_classic_queue_index_v2 +%% to convert v1 queues to v2 after an upgrade to 4.0. -export([convert_from_v1_to_v2_loop/8]). --export([convert_from_v2_to_v1_loop/8]). %% exported for testing only -export([start_msg_store/3, stop_msg_store/1, init/5]). @@ -52,20 +52,13 @@ %% %% Messages are persisted using a queue index and a message store. %% A few different scenarios may play out depending on the message -%% size and the queue-version argument. +%% size: %% -%% - queue-version=1, size < qi_msgs_embed_below: both the message -%% metadata and content are stored in rabbit_queue_index -%% -%% - queue-version=1, size >= qi_msgs_embed_below: the metadata -%% is stored in rabbit_queue_index, while the content is stored -%% in the per-vhost shared rabbit_msg_store -%% -%% - queue-version=2, size < qi_msgs_embed_below: the metadata +%% - size < qi_msgs_embed_below: the metadata %% is stored in rabbit_classic_queue_index_v2, while the content %% is stored in the per-queue rabbit_classic_queue_store_v2 %% -%% - queue-version=2, size >= qi_msgs_embed_below: the metadata +%% - size >= qi_msgs_embed_below: the metadata %% is stored in rabbit_classic_queue_index_v2, while the content %% is stored in the per-vhost shared rabbit_msg_store %% @@ -179,7 +172,7 @@ ram_pending_ack, %% msgs still in RAM disk_pending_ack, %% msgs in store, paged out qi_pending_ack, %% Unused. - index_mod, + index_mod, %% Unused. index_state, store_state, msg_store_clients, @@ -222,7 +215,7 @@ %% default queue or lazy queue mode, %% Unused. - version = 1, + version = 2, %% Unused. %% Fast path for confirms handling. Instead of having %% index/store keep track of confirms separately and %% doing intersect/subtract/union we just put the messages @@ -311,7 +304,6 @@ ram_pending_ack :: map(), disk_pending_ack :: map(), qi_pending_ack :: undefined, - index_mod :: rabbit_queue_index | rabbit_classic_queue_index_v2, index_state :: any(), store_state :: any(), msg_store_clients :: 'undefined' | {{any(), binary()}, @@ -346,7 +338,7 @@ io_batch_size :: pos_integer(), mode :: 'default' | 'lazy', - version :: 1 | 2, + version :: 2, unconfirmed_simple :: sets:set()}. -define(BLANK_DELTA, #delta { start_seq_id = undefined, @@ -431,14 +423,11 @@ init(Queue, Recover, Callback) -> init(Q, new, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqueue(Q) -> QueueName = amqqueue:get_name(Q), IsDurable = amqqueue:is_durable(Q), - %% We resolve the queue version immediately to avoid converting - %% between queue versions unnecessarily. - IndexMod = index_mod(Q), - IndexState = IndexMod:init(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), + IndexState = rabbit_classic_queue_index_v2:init(QueueName, + MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), StoreState = rabbit_classic_queue_store_v2:init(QueueName), VHost = QueueName#resource.virtual_host, - init(queue_version(Q), - IsDurable, IndexMod, IndexState, StoreState, 0, 0, [], + init(IsDurable, IndexState, StoreState, 0, 0, [], case IsDurable of true -> msg_store_client_init(?PERSISTENT_MSG_STORE, MsgOnDiskFun, VHost); @@ -466,10 +455,8 @@ init(Q, Terms, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqu end, TransientClient = msg_store_client_init(?TRANSIENT_MSG_STORE, undefined, VHost), - %% We MUST resolve the queue version immediately in order to recover. - IndexMod = index_mod(Q), {DeltaCount, DeltaBytes, IndexState} = - IndexMod:recover( + rabbit_classic_queue_index_v2:recover( QueueName, RecoveryTerms, rabbit_vhost_msg_store:successfully_recovered_state( VHost, @@ -477,8 +464,8 @@ init(Q, Terms, MsgOnDiskFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun) when ?is_amqqu ContainsCheckFun, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun, main), StoreState = rabbit_classic_queue_store_v2:init(QueueName), - init(queue_version(Q), - IsDurable, IndexMod, IndexState, StoreState, DeltaCount, DeltaBytes, RecoveryTerms, + init(IsDurable, IndexState, StoreState, + DeltaCount, DeltaBytes, RecoveryTerms, PersistentClient, TransientClient, VHost). process_recovery_terms(Terms=non_clean_shutdown) -> @@ -489,27 +476,12 @@ process_recovery_terms(Terms) -> PRef -> {PRef, Terms} end. -queue_version(Q) -> - Resolve = fun(_, ArgVal) -> ArgVal end, - case rabbit_queue_type_util:args_policy_lookup(<<"queue-version">>, Resolve, Q) of - undefined -> rabbit_misc:get_env(rabbit, classic_queue_default_version, 1); - Vsn when is_integer(Vsn) -> Vsn; - Vsn -> binary_to_integer(Vsn) - end. - -index_mod(Q) -> - case queue_version(Q) of - 1 -> rabbit_queue_index; - 2 -> rabbit_classic_queue_index_v2 - end. - terminate(_Reason, State) -> State1 = #vqstate { virtual_host = VHost, next_seq_id = NextSeqId, next_deliver_seq_id = NextDeliverSeqId, persistent_count = PCount, persistent_bytes = PBytes, - index_mod = IndexMod, index_state = IndexState, store_state = StoreState, msg_store_clients = {MSCStateP, MSCStateT} } = @@ -526,7 +498,7 @@ terminate(_Reason, State) -> {persistent_count, PCount}, {persistent_bytes, PBytes}], a(State1#vqstate { - index_state = IndexMod:terminate(VHost, Terms, IndexState), + index_state = rabbit_classic_queue_index_v2:terminate(VHost, Terms, IndexState), store_state = rabbit_classic_queue_store_v2:terminate(StoreState), msg_store_clients = undefined }). @@ -649,8 +621,7 @@ ack([SeqId], State) -> end; ack(AckTags, State) -> {{IndexOnDiskSeqIds, MsgIdsByStore, SeqIdsInStore, AllMsgIds}, - State1 = #vqstate { index_mod = IndexMod, - index_state = IndexState, + State1 = #vqstate { index_state = IndexState, store_state = StoreState0, ack_out_counter = AckOutCount }} = lists:foldl( @@ -664,7 +635,7 @@ ack(AckTags, State) -> {accumulate_ack(MsgStatus, Acc), State3} end end, {accumulate_ack_init(), State}, AckTags), - {DeletedSegments, IndexState1} = IndexMod:ack(IndexOnDiskSeqIds, IndexState), + {DeletedSegments, IndexState1} = rabbit_classic_queue_index_v2:ack(IndexOnDiskSeqIds, IndexState), StoreState1 = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, StoreState0), StoreState = lists:foldl(fun rabbit_classic_queue_store_v2:remove/2, StoreState1, SeqIdsInStore), State2 = remove_vhost_msgs_by_id(MsgIdsByStore, State1), @@ -762,36 +733,32 @@ ram_duration(State) -> State1 = update_rates(State), {infinity, State1}. -needs_timeout(#vqstate { index_mod = IndexMod, - index_state = IndexState, +needs_timeout(#vqstate { index_state = IndexState, unconfirmed_simple = UCS }) -> - case {IndexMod:needs_sync(IndexState), sets:is_empty(UCS)} of + case {rabbit_classic_queue_index_v2:needs_sync(IndexState), sets:is_empty(UCS)} of {false, false} -> timed; {confirms, _} -> timed; - {other, _} -> idle; {false, true} -> false end. -timeout(State = #vqstate { index_mod = IndexMod, - index_state = IndexState0, +timeout(State = #vqstate { index_state = IndexState0, store_state = StoreState0, unconfirmed_simple = UCS, confirmed = C }) -> - IndexState = IndexMod:sync(IndexState0), + IndexState = rabbit_classic_queue_index_v2:sync(IndexState0), StoreState = rabbit_classic_queue_store_v2:sync(StoreState0), State #vqstate { index_state = IndexState, store_state = StoreState, unconfirmed_simple = sets:new([{version,2}]), confirmed = sets:union(C, UCS) }. -handle_pre_hibernate(State = #vqstate { index_mod = IndexMod, - index_state = IndexState0, +handle_pre_hibernate(State = #vqstate { index_state = IndexState0, store_state = StoreState0, msg_store_clients = MSCState0, unconfirmed_simple = UCS, confirmed = C }) -> MSCState = msg_store_pre_hibernate(MSCState0), - IndexState = IndexMod:flush(IndexState0), + IndexState = rabbit_classic_queue_index_v2:flush(IndexState0), StoreState = rabbit_classic_queue_store_v2:sync(StoreState0), State #vqstate { index_state = IndexState, store_state = StoreState, @@ -843,7 +810,6 @@ info(disk_writes, #vqstate{disk_write_count = Count}) -> info(backing_queue_status, #vqstate { delta = Delta, q3 = Q3, mode = Mode, - version = Version, len = Len, target_ram_count = TargetRamCount, next_seq_id = NextSeqId, @@ -852,7 +818,6 @@ info(backing_queue_status, #vqstate { disk_pending_ack = DPA, unconfirmed = UC, unconfirmed_simple = UCS, - index_mod = IndexMod, index_state = IndexState, store_state = StoreState, rates = #rates { in = AvgIngressRate, @@ -860,7 +825,7 @@ info(backing_queue_status, #vqstate { ack_in = AvgAckIngressRate, ack_out = AvgAckEgressRate }}) -> [ {mode , Mode}, - {version , Version}, + {version , 2}, {q1 , 0}, {q2 , 0}, {delta , Delta}, @@ -876,7 +841,7 @@ info(backing_queue_status, #vqstate { {avg_egress_rate , AvgEgressRate}, {avg_ack_ingress_rate, AvgAckIngressRate}, {avg_ack_egress_rate , AvgAckEgressRate} ] - ++ IndexMod:info(IndexState) + ++ rabbit_classic_queue_index_v2:info(IndexState) ++ rabbit_classic_queue_store_v2:info(StoreState); info(_, _) -> ''. @@ -896,94 +861,12 @@ zip_msgs_and_acks(Msgs, AckTags, Accumulator, _State) -> [{Id, AckTag} | Acc] end, Accumulator, lists:zip(Msgs, AckTags)). -%% No change. -set_queue_version(Version, State = #vqstate { version = Version }) -> - State; -%% v2 -> v1. -set_queue_version(1, State0 = #vqstate { version = 2 }) -> - %% We call timeout/1 so that we sync to disk and get the confirms - %% handled before we do the conversion. This is necessary because - %% v2 now has a simpler confirms code path. - State = timeout(State0), - convert_from_v2_to_v1(State #vqstate { version = 1 }); -%% v1 -> v2. -set_queue_version(2, State0 = #vqstate { version = 1 }) -> - %% We call timeout/1 so that we sync to disk and get the confirms - %% handled before we do the conversion. This is necessary because - %% v2 now has a simpler confirms code path. - State = timeout(State0), - convert_from_v1_to_v2(State #vqstate { version = 2 }). - --define(CONVERT_COUNT, 1). --define(CONVERT_BYTES, 2). %% Unused. --define(CONVERT_COUNTER_SIZE, 2). - -%% We move messages from the v1 index to the v2 index. The message payload -%% is moved to the v2 store if it was embedded, and left in the per-vhost -%% store otherwise. -convert_from_v1_to_v2(State0 = #vqstate{ index_mod = rabbit_queue_index, - index_state = V1Index, - store_state = V2Store0 }) -> - {QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun} = rabbit_queue_index:init_args(V1Index), - #resource{virtual_host = VHost, name = QName} = QueueName, - rabbit_log:info("Converting running queue ~ts in vhost ~ts from v1 to v2", [QName, VHost]), - State = convert_from_v1_to_v2_in_memory(State0), - V2Index0 = rabbit_classic_queue_index_v2:init_for_conversion(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), - %% We do not need to init the v2 per-queue store because we already did so in the queue init. - {LoSeqId, HiSeqId, _} = rabbit_queue_index:bounds(V1Index), - CountersRef = counters:new(?CONVERT_COUNTER_SIZE, []), - {V2Index, V2Store} = convert_from_v1_to_v2_loop(QueueName, V1Index, V2Index0, V2Store0, - {CountersRef, ?CONVERT_COUNT, ?CONVERT_BYTES}, - LoSeqId, HiSeqId, - %% Write all messages. - fun (_, FunState) -> {write, FunState} end), - %% We have already deleted segments files but not the journal. - rabbit_queue_index:delete_journal(V1Index), - rabbit_log:info("Queue ~ts in vhost ~ts converted ~b total messages from v1 to v2", - [QName, VHost, counters:get(CountersRef, ?CONVERT_COUNT)]), - State#vqstate{ index_mod = rabbit_classic_queue_index_v2, - index_state = V2Index, - store_state = V2Store }. - -convert_from_v1_to_v2_in_memory(State = #vqstate{ q1 = Q1b, - q2 = Q2b, - q3 = Q3b, - q4 = Q4b, - ram_pending_ack = RPAb, - disk_pending_ack = DPAb }) -> - Q1 = convert_from_v1_to_v2_queue(Q1b), - Q2 = convert_from_v1_to_v2_queue(Q2b), - Q3 = convert_from_v1_to_v2_queue(Q3b), - Q4 = convert_from_v1_to_v2_queue(Q4b), - %% We also must convert the #msg_status entries in the pending_ack fields. - RPA = convert_from_v1_to_v2_map(RPAb), - DPA = convert_from_v1_to_v2_map(DPAb), - State#vqstate{ q1 = Q1, - q2 = Q2, - q3 = Q3, - q4 = Q4, - ram_pending_ack = RPA, - disk_pending_ack = DPA }. - -%% We change where the message is expected to be persisted to. -%% We do not need to worry about the message location because -%% it will only be in memory or in the per-vhost store. -convert_from_v1_to_v2_queue(Q) -> - List0 = ?QUEUE:to_list(Q), - List = lists:map(fun (MsgStatus) -> convert_from_v1_to_v2_msg_status(MsgStatus) end, List0), - ?QUEUE:from_list(List). - -convert_from_v1_to_v2_map(T) -> - maps:map(fun (_, MsgStatus) -> convert_from_v1_to_v2_msg_status(MsgStatus) end, T). - -convert_from_v1_to_v2_msg_status(MsgStatus) -> - case MsgStatus of - #msg_status{ persist_to = queue_index } -> - MsgStatus#msg_status{ persist_to = queue_store }; - _ -> - MsgStatus - end. +%% Queue version now ignored; only v2 is available. +set_queue_version(_, State) -> + State. +%% This function is used by rabbit_classic_queue_index_v2 +%% to convert v1 queues to v2 after an upgrade to 4.0. convert_from_v1_to_v2_loop(_, _, V2Index, V2Store, _, HiSeqId, HiSeqId, _) -> {V2Index, V2Store}; convert_from_v1_to_v2_loop(QueueName, V1Index0, V2Index0, V2Store0, @@ -1036,159 +919,6 @@ convert_from_v1_to_v2_loop(QueueName, V1Index0, V2Index0, V2Store0, [Name, VHost, length(Messages)]), convert_from_v1_to_v2_loop(QueueName, V1Index, V2Index, V2Store, Counters, UpSeqId, HiSeqId, SkipFun). -%% We move messages from the v1 index to the v2 index. The message payload -%% is moved to the v2 store if it was embedded, and left in the per-vhost -%% store otherwise. -convert_from_v2_to_v1(State0 = #vqstate{ index_mod = rabbit_classic_queue_index_v2, - index_state = V2Index }) -> - {QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun} = rabbit_classic_queue_index_v2:init_args(V2Index), - #resource{virtual_host = VHost, name = QName} = QueueName, - rabbit_log:info("Converting running queue ~ts in vhost ~ts from v2 to v1", [QName, VHost]), - State = convert_from_v2_to_v1_in_memory(State0), - %% We may have read from the per-queue store state and opened FDs. - #vqstate{ store_state = V2Store0 } = State, - V1Index0 = rabbit_queue_index:init_for_conversion(QueueName, MsgIdxOnDiskFun, MsgAndIdxOnDiskFun), - {LoSeqId, HiSeqId, _} = rabbit_classic_queue_index_v2:bounds(V2Index), - CountersRef = counters:new(?CONVERT_COUNTER_SIZE, []), - {V1Index, V2Store} = convert_from_v2_to_v1_loop(QueueName, V1Index0, V2Index, V2Store0, - {CountersRef, ?CONVERT_COUNT, ?CONVERT_BYTES}, - LoSeqId, HiSeqId, - %% Write all messages. - fun (_, FunState) -> {write, FunState} end), - rabbit_log:info("Queue ~ts in vhost ~ts converted ~b total messages from v2 to v1", - [QName, VHost, counters:get(CountersRef, ?CONVERT_COUNT)]), - %% We have already closed the v2 index/store FDs when deleting the files. - State#vqstate{ index_mod = rabbit_queue_index, - index_state = V1Index, - store_state = rabbit_classic_queue_store_v2:terminate(V2Store) }. - -convert_from_v2_to_v1_in_memory(State0 = #vqstate{ q1 = Q1b, - q2 = Q2b, - q3 = Q3b, - q4 = Q4b, - ram_pending_ack = RPAb, - disk_pending_ack = DPAb }) -> - {Q1, State1} = convert_from_v2_to_v1_queue(Q1b, State0), - {Q2, State2} = convert_from_v2_to_v1_queue(Q2b, State1), - {Q3, State3} = convert_from_v2_to_v1_queue(Q3b, State2), - {Q4, State4} = convert_from_v2_to_v1_queue(Q4b, State3), - %% We also must convert the #msg_status entries in the pending_ack fields. - %% We must separate entries in the queue index from other entries as - %% that is what is expected from the v1 index. - {RPA, State5} = convert_from_v2_to_v1_map(RPAb, State4), - {DPA, State6} = convert_from_v2_to_v1_map(DPAb, State5), - State6#vqstate{ q1 = Q1, - q2 = Q2, - q3 = Q3, - q4 = Q4, - ram_pending_ack = RPA, - disk_pending_ack = DPA }. - -%% We fetch the message from the per-queue store if necessary -%% and mark all messages as delivered to make the v1 index happy. -convert_from_v2_to_v1_queue(Q, State0) -> - List0 = ?QUEUE:to_list(Q), - {List, State} = lists:mapfoldl(fun (MsgStatus, State1) -> - convert_from_v2_to_v1_msg_status(MsgStatus, State1, true) - end, State0, List0), - {?QUEUE:from_list(List), State}. - -convert_from_v2_to_v1_map(T, State) -> - convert_from_v2_to_v1_map_loop(maps:iterator(T), #{}, State). - -convert_from_v2_to_v1_map_loop(Iterator0, Acc, State0) -> - case maps:next(Iterator0) of - none -> - {Acc, State0}; - {Key, Value0, Iterator} -> - {Value, State} = convert_from_v2_to_v1_msg_status(Value0, State0, false), - convert_from_v2_to_v1_map_loop(Iterator, maps:put(Key, Value, Acc), State) - end. - -convert_from_v2_to_v1_msg_status(MsgStatus0, State1 = #vqstate{ store_state = StoreState0, - ram_msg_count = RamMsgCount, - ram_bytes = RamBytes }, Ready) -> - case MsgStatus0 of - #msg_status{ seq_id = SeqId, - msg = undefined, - msg_location = MsgLocation = {rabbit_classic_queue_store_v2, _, _} } -> - {Msg, StoreState} = rabbit_classic_queue_store_v2:read(SeqId, MsgLocation, StoreState0), - MsgStatus = MsgStatus0#msg_status{ msg = Msg, - msg_location = memory, - is_delivered = true, - persist_to = queue_index }, - %% We have read the message into memory. We must also update the stats. - {MsgStatus, State1#vqstate{ store_state = StoreState, - ram_msg_count = RamMsgCount + one_if(Ready), - ram_bytes = RamBytes + msg_size(MsgStatus) }}; - #msg_status{ persist_to = queue_store } -> - {MsgStatus0#msg_status{ is_delivered = true, - persist_to = queue_index }, State1}; - _ -> - {MsgStatus0#msg_status{ is_delivered = true }, State1} - end. - -convert_from_v2_to_v1_loop(_, V1Index, _, V2Store, _, HiSeqId, HiSeqId, _) -> - {V1Index, V2Store}; -convert_from_v2_to_v1_loop(QueueName, V1Index0, V2Index0, V2Store0, - Counters = {CountersRef, CountIx, BytesIx}, - LoSeqId, HiSeqId, SkipFun) -> - UpSeqId = lists:min([rabbit_classic_queue_index_v2:next_segment_boundary(LoSeqId), - HiSeqId]), - {Messages, V2Index1} = rabbit_classic_queue_index_v2:read(LoSeqId, UpSeqId, V2Index0), - {V1Index3, V2Store3} = lists:foldl(fun - %% Read per-queue store messages before writing to the index. - ({_MsgId, SeqId, Location = {rabbit_classic_queue_store_v2, _, _}, Props, IsPersistent}, - {V1Index1, V2Store1}) -> - {Msg, V2Store2} = rabbit_classic_queue_store_v2:read(SeqId, Location, V2Store1), - %% When we are resuming the conversion the messages may have already been written to disk. - %% We do NOT want them written again: this is an error that leads to a corrupted index - %% (because it uses a journal it cannot know whether there's been a double write). - %% We therefore check first if the entry exists and if we need to write it. - V1Index2 = case SkipFun(SeqId, V1Index1) of - {skip, V1Index1a} -> - V1Index1a; - {write, V1Index1a} -> - counters:add(CountersRef, CountIx, 1), - counters:add(CountersRef, BytesIx, Props#message_properties.size), - V1Index1b = rabbit_queue_index:publish(Msg, SeqId, rabbit_queue_index, Props, IsPersistent, infinity, V1Index1a), - rabbit_queue_index:deliver([SeqId], V1Index1b) - end, - {V1Index2, V2Store2}; - %% Keep messages in the per-vhost store where they are. - ({MsgId, SeqId, rabbit_msg_store, Props, IsPersistent}, - {V1Index1, V2Store1}) -> - %% See comment in previous clause. - V1Index2 = case SkipFun(SeqId, V1Index1) of - {skip, V1Index1a} -> - V1Index1a; - {write, V1Index1a} -> - counters:add(CountersRef, CountIx, 1), - counters:add(CountersRef, BytesIx, Props#message_properties.size), - V1Index1b = rabbit_queue_index:publish(MsgId, SeqId, rabbit_msg_store, Props, IsPersistent, infinity, V1Index1a), - rabbit_queue_index:deliver([SeqId], V1Index1b) - end, - {V1Index2, V2Store1}; - %% Ignore messages that are in memory and had an entry written in the index. - %% @todo Remove this clause some time after CMQs get removed as this will become dead code. - ({undefined, _, memory, _, _}, {V1Index1, V2Store1}) -> - {V1Index1, V2Store1} - end, {V1Index0, V2Store0}, Messages), - %% Flush to disk to avoid keeping too much in memory between segments. - V1Index = rabbit_queue_index:flush(V1Index3), - %% We do a garbage collect because the old index may have created a lot of garbage. - garbage_collect(), - %% We have written everything to disk. We can delete the old segment file - %% to free up much needed space, to avoid doubling disk usage during the upgrade. - {DeletedSegments, V2Index} = rabbit_classic_queue_index_v2:delete_segment_file_for_seq_id(LoSeqId, V2Index1), - V2Store = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, V2Store3), - %% Log some progress to keep the user aware of what's going on, as moving - %% embedded messages can take quite some time. - #resource{virtual_host = VHost, name = Name} = QueueName, - rabbit_log:info("Queue ~ts in vhost ~ts converted ~b messages from v2 to v1", - [Name, VHost, length(Messages)]), - convert_from_v2_to_v1_loop(QueueName, V1Index, V2Index, V2Store, Counters, UpSeqId, HiSeqId, SkipFun). - %% Get the Timestamp property of the first msg, if present. This is %% the one with the oldest timestamp among the heads of the pending %% acks and unread queues. We can't check disk_pending_acks as these @@ -1296,7 +1026,7 @@ one_if(false) -> 0. cons_if(true, E, L) -> [E | L]; cons_if(false, _E, L) -> L. -msg_status(Version, IsPersistent, IsDelivered, SeqId, +msg_status(IsPersistent, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize) -> MsgId = mc:get_annotation(id, Msg), #msg_status{seq_id = SeqId, @@ -1308,7 +1038,7 @@ msg_status(Version, IsPersistent, IsDelivered, SeqId, is_delivered = IsDelivered, msg_location = memory, index_on_disk = false, - persist_to = determine_persist_to(Version, Msg, MsgProps, IndexMaxSize), + persist_to = determine_persist_to(Msg, MsgProps, IndexMaxSize), msg_props = MsgProps}. beta_msg_status({MsgId, SeqId, MsgLocation, MsgProps, IsPersistent}) @@ -1464,9 +1194,9 @@ expand_delta(_SeqId, #delta { count = Count, %% Internal major helpers for Public API %%---------------------------------------------------------------------------- -init(QueueVsn, IsDurable, IndexMod, IndexState, StoreState, DeltaCount, DeltaBytes, Terms, +init(IsDurable, IndexState, StoreState, DeltaCount, DeltaBytes, Terms, PersistentClient, TransientClient, VHost) -> - {LowSeqId, HiSeqId, IndexState1} = IndexMod:bounds(IndexState), + {LowSeqId, HiSeqId, IndexState1} = rabbit_classic_queue_index_v2:bounds(IndexState), {NextSeqId, NextDeliverSeqId, DeltaCount1, DeltaBytes1} = case Terms of @@ -1504,7 +1234,6 @@ init(QueueVsn, IsDurable, IndexMod, IndexState, StoreState, DeltaCount, DeltaByt next_deliver_seq_id = NextDeliverSeqId, ram_pending_ack = #{}, disk_pending_ack = #{}, - index_mod = IndexMod, index_state = IndexState1, store_state = StoreState, msg_store_clients = {PersistentClient, TransientClient}, @@ -1540,7 +1269,6 @@ init(QueueVsn, IsDurable, IndexMod, IndexState, StoreState, DeltaCount, DeltaByt io_batch_size = IoBatchSize, mode = default, - version = QueueVsn, virtual_host = VHost}, a(maybe_deltas_to_betas(State)). @@ -1733,13 +1461,12 @@ remove_from_disk(#msg_status { is_persistent = IsPersistent, msg_location = MsgLocation, index_on_disk = IndexOnDisk }, - State = #vqstate {index_mod = IndexMod, - index_state = IndexState1, + State = #vqstate {index_state = IndexState1, store_state = StoreState0, msg_store_clients = MSCState}) -> {DeletedSegments, IndexState2} = case IndexOnDisk of - true -> IndexMod:ack([SeqId], IndexState1); + true -> rabbit_classic_queue_index_v2:ack([SeqId], IndexState1); false -> {[], IndexState1} end, {StoreState1, State1} = case MsgLocation of @@ -1890,11 +1617,10 @@ purge_and_index_reset(State) -> purge1(AfterFun, State) -> a(purge_betas_and_deltas(AfterFun, State)). -reset_qi_state(State = #vqstate{ index_mod = IndexMod, - index_state = IndexState0, +reset_qi_state(State = #vqstate{ index_state = IndexState0, store_state = StoreState0 }) -> StoreState = rabbit_classic_queue_store_v2:terminate(StoreState0), - IndexState = IndexMod:reset_state(IndexState0), + IndexState = rabbit_classic_queue_index_v2:reset_state(IndexState0), State#vqstate{ index_state = IndexState, store_state = StoreState }. @@ -1946,12 +1672,9 @@ remove_queue_entries1( process_delivers_and_acks_fun(deliver_and_ack) -> %% @todo Make a clause for empty Acks list? - fun (NextDeliverSeqId, Acks, State = #vqstate { index_mod = IndexMod, - index_state = IndexState, + fun (NextDeliverSeqId, Acks, State = #vqstate { index_state = IndexState, store_state = StoreState0}) -> - %% We do not send delivers to the v1 index because - %% we've already done so when publishing. - {DeletedSegments, IndexState1} = IndexMod:ack(Acks, IndexState), + {DeletedSegments, IndexState1} = rabbit_classic_queue_index_v2:ack(Acks, IndexState), StoreState = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, StoreState0), @@ -1976,7 +1699,6 @@ publish1(Msg, IsDelivered, _ChPid, _Flow, PersistFun, State = #vqstate { q3 = Q3, delta = Delta = #delta { count = DeltaCount }, len = Len, - version = Version, qi_embed_msgs_below = IndexMaxSize, next_seq_id = SeqId, next_deliver_seq_id = NextDeliverSeqId, @@ -1988,7 +1710,7 @@ publish1(Msg, MsgId = mc:get_annotation(id, Msg), IsPersistent = mc:is_persistent(Msg), IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(Version, IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), + MsgStatus = msg_status(IsPersistent1, IsDelivered, SeqId, Msg, MsgProps, IndexMaxSize), %% We allow from 1 to 2048 messages in memory depending on the consume rate. The lower %% limit is at 1 because the queue process will need to access this message to know %% expiration information. @@ -2006,7 +1728,7 @@ publish1(Msg, stats_published_disk(MsgStatus1, State2) end, {UC1, UCS1} = maybe_needs_confirming(NeedsConfirming, persist_to(MsgStatus), - Version, MsgId, UC, UCS), + MsgId, UC, UCS), State3#vqstate{ next_seq_id = SeqId + 1, next_deliver_seq_id = maybe_next_deliver_seq_id(SeqId, NextDeliverSeqId, IsDelivered), in_counter = InCount + 1, @@ -2027,8 +1749,7 @@ publish_delivered1(Msg, MsgProps = #message_properties { needs_confirming = NeedsConfirming }, _ChPid, _Flow, PersistFun, - State = #vqstate { version = Version, - qi_embed_msgs_below = IndexMaxSize, + State = #vqstate { qi_embed_msgs_below = IndexMaxSize, next_seq_id = SeqId, next_deliver_seq_id = NextDeliverSeqId, in_counter = InCount, @@ -2039,11 +1760,11 @@ publish_delivered1(Msg, MsgId = mc:get_annotation(id, Msg), IsPersistent = mc:is_persistent(Msg), IsPersistent1 = IsDurable andalso IsPersistent, - MsgStatus = msg_status(Version, IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), + MsgStatus = msg_status(IsPersistent1, true, SeqId, Msg, MsgProps, IndexMaxSize), {MsgStatus1, State1} = PersistFun(false, false, MsgStatus, State), State2 = record_pending_ack(m(MsgStatus1), State1), {UC1, UCS1} = maybe_needs_confirming(NeedsConfirming, persist_to(MsgStatus), - Version, MsgId, UC, UCS), + MsgId, UC, UCS), {SeqId, stats_published_pending_acks(MsgStatus1, State2#vqstate{ next_seq_id = SeqId + 1, @@ -2053,14 +1774,14 @@ publish_delivered1(Msg, unconfirmed = UC1, unconfirmed_simple = UCS1 })}. -maybe_needs_confirming(false, _, _, _, UC, UCS) -> +maybe_needs_confirming(false, _, _, UC, UCS) -> {UC, UCS}; %% When storing to the v2 queue store we take the simple confirms %% path because we don't need to track index and store separately. -maybe_needs_confirming(true, queue_store, 2, MsgId, UC, UCS) -> +maybe_needs_confirming(true, queue_store, MsgId, UC, UCS) -> {UC, sets:add_element(MsgId, UCS)}; %% Otherwise we keep tracking as it used to be. -maybe_needs_confirming(true, _, _, MsgId, UC, UCS) -> +maybe_needs_confirming(true, _, MsgId, UC, UCS) -> {sets:add_element(MsgId, UC), UCS}. batch_publish_delivered1({Msg, MsgProps}, {ChPid, Flow, SeqIds, State}) -> @@ -2115,7 +1836,6 @@ maybe_batch_write_index_to_disk(Force, State = #vqstate { target_ram_count = TargetRamCount, disk_write_count = DiskWriteCount, - index_mod = IndexMod, index_state = IndexState}) when Force orelse IsPersistent -> {MsgOrId, DiskWriteCount1} = @@ -2124,19 +1844,9 @@ maybe_batch_write_index_to_disk(Force, queue_store -> {MsgId, DiskWriteCount}; queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} end, - IndexState1 = case IndexMod of - %% The old index needs IsDelivered to apply some of its optimisations. - %% But because the deliver tracking is now in the queue we always pass 'true'. - %% It also does not need the location so it is not given here. - rabbit_queue_index -> - IndexMod:pre_publish( - MsgOrId, SeqId, MsgProps, IsPersistent, true, - TargetRamCount, IndexState); - _ -> - IndexMod:pre_publish( - MsgOrId, SeqId, MsgLocation, MsgProps, IsPersistent, - TargetRamCount, IndexState) - end, + IndexState1 = rabbit_classic_queue_index_v2:pre_publish( + MsgOrId, SeqId, MsgLocation, MsgProps, + IsPersistent, TargetRamCount, IndexState), {MsgStatus#msg_status{index_on_disk = true}, State#vqstate{index_state = IndexState1, disk_write_count = DiskWriteCount1}}; @@ -2155,7 +1865,6 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { msg_props = MsgProps}, State = #vqstate{target_ram_count = TargetRamCount, disk_write_count = DiskWriteCount, - index_mod = IndexMod, index_state = IndexState}) when Force orelse IsPersistent -> {MsgOrId, DiskWriteCount1} = @@ -2164,21 +1873,12 @@ maybe_write_index_to_disk(Force, MsgStatus = #msg_status { queue_store -> {MsgId, DiskWriteCount}; queue_index -> {prepare_to_store(Msg), DiskWriteCount + 1} end, - IndexState2 = IndexMod:publish( + IndexState2 = rabbit_classic_queue_index_v2:publish( MsgOrId, SeqId, MsgLocation, MsgProps, IsPersistent, persist_to(MsgStatus) =:= msg_store, TargetRamCount, IndexState), - %% We always deliver messages when the old index is used. - %% We are actually tracking message deliveries per-queue - %% but the old index expects delivers to be handled - %% per-message. Always delivering on publish prevents - %% issues related to delivers. - IndexState3 = case IndexMod of - rabbit_queue_index -> IndexMod:deliver([SeqId], IndexState2); - _ -> IndexState2 - end, {MsgStatus#msg_status{index_on_disk = true}, - State#vqstate{index_state = IndexState3, + State#vqstate{index_state = IndexState2, disk_write_count = DiskWriteCount1}}; maybe_write_index_to_disk(_Force, MsgStatus, State) -> @@ -2188,20 +1888,19 @@ maybe_write_to_disk(ForceMsg, ForceIndex, MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), maybe_write_index_to_disk(ForceIndex, MsgStatus1, State1). -maybe_prepare_write_to_disk(ForceMsg, ForceIndex0, MsgStatus, State = #vqstate{ version = Version }) -> +maybe_prepare_write_to_disk(ForceMsg, ForceIndex0, MsgStatus, State) -> {MsgStatus1, State1} = maybe_write_msg_to_disk(ForceMsg, MsgStatus, State), %% We want messages written to the v2 per-queue store to also %% be written to the index for proper accounting. The situation %% where a message can be in the store but not in the index can %% only occur when going through this function (not via maybe_write_to_disk). - ForceIndex = case {Version, persist_to(MsgStatus)} of - {2, queue_store} -> true; + ForceIndex = case persist_to(MsgStatus) of + queue_store -> true; _ -> ForceIndex0 end, maybe_batch_write_index_to_disk(ForceIndex, MsgStatus1, State1). -determine_persist_to(Version, - Msg, +determine_persist_to(Msg, #message_properties{size = BodySize}, IndexMaxSize) -> %% The >= is so that you can set the env to 0 and never persist @@ -2224,9 +1923,8 @@ determine_persist_to(Version, false -> Est = MetaSize + BodySize, case Est >= IndexMaxSize of - true -> msg_store; - false when Version =:= 1 -> queue_index; - false when Version =:= 2 -> queue_store + true -> msg_store; + false -> queue_store end end. @@ -2283,14 +1981,13 @@ remove_pending_ack(false, SeqId, State = #vqstate{ram_pending_ack = RPA, end. purge_pending_ack(KeepPersistent, - State = #vqstate { index_mod = IndexMod, - index_state = IndexState, + State = #vqstate { index_state = IndexState, store_state = StoreState0 }) -> {IndexOnDiskSeqIds, MsgIdsByStore, SeqIdsInStore, State1} = purge_pending_ack1(State), case KeepPersistent of true -> remove_transient_msgs_by_id(MsgIdsByStore, State1); false -> {DeletedSegments, IndexState1} = - IndexMod:ack(IndexOnDiskSeqIds, IndexState), + rabbit_classic_queue_index_v2:ack(IndexOnDiskSeqIds, IndexState), StoreState1 = lists:foldl(fun rabbit_classic_queue_store_v2:remove/2, StoreState0, SeqIdsInStore), StoreState = rabbit_classic_queue_store_v2:delete_segments(DeletedSegments, StoreState1), State2 = remove_vhost_msgs_by_id(MsgIdsByStore, State1), @@ -2299,12 +1996,11 @@ purge_pending_ack(KeepPersistent, end. purge_pending_ack_delete_and_terminate( - State = #vqstate { index_mod = IndexMod, - index_state = IndexState, + State = #vqstate { index_state = IndexState, store_state = StoreState }) -> {_, MsgIdsByStore, _SeqIdsInStore, State1} = purge_pending_ack1(State), StoreState1 = rabbit_classic_queue_store_v2:terminate(StoreState), - IndexState1 = IndexMod:delete_and_terminate(IndexState), + IndexState1 = rabbit_classic_queue_index_v2:delete_and_terminate(IndexState), State2 = remove_vhost_msgs_by_id(MsgIdsByStore, State1), State2 #vqstate { index_state = IndexState1, store_state = StoreState1 }. @@ -2510,8 +2206,8 @@ next({delta, #delta{start_seq_id = SeqId, end_seq_id = SeqId}, State}, IndexState) -> next(istate(delta, State), IndexState); next({delta, #delta{start_seq_id = SeqId, - end_seq_id = SeqIdEnd} = Delta, State = #vqstate{index_mod = IndexMod}}, IndexState) -> - SeqIdB = IndexMod:next_segment_boundary(SeqId), + end_seq_id = SeqIdEnd} = Delta, State}, IndexState) -> + SeqIdB = rabbit_classic_queue_index_v2:next_segment_boundary(SeqId), %% It may make sense to limit this based on rate. But this %% is not called outside of CMQs so I will leave it alone %% for the time being. @@ -2520,15 +2216,9 @@ next({delta, #delta{start_seq_id = SeqId, %% otherwise the queue will attempt to read up to segment_entry_count() %% messages from the index each time. The value %% chosen here is arbitrary. - %% @todo We have a problem where reduce_memory_usage puts messages back to 0, - %% and then this or the maybe_deltas_to_betas function is called and it - %% fetches 2048 messages again. This is not good. Maybe the reduce_memory_usage - %% function should reduce the number of messages we fetch at once at the - %% same time (start at 2048, divide by 2 every time we reduce, or something). - %% Maybe expiration does that? SeqId + 2048, SeqIdEnd]), - {List, IndexState1} = IndexMod:read(SeqId, SeqId1, IndexState), + {List, IndexState1} = rabbit_classic_queue_index_v2:read(SeqId, SeqId1, IndexState), next({delta, Delta#delta{start_seq_id = SeqId1}, List, State}, IndexState1); next({delta, Delta, [], State}, IndexState) -> next({delta, Delta, State}, IndexState); @@ -2610,7 +2300,6 @@ maybe_deltas_to_betas(DelsAndAcksFun, State = #vqstate { delta = Delta, q3 = Q3, - index_mod = IndexMod, index_state = IndexState, store_state = StoreState, msg_store_clients = {MCStateP, MCStateT}, @@ -2618,30 +2307,20 @@ maybe_deltas_to_betas(DelsAndAcksFun, ram_bytes = RamBytes, disk_read_count = DiskReadCount, delta_transient_bytes = DeltaTransientBytes, - transient_threshold = TransientThreshold, - version = Version }, + transient_threshold = TransientThreshold }, MemoryLimit, WhatToRead) -> #delta { start_seq_id = DeltaSeqId, count = DeltaCount, transient = Transient, end_seq_id = DeltaSeqIdEnd } = Delta, - %% For v1 we always want to read messages up to the next segment boundary. - %% This is because v1 is not optimised for multiple reads from the same - %% segment: every time we read messages from a segment it has to read - %% and parse the entire segment from disk, filtering the messages we - %% requested afterwards. - %% %% For v2 we want to limit the number of messages read at once to lower %% the memory footprint. We use the consume rate to determine how many %% messages we read. - DeltaSeqLimit = case Version of - 1 -> DeltaSeqIdEnd; - 2 -> DeltaSeqId + MemoryLimit - end, + DeltaSeqLimit = DeltaSeqId + MemoryLimit, DeltaSeqId1 = - lists:min([IndexMod:next_segment_boundary(DeltaSeqId), + lists:min([rabbit_classic_queue_index_v2:next_segment_boundary(DeltaSeqId), DeltaSeqLimit, DeltaSeqIdEnd]), - {List0, IndexState1} = IndexMod:read(DeltaSeqId, DeltaSeqId1, IndexState), + {List0, IndexState1} = rabbit_classic_queue_index_v2:read(DeltaSeqId, DeltaSeqId1, IndexState), {List, StoreState3, MCStateP3, MCStateT3} = case WhatToRead of messages -> %% We try to read messages from disk all at once instead of @@ -2769,10 +2448,9 @@ merge_sh_read_msgs(MTail, _Reads) -> MTail. %% Flushes queue index batch caches and updates queue index state. -ui(#vqstate{index_mod = IndexMod, - index_state = IndexState, +ui(#vqstate{index_state = IndexState, target_ram_count = TargetRamCount} = State) -> - IndexState1 = IndexMod:flush_pre_publish_cache( + IndexState1 = rabbit_classic_queue_index_v2:flush_pre_publish_cache( TargetRamCount, IndexState), State#vqstate{index_state = IndexState1}. diff --git a/deps/rabbit/test/backing_queue_SUITE.erl b/deps/rabbit/test/backing_queue_SUITE.erl index d758adf45fc5..ae37a4a366dd 100644 --- a/deps/rabbit/test/backing_queue_SUITE.erl +++ b/deps/rabbit/test/backing_queue_SUITE.erl @@ -67,8 +67,7 @@ groups() -> {backing_queue_tests, [], [ msg_store, msg_store_file_scan, - {backing_queue_v2, [], Common ++ V2Only}, - {backing_queue_v1, [], Common} + {backing_queue_v2, [], Common ++ V2Only} ]} ]. @@ -124,14 +123,6 @@ init_per_group1(backing_queue_tests, Config) -> "Backing queue module not supported by this test group: ~tp~n", [Module])} end; -init_per_group1(backing_queue_v1, Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, 0, - application, set_env, [rabbit, classic_queue_default_version, 1]), - Config; -init_per_group1(backing_queue_v2, Config) -> - ok = rabbit_ct_broker_helpers:rpc(Config, 0, - application, set_env, [rabbit, classic_queue_default_version, 2]), - Config; init_per_group1(backing_queue_embed_limit_0, Config) -> ok = rabbit_ct_broker_helpers:rpc(Config, 0, application, set_env, [rabbit, queue_index_embed_msgs_below, 0]), @@ -176,12 +167,6 @@ end_per_group1(backing_queue_tests, Config) -> rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, teardown_backing_queue_test_group, [Config]); end_per_group1(Group, Config) -when Group =:= backing_queue_v1 -orelse Group =:= backing_queue_v2 -> - ok = rabbit_ct_broker_helpers:rpc(Config, 0, - application, unset_env, [rabbit, classic_queue_default_version]), - Config; -end_per_group1(Group, Config) when Group =:= backing_queue_embed_limit_0 orelse Group =:= backing_queue_embed_limit_1024 -> ok = rabbit_ct_broker_helpers:rpc(Config, 0, @@ -745,10 +730,7 @@ bq_queue_index(Config) -> ?MODULE, bq_queue_index1, [Config]). index_mod() -> - case application:get_env(rabbit, classic_queue_default_version) of - {ok, 1} -> rabbit_queue_index; - {ok, 2} -> rabbit_classic_queue_index_v2 - end. + rabbit_classic_queue_index_v2. bq_queue_index1(_Config) -> init_queue_index(), @@ -761,10 +743,7 @@ bq_queue_index1(_Config) -> SeqIdsC = lists:seq(0, trunc(SegmentSize/2)), SeqIdsD = lists:seq(0, SegmentSize*4), - VerifyReadWithPublishedFun = case IndexMod of - rabbit_queue_index -> fun verify_read_with_published_v1/3; - rabbit_classic_queue_index_v2 -> fun verify_read_with_published_v2/3 - end, + VerifyReadWithPublishedFun = fun verify_read_with_published_v2/3, with_empty_test_queue( fun (Qi0, QName) -> @@ -854,8 +833,7 @@ bq_queue_index1(_Config) -> end), %% d) get messages in all states to a segment, then flush, then do - %% the same again, don't flush and read. CQ v1: this will hit all - %% possibilities in combining the segment with the journal. + %% the same again, don't flush and read. with_empty_test_queue( fun (Qi0, _QName) -> {Qi1, [Seven,Five,Four|_]} = queue_index_publish([0,1,2,4,5,7], @@ -882,8 +860,7 @@ bq_queue_index1(_Config) -> Qi10 end), - %% e) as for (d), but use terminate instead of read, which (CQ v1) will - %% exercise journal_minus_segment, not segment_plus_journal. + %% e) as for (d), but use terminate instead of read. with_empty_test_queue( fun (Qi0, QName) -> {Qi1, _SeqIdsMsgIdsE} = queue_index_publish([0,1,2,4,5,7], @@ -909,15 +886,6 @@ bq_queue_index1(_Config) -> passed. -verify_read_with_published_v1(_Persistent, [], _) -> - ok; -verify_read_with_published_v1(Persistent, - [{MsgId, SeqId, _Location, _Props, Persistent}|Read], - [{SeqId, MsgId}|Published]) -> - verify_read_with_published_v1(Persistent, Read, Published); -verify_read_with_published_v1(_Persistent, _Read, _Published) -> - ko. - %% The v2 index does not store the MsgId unless required. %% We therefore do not check it. verify_read_with_published_v2(_Persistent, [], _) -> diff --git a/deps/rabbit/test/classic_queue_prop_SUITE.erl b/deps/rabbit/test/classic_queue_prop_SUITE.erl index 848f9036bbad..baf103fbc89c 100644 --- a/deps/rabbit/test/classic_queue_prop_SUITE.erl +++ b/deps/rabbit/test/classic_queue_prop_SUITE.erl @@ -23,7 +23,6 @@ -record(cq, { amq = undefined :: amqqueue:amqqueue(), name :: atom(), - version :: 1 | 2, %% We have one queue per way of publishing messages (such as channels). %% We can only confirm the publish order on a per-channel level because @@ -73,19 +72,12 @@ %% Common Test. all() -> - [{group, classic_queue_tests}, {group, classic_queue_regressions}]. + [{group, classic_queue_tests}]. groups() -> [{classic_queue_tests, [], [ % manual%, - classic_queue_v1, classic_queue_v2 - ]}, - {classic_queue_regressions, [], [ - reg_v1_full_recover_only_journal, - reg_v1_no_del_jif, - reg_v1_no_del_idx, - reg_v1_no_del_idx_unclean ]} ]. @@ -136,10 +128,10 @@ instrs_to_manual([Instrs]) -> io:format("~ndo_manual(Config) ->~n~n"), lists:foreach(fun ({init, CQ}) -> - #cq{name=Name, version=Version} = CQ, - io:format(" St0 = #cq{name=~0p, version=~0p,~n" + #cq{name=Name} = CQ, + io:format(" St0 = #cq{name=~0p,~n" " config=minimal_config(Config)},~n~n", - [Name, Version]); + [Name]); ({set, {var,Var}, {call, ?MODULE, cmd_setup_queue, _}}) -> Res = "Res" ++ integer_to_list(Var), PrevSt = "St" ++ integer_to_list(Var - 1), @@ -197,15 +189,6 @@ manual(Config) -> do_manual(Config) -> Config =:= Config. -classic_queue_v1(Config) -> - true = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, do_classic_queue_v1, [Config]). - -do_classic_queue_v1(Config) -> - true = proper:quickcheck(prop_classic_queue_v1(Config), - [{on_output, on_output_fun()}, - {numtests, ?NUM_TESTS}]). - classic_queue_v2(Config) -> true = rabbit_ct_broker_helpers:rpc(Config, 0, ?MODULE, do_classic_queue_v2, [Config]). @@ -225,16 +208,11 @@ on_output_fun() -> %% Properties. -prop_classic_queue_v1(Config) -> - {ok, LimiterPid} = rabbit_limiter:start_link(no_id), - InitialState = #cq{name=?FUNCTION_NAME, version=1, - config=minimal_config(Config), limiter=LimiterPid}, - prop_common(InitialState). - prop_classic_queue_v2(Config) -> {ok, LimiterPid} = rabbit_limiter:start_link(no_id), - InitialState = #cq{name=?FUNCTION_NAME, version=2, - config=minimal_config(Config), limiter=LimiterPid}, + InitialState = #cq{name=?FUNCTION_NAME, + config=minimal_config(Config), + limiter=LimiterPid}, prop_common(InitialState). prop_common(InitialState) -> @@ -343,8 +321,8 @@ next_state(St=#cq{q=Q0, confirmed=Confirmed, uncertain=Uncertain0}, AMQ, {call, St#cq{amq=AMQ, q=Q, restarted=true, crashed=true, uncertain=Uncertain}; next_state(St, _, {call, _, cmd_set_v2_check_crc32, _}) -> St; -next_state(St, _, {call, _, cmd_set_version, [Version]}) -> - St#cq{version=Version}; +next_state(St, _, {call, _, cmd_set_version, _}) -> + St; next_state(St=#cq{q=Q}, Msg, {call, _, cmd_publish_msg, _}) -> IntQ = maps:get(internal, Q, queue:new()), St#cq{q=Q#{internal => queue:in(Msg, IntQ)}}; @@ -530,8 +508,10 @@ postcondition(_, {call, _, Cmd, _}, Q) when element(1, Q) =:= amqqueue; postcondition(_, {call, _, cmd_set_v2_check_crc32, _}, Res) -> Res =:= ok; -postcondition(#cq{amq=AMQ}, {call, _, cmd_set_version, [Version]}, _) -> - do_check_queue_version(AMQ, Version) =:= ok; +postcondition(#cq{amq=AMQ}, {call, _, cmd_set_version, _}, _) -> + %% We cannot use CQv1 anymore so we always + %% expect the queue to use v2. + do_check_queue_version(AMQ, 2) =:= ok; postcondition(_, {call, _, cmd_publish_msg, _}, Msg) -> is_record(Msg, amqp_msg); postcondition(_, {call, _, cmd_purge, _}, Res) -> @@ -698,21 +678,16 @@ crashed_and_previously_received(#cq{crashed=Crashed, received=Received}, Msg) -> %% Helpers. -cmd_setup_queue(St=#cq{name=Name, version=Version}) -> +cmd_setup_queue(St=#cq{name=Name}) -> ?DEBUG("~0p", [St]), IsDurable = true, %% We want to be able to restart the queue process. IsAutoDelete = false, - %% We cannot use args to set the version as the arguments override - %% the policies and we also want to test policy changes. - cmd_set_version(Version), - Args = [ -% {<<"x-queue-version">>, long, Version} - ], + Args = [], QName = rabbit_misc:r(<<"/">>, queue, iolist_to_binary([atom_to_binary(Name, utf8), $_, integer_to_binary(erlang:unique_integer([positive]))])), {new, AMQ} = rabbit_amqqueue:declare(QName, IsDurable, IsAutoDelete, Args, none, <<"acting-user">>), - %% We check that the queue was creating with the right version. - ok = do_check_queue_version(AMQ, Version), + %% We check that the queue was created with the right version. + ok = do_check_queue_version(AMQ, 2), AMQ. cmd_teardown_queue(St=#cq{amq=undefined}) -> @@ -788,7 +763,7 @@ do_check_queue_version(AMQ, Version, N) -> timer:sleep(1), [{backing_queue_status, Status}] = rabbit_amqqueue:info(AMQ, [backing_queue_status]), case proplists:get_value(version, Status) of - Version -> ok; + 2 -> ok; _ -> do_check_queue_version(AMQ, Version, N - 1) end. @@ -1098,243 +1073,6 @@ queue_fold(Fun, Acc0, {R, F}) when is_function(Fun, 2), is_list(R), is_list(F) - queue_fold(Fun, Acc0, Q) -> erlang:error(badarg, [Fun, Acc0, Q]). -%% Regression tests. -%% -%% These tests are hard to reproduce by running the test suite normally -%% because they require a very specific sequence of events. - -reg_v1_full_recover_only_journal(Config) -> - true = rabbit_ct_broker_helpers:rpc(Config, 0, - ?MODULE, do_reg_v1_full_recover_only_journal, [Config]). - -do_reg_v1_full_recover_only_journal(Config) -> - - St0 = #cq{name=prop_classic_queue_v1, version=1, - config=minimal_config(Config)}, - - Res1 = cmd_setup_queue(St0), - St3 = St0#cq{amq=Res1}, - - Res4 = cmd_channel_open(St3), - true = postcondition(St3, {call, undefined, cmd_channel_open, [St3]}, Res4), - St7 = next_state(St3, Res4, {call, undefined, cmd_channel_open, [St3]}), - - Res8 = cmd_restart_queue_dirty(St7), - true = postcondition(St7, {call, undefined, cmd_restart_queue_dirty, [St7]}, Res8), - St11 = next_state(St7, Res8, {call, undefined, cmd_restart_queue_dirty, [St7]}), - - Res12 = cmd_channel_publish_many(St11, Res4, 117, 4541, 2, true, undefined), - true = postcondition(St11, {call, undefined, cmd_channel_publish_many, [St11, Res4, 117, 4541, 2, true, undefined]}, Res12), - St14 = next_state(St11, Res12, {call, undefined, cmd_channel_publish_many, [St11, Res4, 117, 4541, 2, true, undefined]}), - - Res15 = cmd_restart_vhost_clean(St14), - true = postcondition(St14, {call, undefined, cmd_restart_vhost_clean, [St14]}, Res15), - St15 = next_state(St14, Res15, {call, undefined, cmd_restart_vhost_clean, [St14]}), - - cmd_teardown_queue(St15), - - true. - -%% The following reg_v1_no_del_* cases test when a classic queue has a -%% published message before an upgrade to 3.10. In that case there is -%% no delivery marker in the v1 queue index. - -%% After upgrade to 3.10 there is a published message in the journal file. -%% Consuming and acknowledging the message should work fine. -reg_v1_no_del_jif(Config) -> - try - true = rabbit_ct_broker_helpers:rpc( - Config, 0, ?MODULE, do_reg_v1_no_del_jif, [Config]) - catch exit:{exception, Reason} -> - exit(Reason) - end. - -do_reg_v1_no_del_jif(Config) -> - St0 = #cq{name=prop_classic_queue_v1, version=1, - config=minimal_config(Config)}, - - Res1 = cmd_setup_queue(St0), - St3 = St0#cq{amq=Res1}, - - {St4, Ch} = cmd(cmd_channel_open, St3, []), - - %% Simulate pre-3.10.0 behaviour by making deliver a noop - ok = meck:new(rabbit_queue_index, [passthrough]), - ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end), - - {St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]), - - %% Enforce syncing journal to disk - %% (Not strictly necessary as vhost restart also triggers a sync) - %% At this point there should be a publish entry in the journal and no segment files - rabbit_amqqueue:pid_of(St5#cq.amq) ! timeout, - - {SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end), - ct:pal("wait for sync took ~p ms", [SyncTime div 1000]), - - %% Simulate RabbitMQ version upgrade by a clean vhost restart - %% (also reset delivery to normal operation) - ok = meck:delete(rabbit_queue_index, deliver, 2), - {St10, _} = cmd(cmd_restart_vhost_clean, St5, []), - - meck:reset(rabbit_queue_index), - - %% Consume the message and acknowledge it - %% The queue index should not crash when finding a pub+ack but no_del in the journal - %% (It used to crash in `action_to_entry/3' with a case_clause) - {St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]), - receive SomeMsg -> self() ! SomeMsg - after 5000 -> ct:fail(no_message_consumed) - end, - {St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]), - - %% enforce syncing journal to disk - rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout, - - {SyncTime2, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end), - ct:pal("wait for sync took ~p ms", [SyncTime2 div 1000]), - - validate_and_teaddown(St7). - -%% After upgrade to 3.10 there is a published message in a segment file. -%% Consuming and acknowledging the message inserts an ack entry in the journal file. -%% A subsequent restart (of the queue/vhost/node) should work fine. -reg_v1_no_del_idx(Config) -> - try - true = rabbit_ct_broker_helpers:rpc( - Config, 0, ?MODULE, do_reg_v1_no_del_idx, [Config]) - catch exit:{exception, Reason} -> - exit(Reason) - end. - -do_reg_v1_no_del_idx(Config) -> - St0 = #cq{name=prop_classic_queue_v1, version=1, - config=minimal_config(Config)}, - - Res1 = cmd_setup_queue(St0), - St3 = St0#cq{amq=Res1}, - - {St4, Ch} = cmd(cmd_channel_open, St3, []), - - %% Simulate pre-3.10.0 behaviour by making deliver a noop - ok = meck:new(rabbit_queue_index, [passthrough]), - ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end), - - ok = meck:new(rabbit_variable_queue, [passthrough]), - - {St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]), - - %% Wait for the queue process to get hibernated - %% handle_pre_hibernate syncs and flushes the journal - %% At this point there should be a publish entry in the segment file and an empty journal - {Time, ok} = timer:tc(fun() -> meck:wait(rabbit_variable_queue, handle_pre_hibernate, '_', 10000) end), - ct:pal("wait for hibernate took ~p ms", [Time div 1000]), - ok = meck:unload(rabbit_variable_queue), - - %% Simulate RabbitMQ version upgrade by a clean vhost restart - %% (also reset delivery to normal operation) - ok = meck:delete(rabbit_queue_index, deliver, 2), - {St10, _} = cmd(cmd_restart_vhost_clean, St5, []), - - %% Consume the message and acknowledge it - {St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]), - receive SomeMsg -> self() ! SomeMsg - after 5000 -> ct:fail(no_message_consumed) - end, - {St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]), - - meck:reset(rabbit_queue_index), - - %% enforce syncing journal to disk - %% At this point there should be a publish entry in the segment file and an ack in the journal - rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout, - {SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end), - ct:pal("wait for sync took ~p ms", [SyncTime div 1000]), - - meck:reset(rabbit_queue_index), - - %% Another clean vhost restart - %% The queue index should not crash when finding a pub in a - %% segment, an ack in the journal, but no_del - %% (It used to crash in `segment_plus_journal1/2' with a function_clause) - catch cmd(cmd_restart_vhost_clean, St7, []), - - {ReadTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, read, '_', 1000) end), - ct:pal("wait for queue read took ~p ms", [ReadTime div 1000]), - - validate_and_teaddown(St7). - -%% After upgrade to 3.10 there is a published message in a segment file. -%% Consuming and acknowledging the message inserts an ack entry in the journal file. -%% The recovery after a subsequent unclean shutdown (of the queue/vhost/node) should work fine. -reg_v1_no_del_idx_unclean(Config) -> - try - true = rabbit_ct_broker_helpers:rpc( - Config, 0, ?MODULE, do_reg_v1_no_del_idx_unclean, [Config]) - catch exit:{exception, Reason} -> - exit(Reason) - end. - -do_reg_v1_no_del_idx_unclean(Config) -> - St0 = #cq{name=prop_classic_queue_v1, version=1, - config=minimal_config(Config)}, - - Res1 = cmd_setup_queue(St0), - St3 = St0#cq{amq=Res1}, - - {St4, Ch} = cmd(cmd_channel_open, St3, []), - - %% Simulate pre-3.10.0 behaviour by making deliver a noop - ok = meck:new(rabbit_queue_index, [passthrough]), - ok = meck:expect(rabbit_queue_index, deliver, fun(_, State) -> State end), - - ok = meck:new(rabbit_variable_queue, [passthrough]), - - {St5, _Res5} = cmd(cmd_channel_publish, St4, [Ch, 4, _Persistent = 2, _NotMandatory = false, _NoExpiration = undefined]), - - %% Wait for the queue process to get hibernated - %% handle_pre_hibernate syncs and flushes the journal - %% At this point there should be a publish entry in the segment file and an empty journal - {Time, ok} = timer:tc(fun() -> meck:wait(rabbit_variable_queue, handle_pre_hibernate, '_', 10000) end), - ct:pal("wait for hibernate took ~p ms", [Time div 1000]), - ok = meck:unload(rabbit_variable_queue), - - %% Simulate RabbitMQ version upgrade by a clean vhost restart - %% (also reset delivery to normal operation) - ok = meck:delete(rabbit_queue_index, deliver, 2), - {St10, _} = cmd(cmd_restart_vhost_clean, St5, []), - - %% Consume the message and acknowledge it - {St6, _Tag} = cmd(cmd_channel_consume, St10, [Ch]), - receive SomeMsg -> self() ! SomeMsg - after 5000 -> ct:fail(no_message_consumed) - end, - meck:reset(rabbit_queue_index), - {St7, _Msg = #amqp_msg{}} = cmd(cmd_channel_receive_and_ack, St6, [Ch]), - - %% (need to ensure that the queue processed the ack before triggering the sync) - {AckTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, ack, '_', 1000) end), - ct:pal("wait for ack took ~p ms", [AckTime div 1000]), - - %% enforce syncing journal to disk - %% At this point there should be a publish entry in the segment file and an ack in the journal - rabbit_amqqueue:pid_of(St7#cq.amq) ! timeout, - {SyncTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, sync, '_', 1000) end), - ct:pal("wait for sync took ~p ms", [SyncTime div 1000]), - - meck:reset(rabbit_queue_index), - - %% Recovery after unclean queue shutdown - %% The queue index should not crash when finding a pub in a - %% segment, an ack in the journal, but no_del - %% (It used to crash in `journal_minus_segment1/2' with a function_clause) - {St20, _} = cmd(cmd_restart_queue_dirty, St7, []), - - {RecoverTime, ok} = timer:tc(fun() -> meck:wait(rabbit_queue_index, recover, '_', 1000) end), - ct:pal("wait for queue recover took ~p ms", [RecoverTime div 1000]), - - validate_and_teaddown(St20). - cmd(CmdName, StIn, ExtraArgs) -> Res0 = apply(?MODULE, CmdName, [StIn | ExtraArgs]), true = postcondition(StIn, {call, undefined, CmdName, [StIn | ExtraArgs]}, Res0), diff --git a/deps/rabbit/test/priority_queue_SUITE.erl b/deps/rabbit/test/priority_queue_SUITE.erl index eb7df79378bd..3c34f5adee79 100644 --- a/deps/rabbit/test/priority_queue_SUITE.erl +++ b/deps/rabbit/test/priority_queue_SUITE.erl @@ -392,6 +392,8 @@ info_head_message_timestamp1(_Config) -> PQ:delete_and_terminate(a_whim, BQS6), passed. +%% Because queue version is now ignored, this test is expected +%% to always get a queue version 2. info_backing_queue_version(Config) -> {Conn, Ch} = rabbit_ct_client_helpers:open_connection_and_channel(Config, 0), Q1 = <<"info-priority-queue-v1">>, @@ -402,7 +404,7 @@ info_backing_queue_version(Config) -> {<<"x-queue-version">>, byte, 2}]), try {ok, [{backing_queue_status, BQS1}]} = info(Config, Q1, [backing_queue_status]), - 1 = proplists:get_value(version, BQS1), + 2 = proplists:get_value(version, BQS1), {ok, [{backing_queue_status, BQS2}]} = info(Config, Q2, [backing_queue_status]), 2 = proplists:get_value(version, BQS2) after diff --git a/deps/rabbit/test/unicode_SUITE.erl b/deps/rabbit/test/unicode_SUITE.erl index 65088e613961..4f28d1362c24 100644 --- a/deps/rabbit/test/unicode_SUITE.erl +++ b/deps/rabbit/test/unicode_SUITE.erl @@ -17,7 +17,6 @@ all() -> groups() -> [ {queues, [], [ - classic_queue_v1, classic_queue_v2, quorum_queue, stream @@ -57,14 +56,7 @@ init_per_testcase(Testcase, Config) -> end_per_testcase(Testcase, Config) -> rabbit_ct_helpers:testcase_finished(Config, Testcase). -classic_queue_v1(Config) -> - ok = rabbit_ct_broker_helpers:rpc( - Config, 0, application, set_env, [rabbit, classic_queue_default_version, 1]), - ok = queue(Config, ?FUNCTION_NAME, []). - classic_queue_v2(Config) -> - ok = rabbit_ct_broker_helpers:rpc( - Config, 0, application, set_env, [rabbit, classic_queue_default_version, 2]), ok = queue(Config, ?FUNCTION_NAME, []). quorum_queue(Config) -> diff --git a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl index d8738b1de580..07c67b857d9c 100644 --- a/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl +++ b/deps/rabbitmq_management/test/rabbit_mgmt_http_SUITE.erl @@ -1135,21 +1135,21 @@ queues_test(Config) -> auto_delete => false, exclusive => false, arguments => #{}, - storage_version => 1}, + storage_version => 2}, #{name => <<"foo">>, vhost => <<"/">>, durable => true, auto_delete => false, exclusive => false, arguments => #{}, - storage_version => 1}], Queues), + storage_version => 2}], Queues), assert_item(#{name => <<"foo">>, vhost => <<"/">>, durable => true, auto_delete => false, exclusive => false, arguments => #{}, - storage_version => 1}, Queue), + storage_version => 2}, Queue), http_delete(Config, "/queues/%2F/foo", {group, '2xx'}), http_delete(Config, "/queues/%2F/baz", {group, '2xx'}), @@ -2339,8 +2339,8 @@ queue_pagination_test(Config) -> ?assertEqual(1, maps:get(page, PageOfTwo)), ?assertEqual(2, maps:get(page_size, PageOfTwo)), ?assertEqual(2, maps:get(page_count, PageOfTwo)), - assert_list([#{name => <<"test0">>, vhost => <<"/">>, storage_version => 1}, - #{name => <<"test2_reg">>, vhost => <<"/">>, storage_version => 1} + assert_list([#{name => <<"test0">>, vhost => <<"/">>, storage_version => 2}, + #{name => <<"test2_reg">>, vhost => <<"/">>, storage_version => 2} ], maps:get(items, PageOfTwo)), SortedByName = http_get(Config, "/queues?sort=name&page=1&page_size=2", ?OK), diff --git a/deps/rabbitmq_mqtt/test/shared_SUITE.erl b/deps/rabbitmq_mqtt/test/shared_SUITE.erl index acfadfc270d6..e89d25dfcd33 100644 --- a/deps/rabbitmq_mqtt/test/shared_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/shared_SUITE.erl @@ -201,11 +201,8 @@ init_per_group(Group, Config0) -> Config1, [{rmq_nodes_count, Nodes}, {rmq_nodename_suffix, Suffix}]), - Config3 = rabbit_ct_helpers:merge_app_env( - Config2, - {rabbit, [{classic_queue_default_version, 2}]}), Config = rabbit_ct_helpers:run_steps( - Config3, + Config2, rabbit_ct_broker_helpers:setup_steps() ++ rabbit_ct_client_helpers:setup_steps()), util:maybe_skip_v5(Config). diff --git a/deps/rabbitmq_mqtt/test/v5_SUITE.erl b/deps/rabbitmq_mqtt/test/v5_SUITE.erl index 4ca22953ed58..309e221aabe2 100644 --- a/deps/rabbitmq_mqtt/test/v5_SUITE.erl +++ b/deps/rabbitmq_mqtt/test/v5_SUITE.erl @@ -169,8 +169,7 @@ init_per_group(Group, Config0) -> {rmq_nodename_suffix, Suffix}]), Config2 = rabbit_ct_helpers:merge_app_env( Config1, - {rabbit, [{classic_queue_default_version, 2}, - {quorum_tick_interval, 200}]}), + {rabbit, [{quorum_tick_interval, 200}]}), Config = rabbit_ct_helpers:run_steps( Config2, rabbit_ct_broker_helpers:setup_steps() ++