diff --git a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java index 883a048e78ce4..2052af9160398 100644 --- a/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java +++ b/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java @@ -570,7 +570,7 @@ public class KafkaConsumer implements Consumer { private Logger log; private final String clientId; - private String groupId; + private final Optional groupId; private final ConsumerCoordinator coordinator; private final Deserializer keyDeserializer; private final Deserializer valueDeserializer; @@ -673,7 +673,7 @@ private KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, De GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config, GroupRebalanceConfig.ProtocolType.CONSUMER); - this.groupId = groupRebalanceConfig.groupId; + this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId); this.clientId = buildClientId(config.getString(CommonClientConfigs.CLIENT_ID_CONFIG), groupRebalanceConfig); LogContext logContext; @@ -681,20 +681,22 @@ private KafkaConsumer(ConsumerConfig config, Deserializer keyDeserializer, De // If group.instance.id is set, we will append it to the log context. if (groupRebalanceConfig.groupInstanceId.isPresent()) { logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() + - ", clientId=" + clientId + ", groupId=" + groupId + "] "); + ", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] "); } else { - logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] "); + logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] "); } this.log = logContext.logger(getClass()); boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG); - if (groupId == null) { // overwrite in case of default group id where the config is not explicitly provided - if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) + if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided + if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) { enableAutoCommit = false; - else if (enableAutoCommit) + } else if (enableAutoCommit) { throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used."); - } else if (groupId.isEmpty()) + } + } else if (groupId.get().isEmpty()) { log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release."); + } log.debug("Initializing the Kafka consumer"); this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG); @@ -773,7 +775,7 @@ else if (enableAutoCommit) this.assignors = getAssignorInstances(config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), config.originals()); // no coordinator will be constructed for the default (null) group id - this.coordinator = groupId == null ? null : + this.coordinator = !groupId.isPresent() ? null : new ConsumerCoordinator(groupRebalanceConfig, logContext, this.client, @@ -858,7 +860,7 @@ else if (enableAutoCommit) this.requestTimeoutMs = requestTimeoutMs; this.defaultApiTimeoutMs = defaultApiTimeoutMs; this.assignors = assignors; - this.groupId = groupId; + this.groupId = Optional.ofNullable(groupId); this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer"); } @@ -2436,7 +2438,7 @@ private void throwIfNoAssignorsConfigured() { } private void maybeThrowInvalidGroupIdException() { - if (groupId == null) + if (!groupId.isPresent()) throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " + "provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration."); }