4.x: change AMQP on disk message format & speed up the AMQP parser #10964
Merged
ansd
added a commit
that referenced
this pull request
Apr 22, 2024
This is a follow up to #11012

## What?
For incoming MQTT messages, always set the `durable` message container annotation.

## Why?
Even though defaulting to `durable=true` when no durable annotation is set, as prior to this commit, is good enough, explicitly setting the durable annotation makes the code a bit more future proof and maintainable going forward in 4.0, where we will rely more on the durable annotation because AMQP 1.0 message headers will be omitted in classic and quorum queues (see #10964).

For MQTT messages, it's important to know whether the message was published with QoS 0 or QoS 1 because it affects the QoS for the MQTT message that will be delivered to the MQTT subscriber.

The performance impact of always setting the durable annotation is negligible.
and preserve original delivery_mode field i.e. leave it undefined if it was sent as undefined
Prior to this commit test case
```
bazel test //deps/rabbit:amqp_client_SUITE-mixed -t- \
    --test_sharding_strategy=disabled --test_env \
    FOCUS="-group [cluster_size_3] -case quorum_queue_on_old_node"
```
was failing because `mc_amqp:size(#v1{})` was called on the old node, which doesn't understand the new AMQP on disk message format.

Even though the old 3.13 node never stored mc_amqp messages in classic or quorum queues,
1. it will either need to understand the new mc_amqp message format, or
2. we should prevent the new format being sent to 3.13 nodes.

In this commit we opt for the 2nd solution. In `mc:prepare(store, Msg)`, we convert the new mc_amqp format to mc_amqpl, which is guaranteed to be understood by the old node. Note that `mc:prepare(store, Msg)` is not only called before actual storage, but already before the message is sent to the queue process (which might be hosted by the old node).

The 2nd solution is easier to reason about than the 1st solution because:
a) We don't have to backport code meant for 4.0 to 3.13, and
b) 3.13 is guaranteed to never store mc_amqp messages in classic or quorum queues, even in mixed version clusters.

The disadvantage of the 2nd solution is that messages are converted from mc_amqp to mc_amqpl and back to mc_amqp if there is an AMQP sender and AMQP receiver. However, this only happens while the new feature flag is disabled during the rolling upgrade. In a certain sense, this is a hybrid of how the AMQP 1.0 plugin worked in 3.13: Even though we don't proxy via AMQP 0.9.1 anymore, we still convert to AMQP 0.9.1 (mc_amqpl) messages when feature flag message_containers_store_amqp_v1 is disabled.
This commit fixes test
```
bazel test //deps/rabbitmq_mqtt:shared_SUITE-mixed -t- \
    --test_sharding_strategy=disabled --test_env \
    FOCUS="-group [mqtt,v3,cluster_size_3] -case pubsub"
```

Fix some mixed version tests

Assume the AMQP body, especially amqp-value section won't be parsed. Hence, omit smart conversions from AMQP to MQTT involving the Payload-Format-Indicator bit.

Fix test

Fix
```
bazel test //deps/amqp10_client:system_SUITE-mixed -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [rabbitmq]
```
Ensure the footer gets delivered to the AMQP client as received from the AMQP client when feature flag message_containers_store_amqp_v1 is disabled.

Fixes test
```
bazel test //deps/rabbit:amqp_system_SUITE-mixed -t- --test_sharding_strategy=disabled --test_env FOCUS="-group [dotnet] -case footer"
```
AMQP 3.2.1 defines durable=false to be the default. However, the same section also mentions:

> If the header section is omitted the receiver MUST assume the appropriate
> default values (or the meaning implied by no value being set) for the
> fields within the header unless other target or node specific defaults
> have otherwise been set.

We want RabbitMQ to be secure by default, hence in RabbitMQ we set durable=true to be the default.
as they are only meant to be used from sending to receiving peer
Fix crashes when message is originally sent via AMQP and stored within a classic or quorum queue and subsequently dead lettered where the dead letter exchange needs access to message annotations or properties or application-properties.
This commit enables client apps to automatically perform end-to-end checksumming over the bare message (i.e. body + application defined headers).

This commit allows an app to configure the AMQP client:
* for a sending link to automatically compute CRC-32 or Adler-32 checksums over each bare message including the computed checksum as a footer annotation, and
* for a receiving link to automatically lookup the expected CRC-32 or Adler-32 checksum in the footer annotation and, if present, check the received checksum against the actually computed checksum.

The commit comes with the following advantages:
1. Transparent end-to-end checksumming. Although checksumming is performed by TCP and RabbitMQ queues using the disk, end-to-end checksumming is a level higher up and can therefore detect bit flips within RabbitMQ nodes or load balancers and other bit flips that went unnoticed.
2. Not only is the body checksummed, but also the properties and application-properties sections. This is an advantage over AMQP 0.9.1 because the AMQP protocol disallows modification of the bare message.
3. This commit is currently used for testing the RabbitMQ AMQP implementation, but it shows the feasibility of how apps could also get integrity guarantees of the whole bare message using HMACs or signatures.
Prior to this commit the entire amqp-value or amqp-sequence sections were parsed when converting a message from mc_amqp. Parsing the entire amqp-value or amqp-sequence section can generate a huge amount of garbage depending on how large these sections are.

Given that other protocols cannot make use of amqp-value and amqp-sequence sections anyway, leave them AMQP encoded when converting from mc_amqp.

In fact, prior to this commit, the entire body section was parsed, generating huge amounts of garbage just to subsequently encode it again in mc_amqpl or mc_mqtt.

The new conversion interface from mc_amqp to other mc_* modules will either output amqp-data sections or the encoded amqp-value / amqp-sequence sections.
Similar to how we convert from mc_amqp to mc_amqpl before sending to a classic queue or quorum queue process if feature flag message_containers_store_amqp_v1 is disabled, we also need to do the same conversion before sending to an MQTT QoS 0 queue on the old node.
This is similar to #11057

What?
For incoming AMQP messages, always set the durable message container annotation.

Why?
Even though defaulting to durable=true when no durable annotation is set, as prior to this commit, is good enough, explicitly setting the durable annotation makes the code a bit more future proof and maintainable going forward in 4.0 where we will rely more on the durable annotation because AMQP 1.0 message headers will be omitted in classic and quorum queues.

The performance impact of always setting the durable annotation is negligible.
kjnilsson
approved these changes
May 3, 2024
Looks good to me - a couple of optional comments only.
I can't say I have evaluated every single line of code fully but the approach is good.
Still not 100% sure that concatenating multiple data sections is the right way but it should be extremely rare I hope.
This PR comes with the following benefits
1. Immutable bare message
According to the AMQP spec:
Prior to this commit, this requirement was violated by RabbitMQ.
RabbitMQ used to decode the bare message from the sending client and encode the bare message before sending to the receiving client. RabbitMQ violated the immutability of the bare message for example:
This PR ensures that the bare message is never modified if the message is sent and consumed by AMQP clients.
2. End-to-end checksumming
Thanks to 1. Immutable bare message, this PR adds a new feature to the AMQP Erlang client: end-to-end checksumming over the bare message (i.e. body + application defined headers).
This PR allows an app to configure the AMQP client:
* for a sending link to automatically compute CRC-32 or Adler-32 checksums over each bare message including the computed checksum as a footer annotation, and
* for a receiving link to automatically lookup the expected CRC-32 or Adler-32 checksum in the footer annotation and, if present, check the received checksum against the actually computed checksum.

The feature comes with the following advantages:
1. Transparent end-to-end checksumming. Although checksumming is performed by TCP and RabbitMQ queues using the disk, end-to-end checksumming is a level higher up and can therefore detect bit flips within RabbitMQ nodes or load balancers and other bit flips that went unnoticed.
2. Not only is the body checksummed, but also the properties and application-properties sections. This is an advantage over AMQP 0.9.1 because the AMQP protocol disallows modification of the bare message.
3. This feature is currently used for testing the RabbitMQ AMQP implementation, but it shows the feasibility of how apps could also get integrity guarantees of the whole bare message using HMACs or signatures.
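To illustrate the idea of a checksum carried in a footer annotation, here is a minimal Python sketch. It is not the Erlang client's API: the footer key prefix `x-example-checksum-` and the function names are hypothetical, and the bare message is assumed to be available as its encoded bytes. Only `zlib.crc32` and `zlib.adler32` are real library calls.

```python
import zlib
from typing import Dict, Optional

# Checksum functions from the Python standard library.
ALGORITHMS = {"crc-32": zlib.crc32, "adler-32": zlib.adler32}

# Hypothetical footer annotation key prefix, chosen for illustration only.
FOOTER_KEY_PREFIX = "x-example-checksum-"


def checksum_footer(bare_message: bytes, algorithm: str) -> Dict[str, int]:
    """Sender side: compute a checksum over the encoded bare message
    and return it as a single footer annotation."""
    return {FOOTER_KEY_PREFIX + algorithm: ALGORITHMS[algorithm](bare_message)}


def verify_footer(bare_message: bytes, footer: Dict[str, int]) -> Optional[bool]:
    """Receiver side: if the footer carries a known checksum, recompute it
    over the received bare message and compare. Returns None when the
    footer carries no checksum, so nothing can be verified."""
    for algorithm, compute in ALGORITHMS.items():
        expected = footer.get(FOOTER_KEY_PREFIX + algorithm)
        if expected is not None:
            return compute(bare_message) == expected
    return None
```

The check only works end to end because intermediaries leave the bare message bytes untouched. A receiver that gets a single flipped bit in the body or in the application-properties would see `verify_footer` return `False`.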
3. Avoid parsing the AMQP body
Do not parse entire AMQP body.
Prior to this PR the entire amqp-value or amqp-sequence sections were parsed when receiving a message from a client.
Parsing the entire amqp-value or amqp-sequence sections can generate a huge amount of garbage depending on how large these sections are.
Not only will this PR not parse the amqp-value or amqp-sequence sections when receiving a message from a client, but also it won't parse the sections when converting from mc_amqp to another protocol, such as mc_amqpl or mc_mqtt. Given that other protocols cannot make use of amqp-value and amqp-sequence sections anyway, leave them AMQP encoded when converting from mc_amqp.
In fact prior to this commit, the entire body section was parsed generating huge amounts of garbage just to subsequently encode it again in mc_amqpl or mc_mqtt.
The new conversion interface from mc_amqp to other mc_* modules will either output amqp-data sections or the encoded amqp-value / amqp-sequence sections.
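As a rough illustration of how a body section can be classified without decoding its value, the Python sketch below reads only the section descriptor. The descriptor codes (0x75 data, 0x76 amqp-sequence, 0x77 amqp-value) and the type constructors come from the AMQP 1.0 specification. The function names are hypothetical, this is not the mc_amqp code, and symbolic descriptors are deliberately left unsupported.

```python
import struct
from typing import Tuple

# Body section descriptor codes defined by the AMQP 1.0 messaging spec.
SECTION_KINDS = {0x75: "data", 0x76: "amqp-sequence", 0x77: "amqp-value"}


def read_descriptor(section: bytes) -> Tuple[int, int]:
    """Return (descriptor code, offset of the described value)."""
    if len(section) < 3 or section[0] != 0x00:
        raise ValueError("not a described type")
    if section[1] == 0x53:  # smallulong: one byte
        return section[2], 3
    if section[1] == 0x80 and len(section) >= 10:  # ulong: eight bytes
        return struct.unpack(">Q", section[2:10])[0], 10
    raise ValueError("unsupported descriptor encoding")


def read_binary(buf: bytes, offset: int) -> bytes:
    """Extract the payload of an AMQP binary (vbin8 or vbin32)."""
    ctor = buf[offset]
    if ctor == 0xA0:  # vbin8: one-byte length
        size, start = buf[offset + 1], offset + 2
    elif ctor == 0xB0:  # vbin32: four-byte length
        (size,) = struct.unpack(">I", buf[offset + 1:offset + 5])
        start = offset + 5
    else:
        raise ValueError("data section does not hold a binary")
    return buf[start:start + size]


def convert_body_section(section: bytes) -> Tuple[str, bytes]:
    """Data sections yield their payload; amqp-value and amqp-sequence
    sections are passed through still AMQP encoded, never decoded."""
    code, offset = read_descriptor(section)
    kind = SECTION_KINDS.get(code)
    if kind is None:
        raise ValueError("not a body section")
    if kind == "data":
        return kind, read_binary(section, offset)
    return kind, section
```

For an amqp-value or amqp-sequence section, the work done is constant regardless of how large the section is, which is the point of leaving these sections encoded.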
4. Faster AMQP parser
The AMQP parser got optimised.
When compiling file `amqp10_binary_parser.erl` with `ERL_COMPILER_OPTIONS=bin_opt_info`, all code now outputs `OPTIMIZED: match context reused` instead of `BINARY CREATED` or `NOT OPTIMIZED`. The only exception is arrays, since arrays aren't used in the hot path.
Benchmarks
Results with PR:
Results prior to PR:
Results with PR:
Results prior to PR:
Using persistent messages and classic queues, the end to end throughput increase is 23% with this PR.
For the last benchmark, the CPU flame graphs show that this PR spends only 2.6% instead of 5.5% in function `rabbit_amqp_reader:parse_frame_body/2` and that this PR spends 18.9% instead of 25.6% in `garbage_collect`.
Results with PR:
Results prior to PR:
Using quorum queues, the end to end throughput increase is 24% with this PR.