Skip to content

Commit

Permalink
KAFKA-9410; Make groupId Optional in KafkaConsumer (#7943)
Browse files Browse the repository at this point in the history
Reviewers: Ron Dagostino <[email protected]>, Jason Gustafson <[email protected]>
  • Loading branch information
belugabehr authored and hachikuji committed Jan 16, 2020
1 parent 5c00191 commit 6d87c12
Showing 1 changed file with 13 additions and 11 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -570,7 +570,7 @@ public class KafkaConsumer<K, V> implements Consumer<K, V> {

private Logger log;
private final String clientId;
private String groupId;
private final Optional<String> groupId;
private final ConsumerCoordinator coordinator;
private final Deserializer<K> keyDeserializer;
private final Deserializer<V> valueDeserializer;
Expand Down Expand Up @@ -673,28 +673,30 @@ private KafkaConsumer(ConsumerConfig config, Deserializer<K> keyDeserializer, De
GroupRebalanceConfig groupRebalanceConfig = new GroupRebalanceConfig(config,
GroupRebalanceConfig.ProtocolType.CONSUMER);

this.groupId = groupRebalanceConfig.groupId;
this.groupId = Optional.ofNullable(groupRebalanceConfig.groupId);
this.clientId = buildClientId(config.getString(CommonClientConfigs.CLIENT_ID_CONFIG), groupRebalanceConfig);

LogContext logContext;

// If group.instance.id is set, we will append it to the log context.
if (groupRebalanceConfig.groupInstanceId.isPresent()) {
logContext = new LogContext("[Consumer instanceId=" + groupRebalanceConfig.groupInstanceId.get() +
", clientId=" + clientId + ", groupId=" + groupId + "] ");
", clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
} else {
logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId + "] ");
logContext = new LogContext("[Consumer clientId=" + clientId + ", groupId=" + groupId.orElse("null") + "] ");
}

this.log = logContext.logger(getClass());
boolean enableAutoCommit = config.getBoolean(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG);
if (groupId == null) { // overwrite in case of default group id where the config is not explicitly provided
if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG))
if (!groupId.isPresent()) { // overwrite in case of default group id where the config is not explicitly provided
if (!config.originals().containsKey(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG)) {
enableAutoCommit = false;
else if (enableAutoCommit)
} else if (enableAutoCommit) {
throw new InvalidConfigurationException(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG + " cannot be set to true when default group id (null) is used.");
} else if (groupId.isEmpty())
}
} else if (groupId.get().isEmpty()) {
log.warn("Support for using the empty group id by consumers is deprecated and will be removed in the next major release.");
}

log.debug("Initializing the Kafka consumer");
this.requestTimeoutMs = config.getInt(ConsumerConfig.REQUEST_TIMEOUT_MS_CONFIG);
Expand Down Expand Up @@ -773,7 +775,7 @@ else if (enableAutoCommit)
this.assignors = getAssignorInstances(config.getList(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG), config.originals());

// no coordinator will be constructed for the default (null) group id
this.coordinator = groupId == null ? null :
this.coordinator = !groupId.isPresent() ? null :
new ConsumerCoordinator(groupRebalanceConfig,
logContext,
this.client,
Expand Down Expand Up @@ -858,7 +860,7 @@ else if (enableAutoCommit)
this.requestTimeoutMs = requestTimeoutMs;
this.defaultApiTimeoutMs = defaultApiTimeoutMs;
this.assignors = assignors;
this.groupId = groupId;
this.groupId = Optional.ofNullable(groupId);
this.kafkaConsumerMetrics = new KafkaConsumerMetrics(metrics, "consumer");
}

Expand Down Expand Up @@ -2436,7 +2438,7 @@ private void throwIfNoAssignorsConfigured() {
}

private void maybeThrowInvalidGroupIdException() {
if (groupId == null)
if (!groupId.isPresent())
throw new InvalidGroupIdException("To use the group management or offset commit APIs, you must " +
"provide a valid " + ConsumerConfig.GROUP_ID_CONFIG + " in the consumer configuration.");
}
Expand Down

0 comments on commit 6d87c12

Please sign in to comment.