Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[ISSUE #7821] Add notifyLast flag for PopLongPollingService #7835

Merged
merged 1 commit into from
Feb 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,16 @@ public class PopLongPollingService extends ServiceThread {
private long lastCleanTime = 0;

private final AtomicLong totalPollingNum = new AtomicLong(0);
private final boolean notifyLast;

public PopLongPollingService(BrokerController brokerController, NettyRequestProcessor processor) {
public PopLongPollingService(BrokerController brokerController, NettyRequestProcessor processor, boolean notifyLast) {
this.brokerController = brokerController;
this.processor = processor;
// 100000 topic default, 100000 lru topic + cid + qid
this.topicCidMap = new ConcurrentHashMap<>(brokerController.getBrokerConfig().getPopPollingMapSize());
this.pollingMap = new ConcurrentLinkedHashMap.Builder<String, ConcurrentSkipListSet<PopRequest>>()
.maximumWeightedCapacity(this.brokerController.getBrokerConfig().getPopPollingMapSize()).build();
this.notifyLast = notifyLast;
}

@Override
Expand Down Expand Up @@ -172,17 +174,10 @@ public boolean notifyMessageArriving(final String topic, final String cid, final
if (remotingCommands == null || remotingCommands.isEmpty()) {
return false;
}
PopRequest popRequest = remotingCommands.pollFirst();
//clean inactive channel
while (popRequest != null && !popRequest.getChannel().isActive()) {
totalPollingNum.decrementAndGet();
popRequest = remotingCommands.pollFirst();
}

PopRequest popRequest = pollRemotingCommands(remotingCommands);
if (popRequest == null) {
return false;
}
totalPollingNum.decrementAndGet();
if (brokerController.getBrokerConfig().isEnablePopLog()) {
POP_LOGGER.info("lock release , new msg arrive , wakeUp : {}", popRequest);
}
Expand Down Expand Up @@ -340,4 +335,22 @@ private void cleanUnusedResource() {

lastCleanTime = System.currentTimeMillis();
}

private PopRequest pollRemotingCommands(ConcurrentSkipListSet<PopRequest> remotingCommands) {
if (remotingCommands == null || remotingCommands.isEmpty()) {
return null;
}

PopRequest popRequest;
do {
if (notifyLast) {
popRequest = remotingCommands.pollLast();
} else {
popRequest = remotingCommands.pollFirst();
}
totalPollingNum.decrementAndGet();
} while (popRequest != null && !popRequest.getChannel().isActive());

return popRequest;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ public class NotificationProcessor implements NettyRequestProcessor {

public NotificationProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
this.popLongPollingService = new PopLongPollingService(brokerController, this);
this.popLongPollingService = new PopLongPollingService(brokerController, this, true);
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ public class PopMessageProcessor implements NettyRequestProcessor {
public PopMessageProcessor(final BrokerController brokerController) {
this.brokerController = brokerController;
this.reviveTopic = PopAckConstants.buildClusterReviveTopic(this.brokerController.getBrokerConfig().getBrokerClusterName());
this.popLongPollingService = new PopLongPollingService(brokerController, this);
this.popLongPollingService = new PopLongPollingService(brokerController, this, false);
this.queueLockManager = new QueueLockManager();
this.popBufferMergeService = new PopBufferMergeService(this.brokerController, this);
this.ckMessageNumber = new AtomicLong();
Expand Down
Loading