Polish
lhotari committed Oct 1, 2024
1 parent 09902e6 commit 72ded29
Showing 1 changed file with 16 additions and 10 deletions.
26 changes: 16 additions & 10 deletions pip/pip-379.md
@@ -60,7 +60,7 @@ The current implementation of Key_Shared subscriptions faces several challenges:

The "Preserving order of processing" section of the Key_Shared documentation would be updated to contain this contract:

In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in unacknowledged state to only one consumer at a time.
_In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in unacknowledged state to only one consumer at a time._

When new consumers join or leave, the consumer handling a message key can change when the default AUTO_SPLIT mode is used, but only after all pending messages for a particular key are acknowledged or the original consumer disconnects.

@@ -70,7 +70,7 @@ The Key_Shared subscription doesn't prevent using any methods in the consumer AP

Wikipedia tells us about [invariants](https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science): "In computer science, an invariant is a logical assertion that is always held to be true during a certain phase of execution of a computer program."

The contract "In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time." can be seen as an invariant for Key_Shared subscriptions. It is something that must always be held true for Key_Shared subscriptions. The design and implementation in PIP-379 focuses on ensuring this.
The contract _"In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time."_ can be seen as an invariant for Key_Shared subscriptions. It is something that must always be held true for Key_Shared subscriptions. The design and implementation in PIP-379 focuses on ensuring this.

### Future work is needed for supporting key-based ordering with negative acknowledgements

@@ -80,7 +80,7 @@ The updated contract explicitly states that it is not possible to retain key-bas

The proposed solution introduces a "draining hashes" concept to efficiently manage message ordering in Key_Shared subscriptions:

1. When consumer hash ranges change (e.g., a consumer joins or leaves), affected hashes of pending messages are added to a "draining hashes" set.
**1. When consumer hash ranges change (e.g., a consumer joins or leaves), affected hashes of pending messages are added to a "draining hashes" set.**

The consumer's pending messages are iterated, and if a pending message's hash falls within one of the impacted ranges, the hash is added to the "draining hashes" tracker.

@@ -114,7 +114,7 @@ Code example to illustrate the implementation:
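
The code example itself is collapsed in this diff view. As an illustrative sketch only (the class, record, and method names below are invented for illustration and are not the PIP's actual API), the idea of registering impacted hashes could be expressed as:

```java
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Illustrative sketch only: when hash ranges are reassigned, the consumer's
// pending (unacknowledged) messages are scanned, and every sticky key hash that
// falls into an impacted range is registered as "draining".
class DrainingHashesRegistrationSketch {
    record HashRange(int start, int end) {}

    private final Map<Integer, Integer> drainingHashes = new HashMap<>();

    synchronized void onHashRangesChanged(Iterable<Integer> pendingMessageHashes,
                                          List<HashRange> impactedRanges) {
        for (int hash : pendingMessageHashes) {
            if (isImpacted(hash, impactedRanges)) {
                // one reference per pending message that still uses this hash
                drainingHashes.merge(hash, 1, Integer::sum);
            }
        }
    }

    private boolean isImpacted(int hash, List<HashRange> ranges) {
        return ranges.stream().anyMatch(r -> hash >= r.start() && hash <= r.end());
    }
}
```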

2. Following messages with hashes in the "draining hashes" set are blocked from further delivery until pending messages are processed.
**2. Following messages with hashes in the "draining hashes" set are blocked from further delivery until pending messages are processed.**

Code example to illustrate the implementation:

@@ -125,7 +125,7 @@
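
The code example is likewise collapsed here. A minimal sketch of the delivery-time check, with invented names, could look like:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: before dispatching a message, the dispatcher checks
// whether its sticky key hash is currently in the draining set; if so, the
// message is withheld until the hash has fully drained.
class DrainingDeliveryCheckSketch {
    private final Map<Integer, Integer> drainingHashes = new ConcurrentHashMap<>();

    boolean canDeliver(int stickyKeyHash) {
        // a hash present in the draining set still has unacknowledged messages
        // pending on another consumer, so further delivery must wait
        return !drainingHashes.containsKey(stickyKeyHash);
    }
}
```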

3. A reference counter tracks pending messages for each hash in the "draining hashes" set.
**3. A reference counter tracks pending messages for each hash in the "draining hashes" set.**

Code example to illustrate the implementation:
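
Since the referenced code example is collapsed in this diff view, the following is only a rough sketch of a per-hash entry holding a reference count together with the draining consumer; the names and the map-based representation are assumptions for illustration:

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: each draining hash carries a reference count of the
// pending messages that still use it, plus the consumer it is draining from.
// The entry disappears once the count reaches zero, so memory usage returns to
// zero after a transition completes.
class DrainingHashEntrySketch {
    static final class Entry {
        final Object drainingConsumer;
        int refCount;

        Entry(Object drainingConsumer) {
            this.drainingConsumer = drainingConsumer;
        }
    }

    private final Map<Integer, Entry> drainingHashes = new HashMap<>();

    synchronized void addEntry(Object consumer, int stickyKeyHash) {
        drainingHashes.computeIfAbsent(stickyKeyHash, h -> new Entry(consumer)).refCount++;
    }

    synchronized int getRefCount(int stickyKeyHash) {
        Entry entry = drainingHashes.get(stickyKeyHash);
        return entry == null ? 0 : entry.refCount;
    }
}
```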

@@ -180,7 +180,7 @@ The memory usage of draining hashes tracking will go down to 0 after all hashes

The hash range size is reduced to 65535 (2^16-1) from the current 2^31-1 (Integer.MAX_VALUE) in ConsistentHashingStickyKeyConsumerSelector to reduce the worst-case memory consumption. Reducing the hash range size won't significantly impact the accuracy of distributing messages across connected consumers. The proof-of-concept implementation of PIP-379 includes the changes to reduce the hash range size.

4. As messages are acknowledged or consumers disconnect and therefore get removed from pending messages, the reference counter is decremented.
**4. As messages are acknowledged or consumers disconnect and therefore get removed from pending messages, the reference counter is decremented.**

Individual acks are handled in Consumer's `removePendingAcks` method:

@@ -214,7 +214,7 @@ When a consumer disconnects, hashes of pending acks are removed
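
As the referenced code is collapsed in this diff view, here is an illustrative sketch (invented names; not the actual `Consumer#removePendingAcks` implementation) of how acknowledgement and disconnect paths could decrement the reference counters:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Illustrative sketch only: acknowledgements and consumer disconnects both end
// up removing entries from the pending-acks map, and each removal decrements the
// draining reference count for the message's sticky key hash.
class PendingAcksSketch {
    record Position(long ledgerId, long entryId) {}

    private final Map<Position, Integer> pendingAckHashes = new ConcurrentHashMap<>();
    private final Map<Integer, Integer> drainingRefCounts = new ConcurrentHashMap<>();

    // called from an individual acknowledgement
    void onMessageAcknowledged(Position position) {
        Integer hash = pendingAckHashes.remove(position);
        if (hash != null) {
            decrementRefCount(hash);
        }
    }

    // called when the consumer disconnects: all of its pending acks are dropped
    void onConsumerDisconnected() {
        pendingAckHashes.forEach((position, hash) -> decrementRefCount(hash));
        pendingAckHashes.clear();
    }

    private void decrementRefCount(int hash) {
        // remove the draining entry entirely once the last reference is gone
        drainingRefCounts.computeIfPresent(hash, (h, count) -> count > 1 ? count - 1 : null);
    }
}
```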

5. When the reference counter reaches zero, the hash is removed from the set, allowing new message delivery. The dispatcher is notified about this so that the delivery of the blocked messages can occur. Unblocked hashes are batched together to prevent a new notification for each call. This is handled with the `keySharedUnblockingIntervalMs` configuration setting.
**5. When the reference counter reaches zero, the hash is removed from the set, allowing new message delivery. The dispatcher is notified about this so that the delivery of the blocked messages can occur. Unblocked hashes are batched together to prevent a new notification for each call. This is handled with the `keySharedUnblockingIntervalMs` configuration setting.**

In the proof-of-concept codebase, this is handled in the DrainingHashesTracker's reduceRefCount method:

@@ -252,7 +252,9 @@ The dispatcher is notified via the `unblockingHandler.stickyKeyHashUnblocked(sti
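
The `reduceRefCount` code is collapsed above. A simplified sketch of the zero-count handling and the batched dispatcher notification, with invented names and a `keySharedUnblockingIntervalMs`-style delay, might look like:

```java
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Illustrative sketch only: when a draining hash's reference count drops to
// zero, the hash is removed and the dispatcher is asked to resume delivery.
// Unblocked hashes are batched so that a burst of acknowledgements triggers a
// single notification instead of one per call.
class UnblockingSketch {
    private final Map<Integer, Integer> drainingRefCounts = new HashMap<>();
    private final Set<Integer> unblockedHashes = new HashSet<>();
    private final ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    private final long unblockingIntervalMs;                 // cf. keySharedUnblockingIntervalMs
    private final Consumer<Set<Integer>> unblockingHandler;  // e.g. asks the dispatcher to dispatch more entries
    private boolean notificationScheduled;

    UnblockingSketch(long unblockingIntervalMs, Consumer<Set<Integer>> unblockingHandler) {
        this.unblockingIntervalMs = unblockingIntervalMs;
        this.unblockingHandler = unblockingHandler;
    }

    synchronized void reduceRefCount(int stickyKeyHash) {
        Integer count = drainingRefCounts.get(stickyKeyHash);
        if (count == null) {
            return;                                          // hash was not draining
        }
        if (count > 1) {
            drainingRefCounts.put(stickyKeyHash, count - 1);
        } else {
            drainingRefCounts.remove(stickyKeyHash);         // fully drained: unblock the hash
            unblockedHashes.add(stickyKeyHash);
            if (!notificationScheduled) {                    // batch notifications together
                notificationScheduled = true;
                scheduler.schedule(this::flushUnblockedHashes,
                        unblockingIntervalMs, TimeUnit.MILLISECONDS);
            }
        }
    }

    private void flushUnblockedHashes() {
        Set<Integer> batch;
        synchronized (this) {
            batch = new HashSet<>(unblockedHashes);
            unblockedHashes.clear();
            notificationScheduled = false;
        }
        unblockingHandler.accept(batch);                     // let the dispatcher resume delivery
    }
}
```
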
6. Consumer hash assignments may change multiple times, and a draining hash might be reassigned to the original consumer. The draining hash data structure contains information about the draining consumer. When a message is attempted for delivery, the system can check if the target consumer is the same as the draining consumer. If they match, there's no need to block the hash. The implementation should also remove such hashes from the draining hashes set. This "lazy" approach reduces the need for actively scanning all draining hashes whenever hash assignments change.
**6. Consumer hash assignments may change multiple times, and a draining hash might be reassigned to the original consumer.**
The draining hash data structure contains information about the draining consumer. When a message is attempted for delivery, the system can check if the target consumer is the same as the draining consumer. If they match, there's no need to block the hash. The implementation should also remove such hashes from the draining hashes set. This "lazy" approach reduces the need for actively scanning all draining hashes whenever hash assignments change.

This is handled in the `DrainingHashesTracker`:
@@ -271,7 +273,9 @@
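
With the referenced `DrainingHashesTracker` code collapsed in this diff view, the lazy same-consumer check can be sketched as follows (names invented for illustration):

```java
import java.util.HashMap;
import java.util.Map;

// Illustrative sketch only: each draining entry remembers which consumer it is
// draining from. If the hash is later assigned back to that same consumer,
// there is nothing to wait for, so the entry is dropped lazily at delivery time
// instead of rescanning all draining hashes whenever assignments change.
class LazyDrainingCheckSketch {
    private final Map<Integer, Object /* draining consumer */> drainingHashes = new HashMap<>();

    synchronized boolean shouldBlockStickyKeyHash(Object targetConsumer, int stickyKeyHash) {
        Object drainingConsumer = drainingHashes.get(stickyKeyHash);
        if (drainingConsumer == null) {
            return false;                          // not draining: deliver normally
        }
        if (drainingConsumer == targetConsumer) {
            drainingHashes.remove(stickyKeyHash);  // reassigned back: drop the entry lazily
            return false;
        }
        return true;                               // draining towards another consumer: block
    }
}
```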

7. When sending out messages, there are potential race conditions that could allow the delivery of a message that should be blocked. This could happen when a consumer is added while reading and sending messages are already in progress. The sending process has been modified to perform a check when adding the message to the pending acknowledgments map. There are also additional locks in the pending acks handling which prevent race conditions.
**7. When sending out messages, there are potential race conditions that could allow the delivery of a message that should be blocked.**

This could happen when a consumer is added while reading and sending messages are already in progress. In PIP-379, the sending process has been modified to perform a check when adding the message to the pending acknowledgments map. There are also additional locks in the pending acks handling which prevent race conditions.

`put` method in `PendingAcksMap` class:

@@ -344,9 +348,11 @@ The callback maps to `handleAddingPendingAck` in the dispatcher (`PersistentStic

This logic will prevent any inconsistency when consumers get added or removed and hash ranges change while the sending of messages is already in progress. It will ensure that the view on pending acknowledgments is consistent so that the tracking of draining hashes will also be consistent in all cases. In addition, this logic will block hashes of messages that have recently been added to the redelivery queue and therefore, for message ordering reasons, should get delivered before any further message delivery happens.
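
For illustration only, the check-under-lock pattern described above could be sketched as below; the names and signatures are invented and may differ from the actual `PendingAcksMap.put` and `handleAddingPendingAck` code:

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Illustrative sketch only: adding a message to the pending-acks map is guarded
// by a lock and a callback that re-checks, at the last moment, whether the
// sticky key hash must stay blocked (for example because a consumer was added
// while the read/send was already in flight). If the callback vetoes the add,
// the message is not sent and is left for redelivery.
class PendingAcksMapSketch {
    interface AddPendingAckCallback {
        // cf. handleAddingPendingAck in the dispatcher: return false to veto the add
        boolean beforeAdding(Object consumer, long ledgerId, long entryId, int stickyKeyHash);
    }

    private final Map<Long, Map<Long, Integer>> pendingAcks = new HashMap<>();
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private final AddPendingAckCallback callback;
    private final Object consumer;

    PendingAcksMapSketch(Object consumer, AddPendingAckCallback callback) {
        this.consumer = consumer;
        this.callback = callback;
    }

    boolean put(long ledgerId, long entryId, int stickyKeyHash) {
        lock.writeLock().lock();
        try {
            // the check and the insert happen under the same lock, so a concurrent
            // hash-range change cannot slip a blocked message into the map
            if (!callback.beforeAdding(consumer, ledgerId, entryId, stickyKeyHash)) {
                return false; // blocked: the caller must not send this message now
            }
            pendingAcks.computeIfAbsent(ledgerId, id -> new HashMap<>())
                       .put(entryId, stickyKeyHash);
            return true;
        } finally {
            lock.writeLock().unlock();
        }
    }
}
```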

This approach will meet the updated contract of preserving ordering: "In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time." It also minimizes the impact on performance and memory usage. The tracking only comes into play during transition states. When consumers have been connected for a longer duration and all draining hashes have been removed, there won't be a need to check any special rules or maintain any extra state. When the draining hashes are empty, lookups will essentially be no-ops and won't consume CPU or memory resources.
**Summary**

This high-level design approach will meet the updated contract of preserving ordering: _"In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time."_

It also minimizes the impact on performance and memory usage. **The tracking only comes into play during transition states.** When consumers have been connected for a longer duration and all draining hashes have been removed, there won't be a need to check any special rules or maintain any extra state. **When the draining hashes are empty, lookups will essentially be no-ops and won't consume CPU or memory resources.**

## Public-facing Changes

