Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix] Remove closed producers and consumers from client #2

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/ci-pr-validation.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ jobs:
run: ./pulsar-test-service-start.sh

- name: Run unit tests
run: ./run-unit-tests.sh
run: RETRY_FAILED=3 ./run-unit-tests.sh

- name: Stop Pulsar service
run: ./pulsar-test-service-stop.sh
Expand Down
2 changes: 1 addition & 1 deletion lib/ClientConnection.h
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,7 @@ class PULSAR_PUBLIC ClientConnection : public std::enable_shared_from_this<Clien
typedef std::map<long, Promise<Result, NamespaceTopicsPtr>> PendingGetNamespaceTopicsMap;
PendingGetNamespaceTopicsMap pendingGetNamespaceTopicsRequests_;

std::mutex mutex_;
mutable std::mutex mutex_;
typedef std::unique_lock<std::mutex> Lock;

// Pending buffers to write on the socket
Expand Down
89 changes: 51 additions & 38 deletions lib/ClientImpl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -189,9 +189,15 @@ void ClientImpl::handleCreateProducer(const Result result, const LookupDataResul
void ClientImpl::handleProducerCreated(Result result, ProducerImplBaseWeakPtr producerBaseWeakPtr,
CreateProducerCallback callback, ProducerImplBasePtr producer) {
if (result == ResultOk) {
Lock lock(mutex_);
producers_.push_back(producer);
lock.unlock();
auto pair = producers_.emplace(producer.get(), producer);
if (!pair.second) {
auto existingProducer = pair.first->second.lock();
LOG_ERROR("Unexpected existing producer at the same address: "
<< pair.first->first << ", producer: "
<< (existingProducer ? existingProducer->getProducerName() : "(null)"));
callback(ResultUnknownError, {});
return;
}
callback(result, Producer(producer));
} else {
callback(result, {});
Expand Down Expand Up @@ -241,9 +247,18 @@ void ClientImpl::handleReaderMetadataLookup(const Result result, const LookupDat
ConsumerImplBasePtr consumer = reader->getConsumer().lock();
auto self = shared_from_this();
reader->start(startMessageId, [this, self](const ConsumerImplBaseWeakPtr& weakConsumerPtr) {
Lock lock(mutex_);
consumers_.push_back(weakConsumerPtr);
lock.unlock();
auto consumer = weakConsumerPtr.lock();
if (consumer) {
auto pair = consumers_.emplace(consumer.get(), consumer);
if (!pair.second) {
auto existingConsumer = pair.first->second.lock();
LOG_ERROR("Unexpected existing consumer at the same address: "
<< pair.first->first
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
}
} else {
LOG_ERROR("Unexpected case: the consumer is somehow expired");
}
});
}

Expand Down Expand Up @@ -397,9 +412,15 @@ void ClientImpl::handleSubscribe(const Result result, const LookupDataResultPtr
void ClientImpl::handleConsumerCreated(Result result, ConsumerImplBaseWeakPtr consumerImplBaseWeakPtr,
SubscribeCallback callback, ConsumerImplBasePtr consumer) {
if (result == ResultOk) {
Lock lock(mutex_);
consumers_.push_back(consumer);
lock.unlock();
auto pair = consumers_.emplace(consumer.get(), consumer);
if (!pair.second) {
auto existingConsumer = pair.first->second.lock();
LOG_ERROR("Unexpected existing consumer at the same address: "
<< pair.first->first
<< ", consumer: " << (existingConsumer ? existingConsumer->getName() : "(null)"));
callback(ResultUnknownError, {});
return;
}
callback(result, Consumer(consumer));
} else {
callback(result, {});
Expand Down Expand Up @@ -477,27 +498,26 @@ void ClientImpl::getPartitionsForTopicAsync(const std::string& topic, GetPartiti
}

void ClientImpl::closeAsync(CloseCallback callback) {
Lock lock(mutex_);
ProducersList producers(producers_);
ConsumersList consumers(consumers_);

if (state_ != Open && callback) {
lock.unlock();
callback(ResultAlreadyClosed);
if (state_ != Open) {
if (callback) {
callback(ResultAlreadyClosed);
}
return;
}
// Set the state to Closing so that no producers could get added
state_ = Closing;
lock.unlock();

memoryLimitController_.close();

auto producers = producers_.move();
auto consumers = consumers_.move();

SharedInt numberOfOpenHandlers = std::make_shared<int>(producers.size() + consumers.size());
LOG_INFO("Closing Pulsar client with " << producers.size() << " producers and " << consumers.size()
<< " consumers");

for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
ProducerImplBasePtr producer = it->lock();
for (auto&& kv : producers) {
ProducerImplBasePtr producer = kv.second.lock();
if (producer && !producer->isClosed()) {
producer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
std::placeholders::_1, numberOfOpenHandlers, callback));
Expand All @@ -507,8 +527,8 @@ void ClientImpl::closeAsync(CloseCallback callback) {
}
}

for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
ConsumerImplBasePtr consumer = it->lock();
for (auto&& kv : consumers) {
ConsumerImplBasePtr consumer = kv.second.lock();
if (consumer && !consumer->isClosed()) {
consumer->closeAsync(std::bind(&ClientImpl::handleClose, shared_from_this(),
std::placeholders::_1, numberOfOpenHandlers, callback));
Expand Down Expand Up @@ -562,23 +582,18 @@ void ClientImpl::handleClose(Result result, SharedInt numberOfOpenHandlers, Resu
}

void ClientImpl::shutdown() {
Lock lock(mutex_);
ProducersList producers;
ConsumersList consumers;
auto producers = producers_.move();
auto consumers = consumers_.move();

producers.swap(producers_);
consumers.swap(consumers_);
lock.unlock();

for (ProducersList::iterator it = producers.begin(); it != producers.end(); ++it) {
ProducerImplBasePtr producer = it->lock();
for (auto&& kv : producers) {
ProducerImplBasePtr producer = kv.second.lock();
if (producer) {
producer->shutdown();
}
}

for (ConsumersList::iterator it = consumers.begin(); it != consumers.end(); ++it) {
ConsumerImplBasePtr consumer = it->lock();
for (auto&& kv : consumers) {
ConsumerImplBasePtr consumer = kv.second.lock();
if (consumer) {
consumer->shutdown();
}
Expand Down Expand Up @@ -631,26 +646,24 @@ uint64_t ClientImpl::newRequestId() {
}

uint64_t ClientImpl::getNumberOfProducers() {
Lock lock(mutex_);
uint64_t numberOfAliveProducers = 0;
for (const auto& producer : producers_) {
producers_.forEachValue([&numberOfAliveProducers](const ProducerImplBaseWeakPtr& producer) {
const auto& producerImpl = producer.lock();
if (producerImpl) {
numberOfAliveProducers += producerImpl->getNumberOfConnectedProducer();
}
}
});
return numberOfAliveProducers;
}

uint64_t ClientImpl::getNumberOfConsumers() {
Lock lock(mutex_);
uint64_t numberOfAliveConsumers = 0;
for (const auto& consumer : consumers_) {
consumers_.forEachValue([&numberOfAliveConsumers](const ConsumerImplBaseWeakPtr& consumer) {
const auto consumerImpl = consumer.lock();
if (consumerImpl) {
numberOfAliveConsumers += consumerImpl->getNumberOfConnectedConsumer();
}
}
});
return numberOfAliveConsumers;
}

Expand Down
13 changes: 8 additions & 5 deletions lib/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
#include <atomic>
#include <vector>
#include "ServiceNameResolver.h"
#include "SynchronizedHashMap.h"

namespace pulsar {

Expand Down Expand Up @@ -91,6 +92,11 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
ExecutorServiceProviderPtr getListenerExecutorProvider();
ExecutorServiceProviderPtr getPartitionListenerExecutorProvider();
LookupServicePtr getLookup();

void cleanupProducer(ProducerImplBase* address) { producers_.remove(address); }

void cleanupConsumer(ConsumerImplBase* address) { consumers_.remove(address); }

friend class PulsarFriend;

private:
Expand Down Expand Up @@ -147,11 +153,8 @@ class ClientImpl : public std::enable_shared_from_this<ClientImpl> {
uint64_t consumerIdGenerator_;
uint64_t requestIdGenerator_;

typedef std::vector<ProducerImplBaseWeakPtr> ProducersList;
ProducersList producers_;

typedef std::vector<ConsumerImplBaseWeakPtr> ConsumersList;
ConsumersList consumers_;
SynchronizedHashMap<ProducerImplBase*, ProducerImplBaseWeakPtr> producers_;
SynchronizedHashMap<ConsumerImplBase*, ConsumerImplBaseWeakPtr> consumers_;

std::atomic<Result> closingError;

Expand Down
4 changes: 2 additions & 2 deletions lib/ConnectionPool.h
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ class PULSAR_PUBLIC ConnectionPool {
typedef std::map<std::string, ClientConnectionWeakPtr> PoolMap;
PoolMap pool_;
bool poolConnections_;
std::mutex mutex_;
mutable std::mutex mutex_;
std::atomic_bool closed_{false};

friend class ConnectionPoolTest;
friend class PulsarFriend;
};
} // namespace pulsar
#endif //_PULSAR_CONNECTION_POOL_HEADER_
Loading