diff --git a/pip/pip-379.md b/pip/pip-379.md
index 5e9829cac96c9..4c4b082958c9c 100644
--- a/pip/pip-379.md
+++ b/pip/pip-379.md
@@ -60,7 +60,7 @@ The current implementation of Key_Shared subscriptions faces several challenges:
 
 The "Preserving order of processing" section of the Key_Shared documentation would be updated to contain this contract:
 
-In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in unacknowledged state to only one consumer at a time.
+_In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in unacknowledged state to only one consumer at a time._
 
 When new consumers join or leave, the consumer handling a message key can change when the default AUTO_SPLIT mode is used, but only after all pending messages for a particular key are acknowledged or the original consumer disconnects.
@@ -70,7 +70,7 @@ The Key_Shared subscription doesn't prevent using any methods in the consumer AP
 
 Wikipedia tells us about [invariants](https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science): "In computer science, an invariant is a logical assertion that is always held to be true during a certain phase of execution of a computer program."
 
-The contract "In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time." can be seen as an invariant for Key_Shared subscriptions. It is something that must always be held true for Key_Shared subscriptions. The design and implementation in PIP-379 focuses on ensuring this.
+The contract _"In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time."_ can be seen as an invariant for Key_Shared subscriptions. It is something that must always be held true for Key_Shared subscriptions. The design and implementation in PIP-379 focuses on ensuring this.
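The contract above is, in effect, an executable invariant: a key with unacknowledged messages must be held by exactly one consumer. As a rough illustration only (this class and its methods are hypothetical, not part of the PIP or the Pulsar codebase), the invariant could be checked like this:

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: the Key_Shared contract expressed as a runtime check.
// All unacknowledged messages of a key must be pending on a single consumer.
public class KeySharedInvariant {
    // key -> consumer currently holding unacknowledged messages for it
    private final Map<String, String> pendingKeyOwner = new HashMap<>();

    // Returns true if delivering `key` to `consumer` keeps the invariant:
    // either no consumer holds the key, or the same consumer already does.
    public boolean canDeliver(String key, String consumer) {
        String owner = pendingKeyOwner.putIfAbsent(key, consumer);
        return owner == null || owner.equals(consumer);
    }

    // Called when the last unacknowledged message for the key is acknowledged
    // (or the owning consumer disconnects); only then may the key move.
    public void allAcked(String key) {
        pendingKeyOwner.remove(key);
    }
}
```

Under this reading, a key may only change hands once `allAcked` has run, which is exactly the "acknowledged or the original consumer disconnects" clause of the contract.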
 
 ### Future work is needed for supporting key-based ordering with negative acknowledgements
@@ -80,7 +80,7 @@ The updated contract explicitly states that it is not possible to retain key-bas
 
 The proposed solution introduces a "draining hashes" concept to efficiently manage message ordering in Key_Shared subscriptions:
 
-1. When consumer hash ranges change (e.g., a consumer joins or leaves), affected hashes of pending messages are added to a "draining hashes" set.
+**1. When consumer hash ranges change (e.g., a consumer joins or leaves), affected hashes of pending messages are added to a "draining hashes" set.**
 
 Pending messages of the consumer are iterated, and if the hash of a pending message belongs to one of the impacted ranges, the hash gets added to the "draining hashes" tracker.
@@ -114,7 +114,7 @@ Code example to illustrate the implementation:
 }
 ```
 
-2. Following messages with hashes in the "draining hashes" set are blocked from further delivery until pending messages are processed.
+**2. Following messages with hashes in the "draining hashes" set are blocked from further delivery until pending messages are processed.**
 
 Code example to illustrate the implementation:
@@ -125,7 +125,7 @@ Code example to illustrate the implementation:
 }
 ```
 
-3. A reference counter tracks pending messages for each hash in the "draining hashes" set.
+**3. A reference counter tracks pending messages for each hash in the "draining hashes" set.**
 
 Code example to illustrate the implementation:
@@ -180,7 +180,7 @@ The memory usage of draining hashes tracking will go down to 0 after all hashes
 
 The hash range size is reduced to 65535 (2^16-1) from the current 2^31-1 (Integer.MAX_VALUE) in ConsistentHashingStickyKeyConsumerSelector to reduce the worst-case memory consumption. Reducing the hash range size won't significantly impact the accuracy of distributing messages across connected consumers.
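Steps 1-3 above boil down to a single data structure: a map from sticky key hash to a reference count of still-pending messages for that hash. The following is a simplified sketch of that idea only — the class and method names are invented and do not reflect the actual `DrainingHashesTracker` API in the proof-of-concept:

```java
import java.util.HashMap;
import java.util.Map;

// Simplified sketch (not the actual Pulsar implementation) of steps 1-3:
// a map from sticky key hash to the number of pending messages that must
// drain before the hash may be delivered to a different consumer.
public class DrainingHashesSketch {
    private final Map<Integer, Integer> drainingHashes = new HashMap<>();

    // Step 1: a pending message's hash falls into an impacted range, so it
    // starts draining; step 3: each pending message adds one to the count.
    public void addEntry(int stickyKeyHash) {
        drainingHashes.merge(stickyKeyHash, 1, Integer::sum);
    }

    // Step 2: any further message carrying a draining hash is blocked.
    public boolean shouldBlock(int stickyKeyHash) {
        return drainingHashes.containsKey(stickyKeyHash);
    }

    // Step 4 (previewed here): an ack decrements the counter; the hash is
    // removed, and delivery may resume, once the count reaches zero.
    public boolean reduceRefCount(int stickyKeyHash) {
        Integer count = drainingHashes.get(stickyKeyHash);
        if (count == null) {
            return false;
        }
        if (count == 1) {
            drainingHashes.remove(stickyKeyHash);
            return true; // fully drained
        }
        drainingHashes.put(stickyKeyHash, count - 1);
        return false;
    }
}
```

Because entries are removed as soon as their count hits zero, the map is empty in steady state, which is why the memory usage of the tracking drops to zero after all hashes have drained.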
 The proof-of-concept implementation of PIP-379 includes the changes to reduce the hash range size.
 
-4. As messages are acknowledged or consumers disconnect and therefore get removed from pending messages, the reference counter is decremented.
+**4. As messages are acknowledged or consumers disconnect and therefore get removed from pending messages, the reference counter is decremented.**
 
 Individual acks are handled in Consumer's `removePendingAcks` method:
@@ -214,7 +214,7 @@ When a consumer disconnects, hashes of pending acks are removed
 });
 ```
 
-5. When the reference counter reaches zero, the hash is removed from the set, allowing new message delivery. The dispatcher is notified about this so that the delivery of the blocked messages can occur. Unblocked hashes are batched together to prevent a new notification for each call. This is handled with the `keySharedUnblockingIntervalMs` configuration setting.
+**5. When the reference counter reaches zero, the hash is removed from the set, allowing new message delivery. The dispatcher is notified about this so that the delivery of the blocked messages can occur. Unblocked hashes are batched together to prevent a new notification for each call. This is handled with the `keySharedUnblockingIntervalMs` configuration setting.**
 
 In the proof-of-concept codebase, this is handled in the DrainingHashesTracker's reduceRefCount method:
@@ -252,7 +252,9 @@ The dispatcher is notified via the `unblockingHandler.stickyKeyHashUnblocked(sti
 }
 ```
 
-6. Consumer hash assignments may change multiple times, and a draining hash might be reassigned to the original consumer. The draining hash data structure contains information about the draining consumer. When a message is attempted for delivery, the system can check if the target consumer is the same as the draining consumer. If they match, there's no need to block the hash. The implementation should also remove such hashes from the draining hashes set. This "lazy" approach reduces the need for actively scanning all draining hashes whenever hash assignments change.
+**6. Consumer hash assignments may change multiple times, and a draining hash might be reassigned to the original consumer.**
+
+The draining hash data structure contains information about the draining consumer. When a message is attempted for delivery, the system can check if the target consumer is the same as the draining consumer. If they match, there's no need to block the hash. The implementation should also remove such hashes from the draining hashes set. This "lazy" approach reduces the need for actively scanning all draining hashes whenever hash assignments change.
 
 This is handled in the `DrainingHashesTracker`
 
 ```java
@@ -271,7 +273,9 @@ This is handled in the `DrainingHashesTracker`
 }
 ```
 
-7. When sending out messages, there are potential race conditions that could allow the delivery of a message that should be blocked. This could happen when a consumer is added while reading and sending messages are already in progress. The sending process has been modified to perform a check when adding the message to the pending acknowledgments map. There are also additional locks in the pending acks handling which prevent race conditions.
+**7. When sending out messages, there are potential race conditions that could allow the delivery of a message that should be blocked.**
+
+This could happen when a consumer is added while reading and sending messages are already in progress. In PIP-379, the sending process has been modified to perform a check when adding the message to the pending acknowledgments map. There are also additional locks in the pending acks handling which prevent race conditions.
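Steps 4-6 can be sketched together: decrementing the per-hash counter, batching the fully drained hashes for a single dispatcher notification (cf. the `keySharedUnblockingIntervalMs` batching described in step 5), and lazily skipping the block when the hash is draining toward the very consumer it would be delivered to. This is a hypothetical simplification with invented names, not the PIP's actual code:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Consumer;

// Hypothetical sketch of steps 4-6: per-hash pending counts with the
// draining consumer recorded, batched unblock notifications, and a lazy
// same-consumer check.
public class UnblockingSketch {
    // hash -> { draining consumer id, pending message count }
    private final Map<Integer, int[]> drainingHashes = new HashMap<>();
    private final Set<Integer> pendingUnblockedHashes = new HashSet<>();

    public void startDraining(int hash, int consumerId, int pendingMessages) {
        drainingHashes.put(hash, new int[] {consumerId, pendingMessages});
    }

    // Step 6: only block when the target consumer differs from the draining
    // consumer; when they match, stop draining the hash lazily.
    public boolean shouldBlock(int hash, int targetConsumerId) {
        int[] entry = drainingHashes.get(hash);
        if (entry == null) {
            return false;
        }
        if (entry[0] == targetConsumerId) {
            drainingHashes.remove(hash); // reassigned back: no need to drain
            return false;
        }
        return true;
    }

    // Steps 4-5: decrement on ack/disconnect; at zero, remove the hash and
    // queue it for a batched "unblocked" notification.
    public void reduceRefCount(int hash) {
        int[] entry = drainingHashes.get(hash);
        if (entry != null && --entry[1] == 0) {
            drainingHashes.remove(hash);
            pendingUnblockedHashes.add(hash);
        }
    }

    // Invoked periodically rather than once per unblocked hash, mirroring
    // the keySharedUnblockingIntervalMs batching idea.
    public void flushUnblocked(Consumer<Set<Integer>> dispatcherNotification) {
        if (!pendingUnblockedHashes.isEmpty()) {
            dispatcherNotification.accept(new HashSet<>(pendingUnblockedHashes));
            pendingUnblockedHashes.clear();
        }
    }
}
```

The batching in `flushUnblocked` is the design choice that keeps dispatcher wake-ups proportional to the notification interval rather than to the number of acknowledged messages.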
 `put` method in `PendingAcksMap` class:
@@ -344,9 +348,11 @@ The callback maps to `handleAddingPendingAck` in the dispatcher (`PersistentStic
 
 This logic will prevent any inconsistency when consumers get added or removed and hash ranges change while the sending of messages is already in progress. It will ensure that the view on pending acknowledgments is consistent so that the tracking of draining hashes will also be consistent in all cases. In addition, this logic will block hashes of messages that have recently been added to the redelivery queue and therefore, for message ordering reasons, should get delivered before any further message delivery happens.
 
-This approach will meet the updated contract of preserving ordering: "In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time." It also minimizes the impact on performance and memory usage. The tracking only comes into play during transition states. When consumers have been connected for a longer duration and all draining hashes have been removed, there won't be a need to check any special rules or maintain any extra state. When the draining hashes are empty, lookups will essentially be no-ops and won't consume CPU or memory resources.
+**Summary**
+This high-level design approach will meet the updated contract of preserving ordering: _"In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time."_
+It also minimizes the impact on performance and memory usage. **The tracking only comes into play during transition states.** When consumers have been connected for a longer duration and all draining hashes have been removed, there won't be a need to check any special rules or maintain any extra state. **When the draining hashes are empty, lookups will essentially be no-ops and won't consume CPU or memory resources.**
 
 ## Public-facing Changes