From cb83bf5b261e6cf7af435cdbf03e29112104e074 Mon Sep 17 00:00:00 2001 From: lizhimins <707364882@qq.com> Date: Thu, 23 Jan 2025 10:26:10 +0800 Subject: [PATCH] [ISSUE #928] Fix C++ push consumer handle error code --- cpp/examples/ExampleFifoProducer.cpp | 6 +++--- cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp | 5 +++++ cpp/source/rocketmq/SimpleConsumerImpl.cpp | 4 ++-- 3 files changed, 10 insertions(+), 5 deletions(-) diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp index 1e7829d46..1876ebb16 100644 --- a/cpp/examples/ExampleFifoProducer.cpp +++ b/cpp/examples/ExampleFifoProducer.cpp @@ -105,8 +105,8 @@ int main(int argc, char* argv[]) { gflags::ParseCommandLineFlags(&argc, &argv, true); auto& logger = getLogger(); - logger.setConsoleLevel(Level::Debug); - logger.setLevel(Level::Debug); + logger.setConsoleLevel(Level::Info); + logger.setLevel(Level::Info); logger.init(); // Access Key/Secret pair may be acquired from management console @@ -172,7 +172,7 @@ int main(int argc, char* argv[]) { semaphore->acquire(); producer.send(std::move(message), callback); - std::cout << "Cached No." << i << " message" << std::endl; + // std::cout << "Cached No." << i << " message" << std::endl; } } catch (...) { std::cerr << "Ah...No!!!" << std::endl; diff --git a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp index 1c96b0954..f68c9f88f 100644 --- a/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp +++ b/cpp/source/rocketmq/AsyncReceiveMessageCallback.cpp @@ -51,6 +51,11 @@ void AsyncReceiveMessageCallback::onCompletion(const std::error_code& ec, const return; } + if (ec == ErrorCode::NoContent) { + checkThrottleThenReceive(); + return; + } + if (ec) { SPDLOG_WARN("Receive message from {} failed. Cause: {}. Retry after 1 second.", process_queue->simpleName(), ec.message()); receiveMessageLater(std::chrono::seconds (1)); diff --git a/cpp/source/rocketmq/SimpleConsumerImpl.cpp b/cpp/source/rocketmq/SimpleConsumerImpl.cpp index df0607936..7c51afcb4 100644 --- a/cpp/source/rocketmq/SimpleConsumerImpl.cpp +++ b/cpp/source/rocketmq/SimpleConsumerImpl.cpp @@ -95,11 +95,11 @@ void SimpleConsumerImpl::start() { } }; - // refer java sdk: set refresh interval to 30 seconds + // refer java sdk: set refresh interval to 5 seconds // org.apache.rocketmq.client.java.impl.ClientImpl#startUp refresh_assignment_task_ = manager()->getScheduler()->schedule( refresh_assignment_task, "RefreshAssignmentTask", - std::chrono::minutes(5), std::chrono::seconds(5)); + std::chrono::seconds(5), std::chrono::minutes(5)); client_manager_->addClientObserver(shared_from_this()); }