diff --git a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java index 7870233576a..3ff34237014 100644 --- a/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java +++ b/proxy/src/main/java/org/apache/rocketmq/proxy/processor/ConsumerProcessor.java @@ -414,49 +414,59 @@ public CompletableFuture queryConsumerOffset(ProxyContext ctx, MessageQueu public CompletableFuture> lockBatchMQ(ProxyContext ctx, Set mqSet, String consumerGroup, String clientId, long timeoutMillis) { CompletableFuture> future = new CompletableFuture<>(); - Set successSet = new CopyOnWriteArraySet<>(); - Set addressableMessageQueueSet = buildAddressableSet(ctx, mqSet); - Map> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet); - List> futureList = new ArrayList<>(); - messageQueueSetMap.forEach((k, v) -> { - LockBatchRequestBody requestBody = new LockBatchRequestBody(); - requestBody.setConsumerGroup(consumerGroup); - requestBody.setClientId(clientId); - requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet())); - CompletableFuture future0 = serviceManager.getMessageService() - .lockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis) - .thenAccept(successSet::addAll); - futureList.add(FutureUtils.addExecutor(future0, this.executor)); - }); - CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> { - if (t != null) { - log.error("LockBatchMQ failed", t); - } - future.complete(successSet); - }); + try { + Set successSet = new CopyOnWriteArraySet<>(); + Set addressableMessageQueueSet = buildAddressableSet(ctx, mqSet); + Map> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet); + List> futureList = new ArrayList<>(); + messageQueueSetMap.forEach((k, v) -> { + LockBatchRequestBody requestBody = new LockBatchRequestBody(); + requestBody.setConsumerGroup(consumerGroup); + requestBody.setClientId(clientId); + requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet())); + CompletableFuture future0 = serviceManager.getMessageService() + .lockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis) + .thenAccept(successSet::addAll); + futureList.add(FutureUtils.addExecutor(future0, this.executor)); + }); + CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> { + if (t != null) { + log.error("LockBatchMQ failed, group={}", consumerGroup, t); + } + future.complete(successSet); + }); + } catch (Throwable t) { + log.error("LockBatchMQ exception, group={}", consumerGroup, t); + future.completeExceptionally(t); + } return FutureUtils.addExecutor(future, this.executor); } public CompletableFuture unlockBatchMQ(ProxyContext ctx, Set mqSet, String consumerGroup, String clientId, long timeoutMillis) { CompletableFuture future = new CompletableFuture<>(); - Set addressableMessageQueueSet = buildAddressableSet(ctx, mqSet); - Map> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet); - List> futureList = new ArrayList<>(); - messageQueueSetMap.forEach((k, v) -> { - UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody(); - requestBody.setConsumerGroup(consumerGroup); - requestBody.setClientId(clientId); - requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet())); - CompletableFuture future0 = serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis); - futureList.add(FutureUtils.addExecutor(future0, this.executor)); - }); - CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> { - if (t != null) { - log.error("UnlockBatchMQ failed", t); - } - future.complete(null); - }); + try { + Set addressableMessageQueueSet = buildAddressableSet(ctx, mqSet); + Map> messageQueueSetMap = buildAddressableMapByBrokerName(addressableMessageQueueSet); + List> futureList = new ArrayList<>(); + messageQueueSetMap.forEach((k, v) -> { + UnlockBatchRequestBody requestBody = new UnlockBatchRequestBody(); + requestBody.setConsumerGroup(consumerGroup); + requestBody.setClientId(clientId); + requestBody.setMqSet(v.stream().map(AddressableMessageQueue::getMessageQueue).collect(Collectors.toSet())); + CompletableFuture future0 = serviceManager.getMessageService().unlockBatchMQ(ctx, v.get(0), requestBody, timeoutMillis); + futureList.add(FutureUtils.addExecutor(future0, this.executor)); + }); + CompletableFuture.allOf(futureList.toArray(new CompletableFuture[0])).whenComplete((v, t) -> { + if (t != null) { + log.error("UnlockBatchMQ failed, group={}", consumerGroup, t); + } + future.complete(null); + }); + } catch (Throwable t) { + log.error("UnlockBatchMQ exception, group={}", consumerGroup, t); + future.completeExceptionally(t); + } return FutureUtils.addExecutor(future, this.executor); } @@ -505,7 +515,13 @@ protected Set buildAddressableSet(ProxyContext ctx, Set protected HashMap> buildAddressableMapByBrokerName( final Set mqSet) { HashMap> result = new HashMap<>(); + if (mqSet == null) { + return result; + } for (AddressableMessageQueue mq : mqSet) { + if (mq == null) { + continue; + } List mqs = result.computeIfAbsent(mq.getBrokerName(), k -> new ArrayList<>()); mqs.add(mq); }