Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix #639 fix semantics of topicOfInterest #642

Merged
merged 1 commit into from
Dec 1, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions cpp/.gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -18,3 +18,4 @@ bazel-rocketmq-client-cpp
/bazel-*
/compile_commands.json
/.cache/
.clangd
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/Producer.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ ProducerBuilder& ProducerBuilder::withConfiguration(Configuration configuration)
}

ProducerBuilder& ProducerBuilder::withTopics(const std::vector<std::string>& topics) {
impl_->topicsOfInterest(topics);
impl_->withTopics(topics);
return *this;
}

Expand Down
18 changes: 16 additions & 2 deletions cpp/source/rocketmq/ProducerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
*/
#include "ProducerImpl.h"

#include <algorithm>
#include <apache/rocketmq/v2/definition.pb.h>

#include <atomic>
Expand Down Expand Up @@ -575,9 +576,22 @@ void ProducerImpl::onOrphanedTransactionalMessage(MessageConstSharedPtr message)
}
}

void ProducerImpl::topicsOfInterest(std::vector<std::string> topics) {
void ProducerImpl::topicsOfInterest(std::vector<std::string> &topics) {
absl::MutexLock lk(&topics_mtx_);
topics_.swap(topics);
for (auto& topic : topics_) {
aaron-ai marked this conversation as resolved.
Show resolved Hide resolved
if (std::find(topics.begin(), topics.end(), topic) == topics.end()) {
topics.push_back(topic);
}
}
}

void ProducerImpl::withTopics(const std::vector<std::string> &topics) {
absl::MutexLock lk(&topics_mtx_);
for (auto &topic: topics) {
if (std::find(topics_.begin(), topics_.end(), topic) == topics_.end()) {
topics_.push_back(topic);
}
}
}

void ProducerImpl::buildClientSettings(rmq::Settings& settings) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/PushConsumerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,7 @@ PushConsumerImpl::~PushConsumerImpl() {
shutdown();
}

void PushConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
void PushConsumerImpl::topicsOfInterest(std::vector<std::string> &topics) {
absl::MutexLock lk(&topic_filter_expression_table_mtx_);
for (const auto& entry : topic_filter_expression_table_) {
topics.push_back(entry.first);
Expand Down
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/SimpleConsumerImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,7 @@ void SimpleConsumerImpl::buildClientSettings(rmq::Settings& settings) {
}
}

void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> topics) {
void SimpleConsumerImpl::topicsOfInterest(std::vector<std::string> &topics) {
absl::MutexLock lk(&subscriptions_mtx_);
for (const auto& entry : subscriptions_) {
if (std::find(topics.begin(), topics.end(), entry.first) == topics.end()) {
Expand Down
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/include/ClientImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -167,7 +167,7 @@ class ClientImpl : virtual public Client {
absl::flat_hash_map<std::string, std::unique_ptr<Session>> session_map_ GUARDED_BY(session_map_mtx_);
absl::Mutex session_map_mtx_;

virtual void topicsOfInterest(std::vector<std::string> topics) {
virtual void topicsOfInterest(std::vector<std::string> &topics) {
}

void updateRouteInfo() LOCKS_EXCLUDED(topic_route_table_mtx_);
Expand Down
4 changes: 3 additions & 1 deletion cpp/source/rocketmq/include/ProducerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,9 @@ class ProducerImpl : virtual public ClientImpl, public std::enable_shared_from_t

void buildClientSettings(rmq::Settings& settings) override;

void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topics_mtx_);
void topicsOfInterest(std::vector<std::string> &topics) override LOCKS_EXCLUDED(topics_mtx_);

void withTopics(const std::vector<std::string> &topics) LOCKS_EXCLUDED(topics_mtx_);

const PublishStats& stats() const {
return stats_;
Expand Down
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/include/PushConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ class PushConsumerImpl : virtual public ClientImpl, public std::enable_shared_fr

void prepareHeartbeatData(HeartbeatRequest& request) override;

void topicsOfInterest(std::vector<std::string> topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);
void topicsOfInterest(std::vector<std::string> &topics) override LOCKS_EXCLUDED(topic_filter_expression_table_mtx_);

void start() override;

Expand Down
2 changes: 1 addition & 1 deletion cpp/source/rocketmq/include/SimpleConsumerImpl.h
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ class SimpleConsumerImpl : public ClientImpl, public std::enable_shared_from_thi
}

protected:
void topicsOfInterest(std::vector<std::string> topics) override;
void topicsOfInterest(std::vector<std::string> &topics) override;

private:
absl::flat_hash_map<std::string, FilterExpression> subscriptions_ GUARDED_BY(subscriptions_mtx_);
Expand Down
Loading