Skip to content

Commit

Permalink
Replicate x_jms_topic_table Mnesia table
Browse files Browse the repository at this point in the history
The x_jms_topic_table Mnesia table must be on all nodes
for messages to be published to JMS topic exchanges
and routed to topic subscribers.

The table used to be only in RAM on one node, so it would
be unavailable when the node was down and empty
when it came back up, losing the state for subscribers
still online because connected to other nodes.

References #9005
  • Loading branch information
acogoluegnes committed Apr 25, 2024
1 parent 57f9aec commit df9fec8
Showing 1 changed file with 45 additions and 8 deletions.
53 changes: 45 additions & 8 deletions deps/rabbitmq_jms_topic_exchange/src/rabbit_db_jms_exchange.erl
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,51 @@ setup_schema() ->
}).

setup_schema_in_mnesia() ->
case mnesia:create_table( ?JMS_TOPIC_TABLE
, [ {attributes, record_info(fields, ?JMS_TOPIC_RECORD)}
, {record_name, ?JMS_TOPIC_RECORD}
, {type, set} ]
) of
{atomic, ok} -> ok;
{aborted, {already_exists, ?JMS_TOPIC_TABLE}} -> ok
end,
TableName = ?JMS_TOPIC_TABLE,
rabbit_log:info(
"Creating table ~ts for JMS topic exchange",
[TableName]),
_ = try
rabbit_table:create(
TableName,
[{attributes, record_info(fields, ?JMS_TOPIC_RECORD)},
{record_name, ?JMS_TOPIC_RECORD},
{type, set}]),
%% The JMS topic exchange table must be available on all nodes.
%% If it existed on only one node, messages could not be published
%% to JMS topic exchanges and routed to topic subscribers if the node
%% was unavailable.
%% The call below makes sure this node has a copy of the table.
case rabbit_table:ensure_table_copy(TableName, node(), ram_copies) of
ok ->
%% Next, we try to fix other nodes in the cluster if they are
%% running a version of RabbitMQ which does not replicate the
%% table. All nodes must have a replica for Mnesia operations
%% to work properly. Therefore the code below is to make older
%% compatible with newer nodes.
Replicas = mnesia:table_info(TableName, all_nodes),
Members = rabbit_nodes:list_running(),
MissingOn = Members -- Replicas,
lists:foreach(
fun(Node) ->
%% Errors from adding a replica on those older nodes
%% are ignored however. They should not be fatal. The
%% problem will solve by itself once all nodes are
%% upgraded.
_ = rpc:call(
Node,
rabbit_table, ensure_table_copy,
[TableName, Node, ram_copies])
end, MissingOn),
ok;
Error ->
Error
end
catch throw:Reason ->
rabbit_log:error(
"Failed to create JMS topic exchange table: ~tp",
[Reason])
end,
ok.

%% -------------------------------------------------------------------
Expand Down

0 comments on commit df9fec8

Please sign in to comment.