Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[broker] Add config to allow deliverAt time to be strictly honored #16068

Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 10 additions & 0 deletions conf/broker.conf
Original file line number Diff line number Diff line change
Expand Up @@ -514,9 +514,19 @@ delayedDeliveryEnabled=true

# Control the tick time for when retrying on delayed delivery,
# affecting the accuracy of the delivery time compared to the scheduled time.
# Note that this time is used to configure the HashedWheelTimer's tick time for the
# InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory).
# Default is 1 second.
delayedDeliveryTickTimeMillis=1000

# When using the InMemoryDelayedDeliveryTrackerFactory (the default DelayedDeliverTrackerFactory), whether
# the deliverAt time is strictly followed. When false (default), messages may be sent to consumers before the deliverAt
# time by as much as the tickTimeMillis. This can reduce the overhead on the broker of maintaining the delayed index
# for a potentially very short time period. When true, messages will not be sent to consumer until the deliverAt time
# has passed, and they may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the
# delayedDeliveryTickTimeMillis.
isDelayedDeliveryDeliverAtTimeStrict=false

# Whether to enable acknowledge of batch local index.
acknowledgmentAtBatchIndexLevelEnabled=false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -332,9 +332,20 @@ public class ServiceConfiguration implements PulsarConfiguration {
+ ".InMemoryDelayedDeliveryTrackerFactory";

@FieldContext(category = CATEGORY_SERVER, doc = "Control the tick time for when retrying on delayed delivery, "
+ " affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second.")
+ "affecting the accuracy of the delivery time compared to the scheduled time. Default is 1 second. "
+ "Note that this time is used to configure the HashedWheelTimer's tick time for the "
+ "InMemoryDelayedDeliveryTrackerFactory.")
private long delayedDeliveryTickTimeMillis = 1000;

@FieldContext(category = CATEGORY_SERVER, doc = "When using the InMemoryDelayedDeliveryTrackerFactory (the default "
+ "DelayedDeliverTrackerFactory), whether the deliverAt time is strictly followed. When false (default), "
+ "messages may be sent to consumers before the deliverAt time by as much as the tickTimeMillis. This can "
+ "reduce the overhead on the broker of maintaining the delayed index for a potentially very short time "
+ "period. When true, messages will not be sent to consumer until the deliverAt time has passed, and they "
+ "may be as late as the deliverAt time plus the tickTimeMillis for the topic plus the "
+ "delayedDeliveryTickTimeMillis.")
private boolean isDelayedDeliveryDeliverAtTimeStrict = false;

@FieldContext(category = CATEGORY_SERVER, doc = "Whether to enable the acknowledge of batch local index")
private boolean acknowledgmentAtBatchIndexLevelEnabled = false;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -46,39 +46,55 @@ public class InMemoryDelayedDeliveryTracker implements DelayedDeliveryTracker, T
// Timestamp at which the timeout is currently set
private long currentTimeoutTarget;

// Last time the TimerTask was triggered for this class
private long lastTickRun;

private long tickTimeMillis;

private final Clock clock;

InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC());
private final boolean isDelayedDeliveryDeliverAtTimeStrict;

InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer, long tickTimeMillis,
boolean isDelayedDeliveryDeliverAtTimeStrict) {
this(dispatcher, timer, tickTimeMillis, Clock.systemUTC(), isDelayedDeliveryDeliverAtTimeStrict);
}

InMemoryDelayedDeliveryTracker(PersistentDispatcherMultipleConsumers dispatcher, Timer timer,
long tickTimeMillis, Clock clock) {
long tickTimeMillis, Clock clock, boolean isDelayedDeliveryDeliverAtTimeStrict) {
this.dispatcher = dispatcher;
this.timer = timer;
this.tickTimeMillis = tickTimeMillis;
this.clock = clock;
this.isDelayedDeliveryDeliverAtTimeStrict = isDelayedDeliveryDeliverAtTimeStrict;
}

/**
* When {@link #isDelayedDeliveryDeliverAtTimeStrict} is false, we allow for early delivery by as much as the
* {@link #tickTimeMillis} because it is a slight optimization to let messages skip going back into the delay
* tracker for a brief amount of time when we're already trying to dispatch to the consumer.
*
* When {@link #isDelayedDeliveryDeliverAtTimeStrict} is true, we use the current time to determine when messages
* can be delivered. As a consequence, there are two delays that will affect delivery. The first is the
* {@link #tickTimeMillis} and the second is the {@link Timer}'s granularity.
*
* @return the cutoff time to determine whether a message is ready to deliver to the consumer
*/
private long getCutoffTime() {
return isDelayedDeliveryDeliverAtTimeStrict ? clock.millis() : clock.millis() + tickTimeMillis;
}

@Override
public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
long now = clock.millis();
public boolean addMessage(long ledgerId, long entryId, long deliverAt) {
if (log.isDebugEnabled()) {
log.debug("[{}] Add message {}:{} -- Delivery in {} ms ", dispatcher.getName(), ledgerId, entryId,
deliveryAt - now);
deliverAt - clock.millis());
}
if (deliveryAt < (now + tickTimeMillis)) {
// It's already about time to deliver this message. We add the buffer of
// `tickTimeMillis` because messages can be extracted from the tracker
// slightly before the expiration time. We don't want the messages to
// go back into the delay tracker (for a brief amount of time) when we're
// trying to dispatch to the consumer.
if (deliverAt <= getCutoffTime()) {
return false;
}

priorityQueue.add(deliveryAt, ledgerId, entryId);
priorityQueue.add(deliverAt, ledgerId, entryId);
updateTimer();
return true;
}
Expand All @@ -88,11 +104,8 @@ public boolean addMessage(long ledgerId, long entryId, long deliveryAt) {
*/
@Override
public boolean hasMessageAvailable() {
// Avoid the TimerTask run before reach the timeout.
long cutOffTime = clock.millis() + tickTimeMillis;
boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= cutOffTime;
boolean hasMessageAvailable = !priorityQueue.isEmpty() && priorityQueue.peekN1() <= getCutoffTime();
if (!hasMessageAvailable) {
// prevent the first delay message later than cutoffTime
updateTimer();
}
return hasMessageAvailable;
Expand All @@ -105,11 +118,7 @@ public boolean hasMessageAvailable() {
public Set<PositionImpl> getScheduledMessages(int maxMessages) {
int n = maxMessages;
Set<PositionImpl> positions = new TreeSet<>();
long now = clock.millis();
// Pick all the messages that will be ready within the tick time period.
// This is to avoid keeping rescheduling the timer for each message at
// very short delay
long cutoffTime = now + tickTimeMillis;
long cutoffTime = getCutoffTime();

while (n > 0 && !priorityQueue.isEmpty()) {
long timestamp = priorityQueue.peekN1();
Expand Down Expand Up @@ -150,6 +159,17 @@ public long getNumberOfDelayedMessages() {
return priorityQueue.size();
}

/**
* Update the scheduled timer task such that:
* 1. If there are no delayed messages, return and do not schedule a timer task.
* 2. If the next message in the queue has the same deliverAt time as the timer task, return and leave existing
* timer task in place.
* 3. If the deliverAt time for the next delayed message has already passed (i.e. the delay is negative), return
* without scheduling a timer task since the subscription is backlogged.
* 4. Else, schedule a timer task where the delay is the greater of these two: the next message's deliverAt time or
* the last tick time plus the tickTimeMillis (to ensure we do not schedule the task more frequently than the
* tickTimeMillis).
*/
private void updateTimer() {
if (priorityQueue.isEmpty()) {
if (timeout != null) {
Expand All @@ -170,10 +190,8 @@ private void updateTimer() {
timeout.cancel();
}

long delayMillis = timestamp - clock.millis();
if (log.isDebugEnabled()) {
log.debug("[{}] Start timer in {} millis", dispatcher.getName(), delayMillis);
}
long now = clock.millis();
long delayMillis = timestamp - now;

if (delayMillis < 0) {
// There are messages that are already ready to be delivered. If
Expand All @@ -185,8 +203,18 @@ private void updateTimer() {
return;
}

// Compute the earliest time that we schedule the timer to run.
long remainingTickDelayMillis = lastTickRun + tickTimeMillis - now;
long calculatedDelayMillis = Math.max(delayMillis, remainingTickDelayMillis);

if (log.isDebugEnabled()) {
log.debug("[{}] Start timer in {} millis", dispatcher.getName(), calculatedDelayMillis);
}

// Even though we may delay longer than this timestamp because of the tick delay, we still track the
// current timeout with reference to the next message's timestamp.
currentTimeoutTarget = timestamp;
timeout = timer.newTimeout(this, delayMillis, TimeUnit.MILLISECONDS);
timeout = timer.newTimeout(this, calculatedDelayMillis, TimeUnit.MILLISECONDS);
}

@Override
Expand All @@ -199,6 +227,7 @@ public void run(Timeout timeout) throws Exception {
}

synchronized (dispatcher) {
lastTickRun = clock.millis();
currentTimeoutTarget = -1;
this.timeout = null;
dispatcher.readMoreEntries();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,16 +31,20 @@ public class InMemoryDelayedDeliveryTrackerFactory implements DelayedDeliveryTra

private long tickTimeMillis;

private boolean isDelayedDeliveryDeliverAtTimeStrict;

@Override
public void initialize(ServiceConfiguration config) {
this.timer = new HashedWheelTimer(new DefaultThreadFactory("pulsar-delayed-delivery"),
config.getDelayedDeliveryTickTimeMillis(), TimeUnit.MILLISECONDS);
this.tickTimeMillis = config.getDelayedDeliveryTickTimeMillis();
this.isDelayedDeliveryDeliverAtTimeStrict = config.isDelayedDeliveryDeliverAtTimeStrict();
}

@Override
public DelayedDeliveryTracker newTracker(PersistentDispatcherMultipleConsumers dispatcher) {
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis);
return new InMemoryDelayedDeliveryTracker(dispatcher, timer, tickTimeMillis,
isDelayedDeliveryDeliverAtTimeStrict);
}

@Override
Expand Down
Loading