Apache Pulsar's Key_Shared subscription mode is designed to provide ordered message delivery on a per-key basis while allowing multiple consumers to process messages concurrently. This mode is particularly useful in scenarios where maintaining message order for specific keys is crucial, but overall throughput can be improved by parallelizing message consumption across multiple consumers.

Key concepts:

- **Key_Shared subscription**: A subscription mode that maintains message ordering per key while allowing multiple consumers.
- **Hash ranges**: In AUTO_SPLIT mode, the hash space is divided among active consumers to distribute message processing.
- **Pending messages**: Messages that have been sent to a consumer but not yet acknowledged (also called "pending acks" or "unacknowledged messages").

The Key_Shared subscription doesn't prevent using any methods in the consumer API. For example, the application might call `negativeAcknowledge` or the `redeliverUnacknowledgedMessages` method. When messages are scheduled for delivery due to these methods, they will get redelivered as soon as possible. There's no ordering guarantee in these cases; however, the guarantee of delivering a message key to a single consumer at a time will continue to be preserved.
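
For context, here is a minimal sketch of how an application typically uses a Key_Shared subscription with the Java client (the topic and subscription names are placeholders):

```java
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.SubscriptionType;

public class KeySharedConsumerExample {
    public static void main(String[] args) throws Exception {
        PulsarClient client = PulsarClient.builder()
                .serviceUrl("pulsar://localhost:6650")
                .build();

        // Multiple consumers sharing the same subscription name split the key space;
        // messages with the same key are delivered to only one of them at a time.
        Consumer<byte[]> consumer = client.newConsumer()
                .topic("my-topic")
                .subscriptionName("my-key-shared-subscription")
                .subscriptionType(SubscriptionType.Key_Shared)
                .subscribe();

        Message<byte[]> msg = consumer.receive();
        try {
            // process the message ...
            consumer.acknowledge(msg);
        } catch (Exception e) {
            // schedules redelivery; ordering is not guaranteed in this case (see above)
            consumer.negativeAcknowledge(msg);
        }

        consumer.close();
        client.close();
    }
}
```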

### Computer Science Perspective: Invariants

Wikipedia tells us about [invariants](https://en.wikipedia.org/wiki/Invariant_(mathematics)#Invariants_in_computer_science): "In computer science, an invariant is a logical assertion that is always held to be true during a certain phase of execution of a computer program."

The contract "In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time" can be seen as an invariant for Key_Shared subscriptions: something that must always hold true. The design and implementation in PIP-379 focus on ensuring this.
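
As a purely illustrative sketch (not broker code), the invariant can be modeled as an assertion over a hypothetical map from sticky key hash to the consumer that currently holds unacknowledged messages for it:

```java
import java.util.HashMap;
import java.util.Map;

// Conceptual model of the invariant only; the broker implementation in PIP-379 differs.
class KeySharedInvariantModel {
    private final Map<Integer, Long> unackedHashOwner = new HashMap<>();

    // called when a message with the given sticky key hash is dispatched to a consumer
    void onDispatch(int stickyKeyHash, long consumerId) {
        Long owner = unackedHashOwner.putIfAbsent(stickyKeyHash, consumerId);
        if (owner != null && owner != consumerId) {
            // the invariant: at most one consumer may hold unacknowledged messages for a hash
            throw new IllegalStateException("Invariant violated for hash " + stickyKeyHash);
        }
    }

    // called when all pending messages for the hash have been acknowledged
    void onAllAcknowledged(int stickyKeyHash) {
        unackedHashOwner.remove(stickyKeyHash);
    }
}
```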

### Future work needed for supporting key-based ordering with negative acknowledgements

The updated contract explicitly states that it is not possible to retain key-based ordering of messages when negative acknowledgements are used. Changing this is out of scope for PIP-379. A potential future solution for handling this would be to modify the client so that when a message is negatively acknowledged, it would also reject all further messages with the same key until the original message gets redelivered. It's already possible to attempt to implement this in client-side code. However, a proper solution would require support on the broker side to block further delivery of the specific key when there are pending negatively acknowledged messages until all negatively acknowledged messages for that particular key have been acknowledged by the consumer. This solution is out of scope for PIP-379. A future implementation to address these problems could build upon PIP-379 concepts such as "draining hashes" and extend that to cover the negative acknowledgement scenarios.
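
As a rough illustration of the client-side workaround mentioned above, the following sketch nacks any further messages for a key once a message with that key has been negatively acknowledged, until the original message is redelivered. It is purely illustrative, ignores several edge cases, and is not part of PIP-379:

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;

public class KeyBlockingNackHandler {
    // key -> message id of the originally nacked message for that key
    private final Map<String, MessageId> blockedKeys = new ConcurrentHashMap<>();

    // call before processing a received message; returns false if the message was rejected
    public boolean shouldProcess(Consumer<?> consumer, Message<?> msg) {
        String key = msg.getKey();
        MessageId blockingId = key != null ? blockedKeys.get(key) : null;
        if (blockingId == null) {
            return true;
        }
        if (!blockingId.equals(msg.getMessageId())) {
            // a previously nacked message for this key has not been redelivered yet
            consumer.negativeAcknowledge(msg);
            return false;
        }
        // the originally nacked message was redelivered; unblock the key
        blockedKeys.remove(key);
        return true;
    }

    // call instead of consumer.negativeAcknowledge(msg)
    public void negativeAcknowledge(Consumer<?> consumer, Message<?> msg) {
        if (msg.getKey() != null) {
            blockedKeys.put(msg.getKey(), msg.getMessageId());
        }
        consumer.negativeAcknowledge(msg);
    }
}
```
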
The proposed solution introduces a "draining hashes" concept to efficiently manage message ordering in Key_Shared subscriptions:

1. When consumer hash ranges change (e.g., a consumer joins or leaves), affected hashes of pending messages are added to a "draining hashes" set.

Pending messages of the consumer are iterated, and if the hash of a pending message belongs to one of the impacted ranges, the hash gets added to the "draining hashes" tracker.

Code example to illustrate the implementation:

```java
private synchronized void registerDrainingHashes(Consumer skipConsumer,
                                                 Map<Consumer, NavigableSet<Range>> impactedRangesByConsumer) {
    for (Map.Entry<Consumer, NavigableSet<Range>> entry : impactedRangesByConsumer.entrySet()) {
        Consumer c = entry.getKey();
        NavigableSet<Range> ranges = entry.getValue();
        if (c != skipConsumer) {
            // add all pending acks in the impacted hash ranges to the draining hashes tracker
            c.getPendingAcks().forEachAndLock((ledgerId, entryId, batchSize, stickyKeyHashLong) -> {
                int stickyKeyHash = (int) stickyKeyHashLong;
                for (Range range : ranges) {
                    if (stickyKeyHash >= range.getStart() && stickyKeyHash <= range.getEnd()) {
                        drainingHashesTracker.addEntry(c, stickyKeyHash);
                        break;
                    }
                    // stop looking for more since the ranges cannot overlap
                    if (stickyKeyHash > range.getEnd()) {
                        break;
                    }
                }
            });
        }
    }
}
```

2. Following messages with hashes in the "draining hashes" set are blocked from further delivery until pending messages are processed.

Code example to illustrate the implementation:

```java
// If the hash is draining, do not send the message
if (drainingHashesTracker.shouldBlockStickyKeyHash(consumer, stickyKeyHash)) {
    return false;
}
```

3. A reference counter tracks pending messages for each hash in the "draining hashes" set.

Code example to illustrate the implementation:

```java
// optimize the memory consumption of the map by using primitive int keys
private final Int2ObjectOpenHashMap<DrainingHashEntry> drainingHashes = new Int2ObjectOpenHashMap<>();

public static class DrainingHashEntry {
    private final long consumerId;
    private int refCount;
    private int blockedCount;

    DrainingHashEntry(long consumerId) {
        this.consumerId = consumerId;
    }

    public long getConsumerId() {
        return consumerId;
    }

    void incrementRefCount() {
        refCount++;
    }

    boolean decrementRefCount() {
        return --refCount == 0;
    }

    void incrementBlockedCount() {
        blockedCount++;
    }

    boolean isBlocking() {
        return blockedCount > 0;
    }
}
```
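
The `addEntry` call used in the earlier example increments this reference counter. A minimal sketch of what such a method could look like, continuing the class above (the actual PoC implementation may differ in details):

```java
public synchronized void addEntry(Consumer consumer, int stickyHash) {
    DrainingHashEntry entry = drainingHashes.get(stickyHash);
    if (entry == null) {
        // first pending message seen for this hash; remember which consumer it is draining from
        entry = new DrainingHashEntry(consumer.consumerId());
        drainingHashes.put(stickyHash, entry);
    }
    entry.incrementRefCount();
}
```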

The memory consumption estimate for tracking a hash is 52 bytes:

- key: 16 bytes (object header) + 4 bytes (int) = 20 bytes
- entry: 16 bytes (object header) + 8 bytes (long) + 4 bytes (int) + 4 bytes (int) = 32 bytes

Although the estimate is 52 bytes per entry, calculations have been made with 80 bytes per entry to account for possible additional overheads such as memory alignment and the overhead of the `Int2ObjectOpenHashMap`.

Memory usage estimate for each subscription after there have been consumer changes:

- Worst case (all 64k hashes draining for a subscription): about 5 MB
- Practical case (fewer than 1,000 hashes draining): less than 80 kB
- For 10,000 draining hashes: about 800 kB
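
The figures above follow directly from the 80-bytes-per-entry estimate; a minimal arithmetic sketch (class and constant names are illustrative only):

```java
public class DrainingHashesMemoryEstimate {
    static final long BYTES_PER_ENTRY = 80; // estimate used above, including overheads

    public static void main(String[] args) {
        System.out.printf("worst case, 65535 hashes: ~%.1f MB%n",
                65535 * BYTES_PER_ENTRY / (1024.0 * 1024.0));                          // ~5.0 MB
        System.out.println("1000 hashes: " + 1000 * BYTES_PER_ENTRY / 1000 + " kB");   // 80 kB
        System.out.println("10000 hashes: " + 10000 * BYTES_PER_ENTRY / 1000 + " kB"); // 800 kB
    }
}
```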

The memory usage of draining hashes tracking will go down to 0 after all hashes have "drained" and are no longer blocked. This memory usage isn't an overhead that applies at all times.

The hash range size is reduced to 65535 (2^16-1) from the current 2^31-1 (Integer.MAX_VALUE) in `ConsistentHashingStickyKeyConsumerSelector` to reduce the worst-case memory consumption. Reducing the hash range size won't significantly impact the accuracy of distributing messages across connected consumers. The proof-of-concept implementation of PIP-379 includes the changes to reduce the hash range size.
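
To illustrate the reduced range, here is a simplified, self-contained sketch of mapping a sticky key into the smaller hash space. The hash function here is only a stand-in (Pulsar uses Murmur3 internally), and the class and constant names are illustrative:

```java
import java.nio.charset.StandardCharsets;
import java.util.zip.CRC32;

public class StickyKeyHashExample {
    // reduced hash range size: 2^16 - 1 instead of Integer.MAX_VALUE
    static final int RANGE_SIZE = 65535;

    static int stickyKeyHash(String key) {
        // stand-in hash function for illustration purposes only
        CRC32 crc = new CRC32();
        crc.update(key.getBytes(StandardCharsets.UTF_8));
        return (int) (crc.getValue() % RANGE_SIZE);
    }

    public static void main(String[] args) {
        // the same key always maps to the same hash and is therefore routed consistently
        System.out.println(stickyKeyHash("customer-42"));
    }
}
```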

4. As messages are acknowledged or consumers disconnect and therefore get removed from pending messages, the reference counter is decremented.

Individual acknowledgements are handled in the Consumer's `removePendingAcks` method:

```java
private boolean removePendingAcks(Consumer ackOwnedConsumer, Position position) {
    PendingAcksMap ownedConsumerPendingAcks = ackOwnedConsumer.getPendingAcks();
    if (drainingHashesTracker != null) {
        LongPair pendingAckEntry = ownedConsumerPendingAcks.get(position.getLedgerId(), position.getEntryId());
        if (pendingAckEntry != null
                && ownedConsumerPendingAcks
                        .remove(position.getLedgerId(), position.getEntryId(), pendingAckEntry.first,
                                pendingAckEntry.second)) {
            int hash = (int) pendingAckEntry.second;
            drainingHashesTracker.reduceRefCount(ackOwnedConsumer, hash);
        } else {
            // Message was already removed by the other consumer
            return false;
        }
        ...
```

When a consumer disconnects, the hashes of its pending acks are removed:

```java
DrainingHashesTracker drainingHashesTracker = consumer.getDrainingHashesTracker();
consumer.getPendingAcks().forEachAndClose((ledgerId, entryId, batchSize, stickyKeyHash) -> {
    addMessageToReplay(ledgerId, entryId, stickyKeyHash);
    if (drainingHashesTracker != null) {
        drainingHashesTracker.reduceRefCount(consumer, (int) stickyKeyHash);
    }
});
```

5. When the reference counter reaches zero, the hash is removed from the set, allowing new message delivery. The dispatcher is notified about this so that the delivery of the blocked messages can occur. Unblocked hashes are batched together to prevent a new notification for each call. This is handled with the `keySharedUnblockingIntervalMs` configuration setting.

In the proof-of-concept codebase, this is handled in the `DrainingHashesTracker`'s `reduceRefCount` method:

```java
public synchronized void reduceRefCount(Consumer consumer, int stickyHash) {
    DrainingHashEntry entry = drainingHashes.get(stickyHash);
    if (entry == null) {
        return;
    }
    if (entry.decrementRefCount()) {
        DrainingHashEntry removed = drainingHashes.remove(stickyHash);
        if (removed.isBlocking()) {
            unblockingHandler.stickyKeyHashUnblocked(stickyHash);
        }
    }
}
```

The `isBlocking()` method of `DrainingHashEntry` returns true when delivery was attempted for that hash, indicating a need to unblock it when it's removed.
The dispatcher is notified via the `unblockingHandler.stickyKeyHashUnblocked(stickyHash)` callback. The implementation simply schedules a read, batching all calls together, and then calls `readMoreEntries` in the dispatcher.

```java
private void stickyKeyHashUnblocked(int stickyKeyHash) {
    reScheduleReadInMs(keySharedUnblockingIntervalMsSupplier.getAsLong());
}

protected void reScheduleReadInMs(long readAfterMs) {
    if (isRescheduleReadInProgress.compareAndSet(false, true)) {
        Runnable runnable = () -> {
            isRescheduleReadInProgress.set(false);
            readMoreEntries();
        };
        topic.getBrokerService().executor().schedule(runnable, readAfterMs, TimeUnit.MILLISECONDS);
    }
}
```
6. Consumer hash assignments may change multiple times, and a draining hash might be reassigned to the original consumer. The draining hash data structure contains information about the draining consumer. When a message is attempted for delivery, the system can check if the target consumer is the same as the draining consumer. If they match, there's no need to block the hash. The implementation should also remove such hashes from the draining hashes set. This "lazy" approach reduces the need for actively scanning all draining hashes whenever hash assignments change.

This is handled in the `DrainingHashesTracker`'s `shouldBlockStickyKeyHash` method:

```java
public synchronized boolean shouldBlockStickyKeyHash(Consumer consumer, int stickyKeyHash) {
    DrainingHashEntry entry = drainingHashes.get(stickyKeyHash);
    if (entry == null) {
        return false;
    }
    // hash has been reassigned to the original consumer, remove the entry
    if (entry.getConsumerId() == consumer.consumerId()) {
        drainingHashes.remove(stickyKeyHash, entry);
        return false;
    }
    entry.incrementBlockedCount();
    return true;
}
```

7. When sending out messages, there are potential race conditions that could allow the delivery of a message that should be blocked. This could happen when a consumer is added while reading and sending messages are already in progress. The sending process has been modified to perform a check when adding the message to the pending acknowledgments map. There are also additional locks in the pending acks handling which prevent race conditions.

The `put` method in the `PendingAcksMap` class:

```java
public boolean put(long consumerId, long ledgerId, long entryId, int batchSize, int stickyKeyHash) {
    try {
        lock.lock();
        if (closed) {
            return false;
        }
        // prevent adding sticky hash to pending acks if it's already in draining hashes
        // to avoid any race conditions that would break consistency
        PendingAcksAddHandler pendingAcksAddHandler = pendingAcksAddHandlerSupplier.get();
        if (pendingAcksAddHandler != null
                && !pendingAcksAddHandler.handleAdding(consumerId, ledgerId, entryId, stickyKeyHash)) {
            return false;
        }
        pendingAcks.put(ledgerId, entryId, batchSize, stickyKeyHash);
        return true;
    } finally {
        lock.unlock();
    }
}
```

This `put` method is called from the Consumer's `sendMessages` method:

```java
boolean sendingAllowed =
        pendingAcks.put(consumerId, entry.getLedgerId(), entry.getEntryId(), batchSize,
                stickyKeyHash);
if (!sendingAllowed) {
    // sending isn't allowed when pending acks doesn't accept adding the entry
    // this happens when Key_Shared draining hashes contains the stickyKeyHash
    // because of race conditions, it might be resolved at the time of sending
    totalEntries--;
    entries.set(i, null);
    entry.release();
    if (log.isDebugEnabled()) {
        log.debug("[{}-{}] Skipping sending of {}:{} ledger entry with batchSize of {} since adding"
                        + " to pending acks failed in broker.service.Consumer for consumerId: {}",
                topicName, subscription, entry.getLedgerId(), entry.getEntryId(), batchSize,
                consumerId);
    }
```
If sending isn't allowed, the entry is skipped and not delivered. In that case, the `PendingAcksAddHandler` callback adds the message to the redelivery queue. The callback maps to `handleAddingPendingAck` in the dispatcher (`PersistentStickyKeyDispatcherMultipleConsumers`).

```java
private boolean handleAddingPendingAck(long consumerId, long ledgerId, long entryId, int stickyKeyHash) {
    DrainingHashesTracker.DrainingHashEntry drainingHashEntry = drainingHashesTracker.getEntry(stickyKeyHash);
    if (drainingHashEntry != null && drainingHashEntry.getConsumerId() != consumerId) {
        log.warn("[{}] Another consumer id {} is already draining hash {}. Skipping adding {}:{} to pending acks "
                        + "for consumer id {}. Adding the message to replay.",
                getName(), drainingHashEntry.getConsumerId(), stickyKeyHash, ledgerId, entryId, consumerId);
        addMessageToReplay(ledgerId, entryId, stickyKeyHash);
        return false;
    }
    if (recentReadTypeInSending == ReadType.Normal && redeliveryMessages.containsStickyKeyHash(stickyKeyHash)) {
        log.warn("[{}] Sticky hash {} is already in the replay queue. "
                        + "Skipping adding {}:{} to pending acks. Adding the message to replay.",
                getName(), stickyKeyHash, ledgerId, entryId);
        addMessageToReplay(ledgerId, entryId, stickyKeyHash);
        return false;
    }
    return true;
}
```

This logic will prevent any inconsistency when consumers get added or removed and hash ranges change while the sending of messages is already in progress. It will ensure that the view on pending acknowledgments is consistent so that the tracking of draining hashes will also be consistent in all cases. In addition, this logic will block hashes of messages that have recently been added to the redelivery queue and therefore, for message ordering reasons, should get delivered before any further message delivery happens.

This approach will meet the updated contract of preserving ordering: "In Key_Shared subscriptions, messages with the same key are delivered and allowed to be in an unacknowledged state to only one consumer at a time." It also minimizes the impact on performance and memory usage. The tracking only comes into play during transition states. When consumers have been connected for a longer duration and all draining hashes have been removed, there won't be a need to check any special rules or maintain any extra state. When the draining hashes are empty, lookups will essentially be no-ops and won't consume CPU or memory resources.



## Public-facing Changes

- Mailing List discussion thread: https://lists.apache.org/thread/l5zjq0fb2dscys3rsn6kfl7505tbndlx
- Mailing List voting thread: [To be updated]
- PIP-379 implementation PR: https://github.com/apache/pulsar/pull/23352
- [PIP-282: Change definition of the recently joined consumers position](https://github.com/apache/pulsar/blob/master/pip/pip-282.md)
- [Pulsar issue #23307: Message ordering isn't retained in Key_Shared AUTO_SPLIT mode in a rolling restart type of test scenario](https://github.com/apache/pulsar/issues/23307)