diff --git a/deps/rabbit/src/rabbit_db_queue.erl b/deps/rabbit/src/rabbit_db_queue.erl index 53918e7d8f83..ba63cd94eed2 100644 --- a/deps/rabbit/src/rabbit_db_queue.erl +++ b/deps/rabbit/src/rabbit_db_queue.erl @@ -651,21 +651,61 @@ update_durable_in_mnesia(UpdateFun, FilterFun) -> ok. update_durable_in_khepri(UpdateFun, FilterFun) -> - Path = khepri_queues_path() ++ [rabbit_khepri:if_has_data_wildcard()], - rabbit_khepri:transaction( - fun() -> - khepri_tx:foreach(Path, - fun(Path0, #{data := Q}) -> - DoUpdate = amqqueue:is_durable(Q) - andalso FilterFun(Q), - case DoUpdate of - true -> - khepri_tx:put(Path0, UpdateFun(Q)); - false -> - ok - end - end) - end). + PathPattern = khepri_queues_path() ++ + [?KHEPRI_WILDCARD_STAR, + #if_data_matches{ + pattern = amqqueue:pattern_match_on_durable(true)}], + %% The `FilterFun' or `UpdateFun' might attempt to do something + %% incompatible with Khepri transactions (such as dynamic apply, sending + %% a message, etc.), so this function cannot be written as a regular + %% transaction. Instead we can get all queues and track their versions, + %% update them, then apply the updates in a transaction, failing if any + %% queue has changed since reading the queue record. + case rabbit_khepri:adv_get_many(PathPattern) of + {ok, Props} -> + Updates = maps:fold( + fun(Path0, #{data := Q0, payload_version := Vsn}, Acc) + when ?is_amqqueue(Q0) -> + case FilterFun(Q0) of + true -> + Path = khepri_path:combine_with_conditions( + Path0, + [#if_payload_version{version = Vsn}]), + Q = UpdateFun(Q0), + [{Path, Q} | Acc]; + false -> + Acc + end + end, [], Props), + Res = rabbit_khepri:transaction( + fun() -> + for_each_while_ok( + fun({Path, Q}) -> khepri_tx:put(Path, Q) end, + Updates) + end), + case Res of + ok -> + ok; + {error, {khepri, mismatching_node, _}} -> + %% One of the queues changed while attempting to update + %% all queues. Retry the operation. + update_durable_in_khepri(UpdateFun, FilterFun); + {error, _} = Error -> + Error + end; + {error, _} = Error -> + Error + end. + +for_each_while_ok(Fun, [Elem | Rest]) -> + case Fun(Elem) of + ok -> + for_each_while_ok(Fun, Rest); + {error, _} = Error -> + Error + end; +for_each_while_ok(_, []) -> + ok. %% ------------------------------------------------------------------- %% exists(). diff --git a/deps/rabbit/src/rabbit_khepri.erl b/deps/rabbit/src/rabbit_khepri.erl index 21af0f430f03..b3c9235b2452 100644 --- a/deps/rabbit/src/rabbit_khepri.erl +++ b/deps/rabbit/src/rabbit_khepri.erl @@ -120,6 +120,7 @@ get/2, get_many/1, adv_get/1, + adv_get_many/1, match/1, match/2, exists/1, @@ -883,6 +884,9 @@ get_many(PathPattern) -> adv_get(Path) -> khepri_adv:get(?STORE_ID, Path, #{favor => low_latency}). +adv_get_many(PathPattern) -> + khepri_adv:get_many(?STORE_ID, PathPattern, #{favor => low_latency}). + match(Path) -> match(Path, #{}).