Skip to content

Commit

Permalink
[improve][broker] Avoid logging errors when there is a connection iss…
Browse files Browse the repository at this point in the history
…ue during subscription. (apache#23939)

(cherry picked from commit 5e5d514)
  • Loading branch information
RobertIndie committed Feb 7, 2025
1 parent 560d120 commit 35c97fc
Show file tree
Hide file tree
Showing 3 changed files with 12 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,12 @@ public TopicPoliciesCacheNotInitException() {
}
}

public static class ConnectionClosedException extends BrokerServiceException {
public ConnectionClosedException(String msg) {
super(msg);
}
}

public static class TopicBacklogQuotaExceededException extends BrokerServiceException {
@Getter
private final BacklogQuota.RetentionPolicy retentionPolicy;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St
consumer.consumerName(), currentUsageCount());
}
future.completeExceptionally(
new BrokerServiceException("Connection was closed while the opening the cursor "));
new BrokerServiceException.ConnectionClosedException(
"Connection was closed while the opening the cursor "));
} else {
log.info("[{}][{}] Created new subscription for {}", topic, subscriptionName, consumerId);
future.complete(consumer);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1004,7 +1004,8 @@ private CompletableFuture<Consumer> internalSubscribe(final TransportCnx cnx, St

decrementUsageCount();
return FutureUtil.failedFuture(
new BrokerServiceException("Connection was closed while the opening the cursor "));
new BrokerServiceException.ConnectionClosedException(
"Connection was closed while the opening the cursor "));
} else {
checkReplicatedSubscriptionControllerState();
if (log.isDebugEnabled()) {
Expand Down Expand Up @@ -1041,6 +1042,8 @@ && isCompactionSubscription(subscriptionName)) {
log.warn("[{}][{}] has been fenced. closing the topic {}", topic, subscriptionName,
ex.getMessage());
close();
} else if (ex.getCause() instanceof BrokerServiceException.ConnectionClosedException) {
log.warn("[{}][{}] Connection was closed while the opening the cursor", topic, subscriptionName);
} else {
log.error("[{}] Failed to create subscription: {}", topic, subscriptionName, ex);
}
Expand Down

0 comments on commit 35c97fc

Please sign in to comment.