Skip to content

Commit

Permalink
Merge pull request #11091 from rabbitmq/mergify/bp/v3.13.x/pr-11087
Browse files Browse the repository at this point in the history
Replicate x_jms_topic_table Mnesia table (backport #11087)
  • Loading branch information
michaelklishin authored Apr 25, 2024
2 parents d9ad9c6 + afc718b commit 75e8bce
Showing 1 changed file with 45 additions and 8 deletions.
53 changes: 45 additions & 8 deletions deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,51 @@ setup_schema() ->
}).

setup_schema_in_mnesia() ->
case mnesia:create_table( ?JMS_TOPIC_TABLE
, [ {attributes, record_info(fields, ?JMS_TOPIC_RECORD)}
, {record_name, ?JMS_TOPIC_RECORD}
, {type, set} ]
) of
{atomic, ok} -> ok;
{aborted, {already_exists, ?JMS_TOPIC_TABLE}} -> ok
end,
TableName = ?JMS_TOPIC_TABLE,
rabbit_log:info(
"Creating table ~ts for JMS topic exchange",
[TableName]),
_ = try
rabbit_table:create(
TableName,
[{attributes, record_info(fields, ?JMS_TOPIC_RECORD)},
{record_name, ?JMS_TOPIC_RECORD},
{type, set}]),
%% The JMS topic exchange table must be available on all nodes.
%% If it existed on only one node, messages could not be published
%% to JMS topic exchanges and routed to topic subscribers if the node
%% was unavailable.
%% The call below makes sure this node has a copy of the table.
case rabbit_table:ensure_table_copy(TableName, node(), ram_copies) of
ok ->
%% Next, we try to fix other nodes in the cluster if they are
%% running a version of RabbitMQ which does not replicate the
%% table. All nodes must have a replica for Mnesia operations
%% to work properly. Therefore the code below is to make older
%% compatible with newer nodes.
Replicas = mnesia:table_info(TableName, all_nodes),
Members = rabbit_nodes:list_running(),
MissingOn = Members -- Replicas,
lists:foreach(
fun(Node) ->
%% Errors from adding a replica on those older nodes
%% are ignored however. They should not be fatal. The
%% problem will solve by itself once all nodes are
%% upgraded.
_ = rpc:call(
Node,
rabbit_table, ensure_table_copy,
[TableName, Node, ram_copies])
end, MissingOn),
ok;
Error ->
Error
end
catch throw:Reason ->
rabbit_log:error(
"Failed to create JMS topic exchange table: ~tp",
[Reason])
end,
ok.

%% -------------------------------------------------------------------
Expand Down

0 comments on commit 75e8bce

Please sign in to comment.