diff --git a/cpp/.gitignore b/cpp/.gitignore index 23e0e933..b7f10c0f 100644 --- a/cpp/.gitignore +++ b/cpp/.gitignore @@ -18,3 +18,4 @@ bazel-rocketmq-client-cpp /bazel-* /compile_commands.json /.cache/ +.clangd diff --git a/cpp/source/rocketmq/Producer.cpp b/cpp/source/rocketmq/Producer.cpp index 8620f681..78d812ed 100644 --- a/cpp/source/rocketmq/Producer.cpp +++ b/cpp/source/rocketmq/Producer.cpp @@ -88,7 +88,7 @@ ProducerBuilder& ProducerBuilder::withConfiguration(Configuration configuration) } ProducerBuilder& ProducerBuilder::withTopics(const std::vector& topics) { - impl_->topicsOfInterest(topics); + impl_->withTopics(topics); return *this; } diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp index 32b2ecad..73130161 100644 --- a/cpp/source/rocketmq/ProducerImpl.cpp +++ b/cpp/source/rocketmq/ProducerImpl.cpp @@ -16,6 +16,7 @@ */ #include "ProducerImpl.h" +#include #include #include @@ -575,9 +576,22 @@ void ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message) } } -void ProducerImpl::topicsOfInterest(std::vector topics) { +void ProducerImpl::topicsOfInterest(std::vector &topics) { absl::MutexLock lk(&topics_mtx_); - topics_.swap(topics); + for (auto& topic : topics_) { + if (std::find(topics.begin(), topics.end(), topic) == topics.end()) { + topics.push_back(topic); + } + } +} + +void ProducerImpl::withTopics(const std::vector &topics) { + absl::MutexLock lk(&topics_mtx_); + for (auto &topic: topics) { + if (std::find(topics_.begin(), topics_.end(), topic) == topics_.end()) { + topics_.push_back(topic); + } + } } void ProducerImpl::buildClientSettings(rmq::Settings& settings) { diff --git a/cpp/source/rocketmq/PushConsumerImpl.cpp b/cpp/source/rocketmq/PushConsumerImpl.cpp index d73407b4..505854db 100644 --- a/cpp/source/rocketmq/PushConsumerImpl.cpp +++ b/cpp/source/rocketmq/PushConsumerImpl.cpp @@ -47,7 +47,7 @@ PushConsumerImpl::~PushConsumerImpl() { shutdown(); } -void PushConsumerImpl::topicsOfInterest(std::vector topics) { +void PushConsumerImpl::topicsOfInterest(std::vector &topics) { absl::MutexLock lk(&topic_filter_expression_table_mtx_); for (const auto& entry : topic_filter_expression_table_) { topics.push_back(entry.first); diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp index 09acb7ab..7a1b3edf 100644 --- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp +++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp @@ -67,7 +67,7 @@ void SimpleConsumerImpl::buildClientSettings(rmq::Settings& settings) { } } -void SimpleConsumerImpl::topicsOfInterest(std::vector topics) { +void SimpleConsumerImpl::topicsOfInterest(std::vector &topics) { absl::MutexLock lk(&subscriptions_mtx_); for (const auto& entry : subscriptions_) { if (std::find(topics.begin(), topics.end(), entry.first) == topics.end()) { diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h index 70dc5382..c266047a 100644 --- a/cpp/source/rocketmq/include/ClientImpl.h +++ b/cpp/source/rocketmq/include/ClientImpl.h @@ -167,7 +167,7 @@ class ClientImpl : virtual public Client { absl::flat_hash_map> session_map_ GUARDED_BY(session_map_mtx_); absl::Mutex session_map_mtx_; - virtual void topicsOfInterest(std::vector topics) { + virtual void topicsOfInterest(std::vector &topics) { } void updateRouteInfo() LOCKS_EXCLUDED(topic_route_table_mtx_); diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h index ad9b24d5..d7260a93 100644 --- a/cpp/source/rocketmq/include/ProducerImpl.h +++ b/cpp/source/rocketmq/include/ProducerImpl.h @@ -107,7 +107,9 @@ class ProducerImpl : virtual public ClientImpl, public std::enable_shared_from_t void buildClientSettings(rmq::Settings& settings) override; - void topicsOfInterest(std::vector topics) override LOCKS_EXCLUDED(topics_mtx_); + void topicsOfInterest(std::vector &topics) override LOCKS_EXCLUDED(topics_mtx_); + + void withTopics(const std::vector &topics) LOCKS_EXCLUDED(topics_mtx_); const PublishStats& stats() const { return stats_; diff --git a/cpp/source/rocketmq/include/PushConsumerImpl.h b/cpp/source/rocketmq/include/PushConsumerImpl.h index d512f4c8..7a4ff1a3 100644 --- a/cpp/source/rocketmq/include/PushConsumerImpl.h +++ b/cpp/source/rocketmq/include/PushConsumerImpl.h @@ -52,7 +52,7 @@ class PushConsumerImpl : virtual public ClientImpl, public std::enable_shared_fr void prepareHeartbeatData(HeartbeatRequest& request) override; - void topicsOfInterest(std::vector topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); + void topicsOfInterest(std::vector &topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_); void start() override; diff --git a/cpp/source/rocketmq/include/SimpleConsumerImpl.h b/cpp/source/rocketmq/include/SimpleConsumerImpl.h index a20cce56..45aa61b9 100644 --- a/cpp/source/rocketmq/include/SimpleConsumerImpl.h +++ b/cpp/source/rocketmq/include/SimpleConsumerImpl.h @@ -66,7 +66,7 @@ class SimpleConsumerImpl : public ClientImpl, public std::enable_shared_from_thi } protected: - void topicsOfInterest(std::vector topics) override; + void topicsOfInterest(std::vector &topics) override; private: absl::flat_hash_map subscriptions_ GUARDED_BY(subscriptions_mtx_);