Skip to content

Commit

Permalink
[ISSUE #7803] Add try catch for lock and unlock (#7804)
Browse files Browse the repository at this point in the history
* Add try catch for lock and unlock
  • Loading branch information
drpmma authored Feb 4, 2024
1 parent cb7fa3e commit 702bbd2
Showing 1 changed file with 53 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -414,49 +414,59 @@ public CompletableFuture<Long> queryConsumerOffset(ProxyContext ctx, MessageQueu
public CompletableFuture<Set<MessageQueue>> lockBatchMQ(ProxyContext ctx, Set<MessageQueue> mqSet,
String consumerGroup, String clientId, long timeoutMillis) {
CompletableFuture<Set<MessageQueue>> future = new CompletableFuture<>();
Set<MessageQueue> successSet = new CopyOnWriteArraySet<>();
Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(ctx, mqSet);
Map<String, List<AddressableMessageQueue>> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet);
List<CompletableFuture<Void>> futureList = new ArrayList<>();
messageQueueSetMap.forEach((k, v) -> {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(consumerGroup);
requestBody.setClientId(clientId);
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
CompletableFuture<Void> future0 = serviceManager.getMessageService()
.lockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis)
.thenAccept(successSet::addAll);
futureList.add(FutureUtils.addExecutor(future0, this.executor));
});
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> {
if (t != null) {
log.error("LockBatchMQ failed", t);
}
future.complete(successSet);
});
try {
Set<MessageQueue> successSet = new CopyOnWriteArraySet<>();
Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(ctx, mqSet);
Map<String, List<AddressableMessageQueue>> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet);
List<CompletableFuture<Void>> futureList = new ArrayList<>();
messageQueueSetMap.forEach((k, v) -> {
LockBatchRequestBody requestBody = new LockBatchRequestBody();
requestBody.setConsumerGroup(consumerGroup);
requestBody.setClientId(clientId);
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
CompletableFuture<Void> future0 = serviceManager.getMessageService()
.lockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis)
.thenAccept(successSet::addAll);
futureList.add(FutureUtils.addExecutor(future0, this.executor));
});
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> {
if (t != null) {
log.error("LockBatchMQ failed, group={}", consumerGroup, t);
}
future.complete(successSet);
});
} catch (Throwable t) {
log.error("LockBatchMQ exception, group={}", consumerGroup, t);
future.completeExceptionally(t);
}
return FutureUtils.addExecutor(future, this.executor);
}

public CompletableFuture<Void> unlockBatchMQ(ProxyContext ctx, Set<MessageQueue> mqSet,
String consumerGroup, String clientId, long timeoutMillis) {
CompletableFuture<Void> future = new CompletableFuture<>();
Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(ctx, mqSet);
Map<String, List<AddressableMessageQueue>> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet);
List<CompletableFuture<Void>> futureList = new ArrayList<>();
messageQueueSetMap.forEach((k, v) -> {
UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
requestBody.setConsumerGroup(consumerGroup);
requestBody.setClientId(clientId);
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
CompletableFuture<Void> future0 = serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis);
futureList.add(FutureUtils.addExecutor(future0, this.executor));
});
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> {
if (t != null) {
log.error("UnlockBatchMQ failed", t);
}
future.complete(null);
});
try {
Set<AddressableMessageQueue> addressableMessageQueueSet = buildAddressableSet(ctx, mqSet);
Map<String, List<AddressableMessageQueue>> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet);
List<CompletableFuture<Void>> futureList = new ArrayList<>();
messageQueueSetMap.forEach((k, v) -> {
UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody();
requestBody.setConsumerGroup(consumerGroup);
requestBody.setClientId(clientId);
requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet()));
CompletableFuture<Void> future0 = serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis);
futureList.add(FutureUtils.addExecutor(future0, this.executor));
});
CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> {
if (t != null) {
log.error("UnlockBatchMQ failed, group={}", consumerGroup, t);
}
future.complete(null);
});
} catch (Throwable t) {
log.error("UnlockBatchMQ exception, group={}", consumerGroup, t);
future.completeExceptionally(t);
}
return FutureUtils.addExecutor(future, this.executor);
}

Expand Down Expand Up @@ -505,7 +515,13 @@ protected Set<AddressableMessageQueue> buildAddressableSet(ProxyContext ctx, Set
protected HashMap<String, List<AddressableMessageQueue>> buildAddressableMapByBrokerName(
final Set<AddressableMessageQueue> mqSet) {
HashMap<String, List<AddressableMessageQueue>> result = new HashMap<>();
if (mqSet == null) {
return result;
}
for (AddressableMessageQueue mq : mqSet) {
if (mq == null) {
continue;
}
List<AddressableMessageQueue> mqs = result.computeIfAbsent(mq.getBrokerName(), k -> new ArrayList<>());
mqs.add(mq);
}
Expand Down

0 comments on commit 702bbd2

Please sign in to comment.