Skip to content

Commit

Permalink
Pulsar Connection handler should not spin up a consumer / reader (#9893)
Browse files Browse the repository at this point in the history
* pulsar refactor to avoid spinning up reader in connection handler; connection handler shouldn't care about the topic or partition

* Make variable final

* closing reader in the partition level consumer
  • Loading branch information
navina authored Dec 20, 2022
1 parent f57d922 commit f1807c9
Show file tree
Hide file tree
Showing 4 changed files with 44 additions and 33 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,14 @@ public class PulsarConfig {
public static final String TLS_TRUST_CERTS_FILE_PATH = "tlsTrustCertsFilePath";
public static final String ENABLE_KEY_VALUE_STITCH = "enableKeyValueStitch";

private String _pulsarTopicName;
private String _subscriberId;
private String _bootstrapServers;
private MessageId _initialMessageId;
private SubscriptionInitialPosition _subscriptionInitialPosition;
private String _authenticationToken;
private String _tlsTrustCertsFilePath;
private boolean _enableKeyValueStitch;
private final String _pulsarTopicName;
private final String _subscriberId;
private final String _bootstrapServers;
private final MessageId _initialMessageId;
private final SubscriptionInitialPosition _subscriptionInitialPosition;
private final String _authenticationToken;
private final String _tlsTrustCertsFilePath;
private final boolean _enableKeyValueStitch;

public PulsarConfig(StreamConfig streamConfig, String subscriberId) {
Map<String, String> streamConfigMap = streamConfig.getStreamConfigsMap();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.pulsar.client.api.Authentication;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.ClientBuilder;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.Reader;
import org.slf4j.Logger;
Expand All @@ -38,19 +39,14 @@ public class PulsarPartitionLevelConnectionHandler {

protected final PulsarConfig _config;
protected final String _clientId;
protected final int _partition;
protected final String _topic;
protected PulsarClient _pulsarClient = null;
protected Reader<byte[]> _reader = null;

/**
* Creates a new instance of {@link PulsarClient} and {@link Reader}
*/
public PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig, int partition) {
public PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig streamConfig) {
_config = new PulsarConfig(streamConfig, clientId);
_clientId = clientId;
_partition = partition;
_topic = _config.getPulsarTopicName();

try {
ClientBuilder pulsarClientBuilder = PulsarClient.builder().serviceUrl(_config.getBootstrapServers());
Expand All @@ -64,33 +60,38 @@ public PulsarPartitionLevelConnectionHandler(String clientId, StreamConfig strea
}

_pulsarClient = pulsarClientBuilder.build();

_reader = _pulsarClient.newReader().topic(getPartitionedTopicName(partition))
.startMessageId(_config.getInitialMessageId()).startMessageIdInclusive().create();

LOGGER.info("Created consumer with id {} for topic {}", _reader, _config.getPulsarTopicName());
LOGGER.info("Created pulsar client {}", _pulsarClient);
} catch (Exception e) {
LOGGER.error("Could not create pulsar consumer", e);
}
}

protected Reader<byte[]> createReaderForPartition(String topic, int partition, MessageId initialMessageId) {
if (_pulsarClient == null) {
throw new RuntimeException("Failed to create reader as no pulsar client found for topic " + topic);
}
try {
return _pulsarClient.newReader().topic(getPartitionedTopicName(topic, partition)).startMessageId(initialMessageId)
.startMessageIdInclusive().create();
} catch (Exception e) {
LOGGER.error("Failed to create pulsar consumer client for topic " + topic + " partition " + partition, e);
return null;
}
}

/**
* A pulsar partitioned topic with N partitions is comprised of N topics with topicName as prefix and portitionId
* as suffix.
* The method fetches the names of N partitioned topic and returns the topic name of {@param partition}
*/
protected String getPartitionedTopicName(int partition)
protected String getPartitionedTopicName(String topic, int partition)
throws Exception {
List<String> partitionTopicList = _pulsarClient.getPartitionsForTopic(_topic).get();
List<String> partitionTopicList = _pulsarClient.getPartitionsForTopic(topic).get();
return partitionTopicList.get(partition);
}

public void close()
throws IOException {
if (_reader != null) {
_reader.close();
}

if (_pulsarClient != null) {
_pulsarClient.close();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Reader;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -46,11 +47,18 @@ public class PulsarPartitionLevelConsumer extends PulsarPartitionLevelConnection
implements PartitionGroupConsumer {
private static final Logger LOGGER = LoggerFactory.getLogger(PulsarPartitionLevelConsumer.class);
private final ExecutorService _executorService;
private final Reader _reader;
private boolean _enableKeyValueStitch = false;

public PulsarPartitionLevelConsumer(String clientId, StreamConfig streamConfig,
PartitionGroupConsumptionStatus partitionGroupConsumptionStatus) {
super(clientId, streamConfig, partitionGroupConsumptionStatus.getPartitionGroupId());
super(clientId, streamConfig);
PulsarConfig config = new PulsarConfig(streamConfig, clientId);
_reader = createReaderForPartition(config.getPulsarTopicName(),
partitionGroupConsumptionStatus.getPartitionGroupId(),
config.getInitialMessageId());
LOGGER.info("Created pulsar reader with id {} for topic {} partition {}", _reader, _config.getPulsarTopicName(),
partitionGroupConsumptionStatus.getPartitionGroupId());
_executorService = Executors.newSingleThreadExecutor();
_enableKeyValueStitch = _config.getEnableKeyValueStitch();
}
Expand Down Expand Up @@ -128,6 +136,7 @@ private Iterable<Message<byte[]>> buildOffsetFilteringIterable(final List<Messag
@Override
public void close()
throws IOException {
_reader.close();
super.close();
shutdownAndAwaitTermination();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,22 +49,24 @@ public class PulsarStreamMetadataProvider extends PulsarPartitionLevelConnection
private static final Logger LOGGER = LoggerFactory.getLogger(PulsarStreamMetadataProvider.class);

private final int _partition;
private final String _topic;

public PulsarStreamMetadataProvider(String clientId, StreamConfig streamConfig) {
this(clientId, streamConfig, 0);
}

public PulsarStreamMetadataProvider(String clientId, StreamConfig streamConfig, int partition) {
super(clientId, streamConfig, partition);
super(clientId, streamConfig);
_topic = _config.getPulsarTopicName();
_partition = partition;
}

@Override
public int fetchPartitionCount(long timeoutMillis) {
try {
return _pulsarClient.getPartitionsForTopic(_config.getPulsarTopicName()).get().size();
return _pulsarClient.getPartitionsForTopic(_topic).get().size();
} catch (Exception e) {
throw new RuntimeException("Cannot fetch partitions for topic: " + _config.getPulsarTopicName(), e);
throw new RuntimeException("Cannot fetch partitions for topic: " + _topic, e);
}
}

Expand Down Expand Up @@ -119,7 +121,6 @@ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientI
partitionGroupConsumptionStatus.getStartOffset()));
}

PulsarConfig pulsarConfig = new PulsarConfig(streamConfig, clientId);
String subscription = ConsumerName.generateRandomName();
try {
List<String> partitionedTopicNameList = _pulsarClient.getPartitionsForTopic(_topic).get();
Expand All @@ -129,7 +130,7 @@ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientI

for (int p = newPartitionStartIndex; p < partitionedTopicNameList.size(); p++) {
try (Consumer consumer = _pulsarClient.newConsumer().topic(partitionedTopicNameList.get(p))
.subscriptionInitialPosition(pulsarConfig.getInitialSubscriberPosition())
.subscriptionInitialPosition(_config.getInitialSubscriberPosition())
.subscriptionMode(SubscriptionMode.NonDurable)
.subscriptionName(subscription).subscribe()) {

Expand All @@ -148,8 +149,8 @@ public List<PartitionGroupMetadata> computePartitionGroupMetadata(String clientI
new PartitionGroupMetadata(p, new MessageIdStreamOffset(lastMessageId)));
}
} catch (PulsarClientException pce) {
LOGGER.warn("Error encountered while calculating partition group metadata for topic " + _topic
+ " partition " + partitionedTopicNameList.get(p), pce);
LOGGER.warn("Error encountered while calculating partition group metadata for topic "
+ _config.getPulsarTopicName() + " partition " + partitionedTopicNameList.get(p), pce);
}
}
}
Expand Down

0 comments on commit f1807c9

Please sign in to comment.