diff --git a/cpp/.gitignore b/cpp/.gitignore index b7f10c0f4..551fa09a7 100644 --- a/cpp/.gitignore +++ b/cpp/.gitignore @@ -19,3 +19,4 @@ bazel-rocketmq-client-cpp /compile_commands.json /.cache/ .clangd +build \ No newline at end of file diff --git a/cpp/examples/ExampleFifoProducer.cpp b/cpp/examples/ExampleFifoProducer.cpp index 9d99be367..1e7829d46 100644 --- a/cpp/examples/ExampleFifoProducer.cpp +++ b/cpp/examples/ExampleFifoProducer.cpp @@ -28,7 +28,6 @@ #include "rocketmq/FifoProducer.h" #include "rocketmq/Logger.h" #include "rocketmq/Message.h" -#include "rocketmq/Producer.h" #include "rocketmq/SendReceipt.h" using namespace ROCKETMQ_NAMESPACE; @@ -93,8 +92,8 @@ std::string randomString(std::string::size_type len) { return result; } -DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published"); -DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); +DEFINE_string(topic, "FifoTopic", "Topic to which messages are published"); +DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); diff --git a/cpp/examples/ExampleProducer.cpp b/cpp/examples/ExampleProducer.cpp index 5e20cc12d..8c6011d8e 100644 --- a/cpp/examples/ExampleProducer.cpp +++ b/cpp/examples/ExampleProducer.cpp @@ -51,8 +51,8 @@ std::string randomString(std::string::size_type len) { return result; } -DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published"); -DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); +DEFINE_string(topic, "NormalTopic", "Topic to which messages are published"); +DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); diff --git a/cpp/examples/ExampleProducerWithAsync.cpp b/cpp/examples/ExampleProducerWithAsync.cpp index d88dfc85d..a46fdf43d 100644 --- a/cpp/examples/ExampleProducerWithAsync.cpp +++ b/cpp/examples/ExampleProducerWithAsync.cpp @@ -89,8 +89,8 @@ std::string randomString(std::string::size_type len) { return result; } -DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published"); -DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); +DEFINE_string(topic, "NormalTopic", "Topic to which messages are published"); +DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_uint32(concurrency, 128, "Concurrency of async send"); diff --git a/cpp/examples/ExampleProducerWithFifoMessage.cpp b/cpp/examples/ExampleProducerWithFifoMessage.cpp index 4fa34f9d8..2d6789ba9 100644 --- a/cpp/examples/ExampleProducerWithFifoMessage.cpp +++ b/cpp/examples/ExampleProducerWithFifoMessage.cpp @@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) { return result; } -DEFINE_string(topic, "fifo_topic_sample", "Topic to which messages are published"); -DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); +DEFINE_string(topic, "FifoTopic", "Topic to which messages are published"); +DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); diff --git a/cpp/examples/ExampleProducerWithTimedMessage.cpp b/cpp/examples/ExampleProducerWithTimedMessage.cpp index d62374599..ba2a45f7d 100644 --- a/cpp/examples/ExampleProducerWithTimedMessage.cpp +++ b/cpp/examples/ExampleProducerWithTimedMessage.cpp @@ -17,7 +17,6 @@ #include #include #include -#include #include #include #include @@ -50,8 +49,8 @@ std::string randomString(std::string::size_type len) { return result; } -DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published"); -DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); +DEFINE_string(topic, "TimerTopic", "Topic to which messages are published"); +DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); diff --git a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp index 50620c5a0..f595c6efb 100644 --- a/cpp/examples/ExampleProducerWithTransactionalMessage.cpp +++ b/cpp/examples/ExampleProducerWithTransactionalMessage.cpp @@ -48,8 +48,8 @@ std::string randomString(std::string::size_type len) { return result; } -DEFINE_string(topic, "tx_topic_sample", "Topic to which messages are published"); -DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); +DEFINE_string(topic, "TransTopic", "Topic to which messages are published"); +DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider"); DEFINE_int32(message_body_size, 4096, "Message body size"); DEFINE_uint32(total, 256, "Number of sample messages to publish"); DEFINE_string(access_key, "", "Your access key ID"); diff --git a/cpp/examples/ExamplePushConsumer.cpp b/cpp/examples/ExamplePushConsumer.cpp index 66a85f4b5..7017ec926 100644 --- a/cpp/examples/ExamplePushConsumer.cpp +++ b/cpp/examples/ExamplePushConsumer.cpp @@ -24,9 +24,9 @@ using namespace ROCKETMQ_NAMESPACE; -DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published"); -DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); -DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console"); +DEFINE_string(topic, "NormalTopic", "Topic to which messages are published"); +DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider"); +DEFINE_string(group, "PushConsumer", "GroupId, created through your instance management console"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); diff --git a/cpp/examples/ExampleSimpleConsumer.cpp b/cpp/examples/ExampleSimpleConsumer.cpp index aedec71e8..41262ad05 100644 --- a/cpp/examples/ExampleSimpleConsumer.cpp +++ b/cpp/examples/ExampleSimpleConsumer.cpp @@ -23,9 +23,9 @@ using namespace ROCKETMQ_NAMESPACE; -DEFINE_string(topic, "standard_topic_sample", "Topic to which messages are published"); -DEFINE_string(access_point, "121.196.167.124:8081", "Service access URL, provided by your service provider"); -DEFINE_string(group, "CID_standard_topic_sample", "GroupId, created through your instance management console"); +DEFINE_string(topic, "NormalTopic", "Topic to which messages are published"); +DEFINE_string(access_point, "127.0.0.1:8081", "Service access URL, provided by your service provider"); +DEFINE_string(group, "SimpleConsumer", "GroupId, created through your instance management console"); DEFINE_string(access_key, "", "Your access key ID"); DEFINE_string(access_secret, "", "Your access secret"); DEFINE_bool(tls, false, "Use HTTP2 with TLS/SSL"); diff --git a/cpp/source/base/include/InvocationContext.h b/cpp/source/base/include/InvocationContext.h index 0e138c815..dd6864bf1 100644 --- a/cpp/source/base/include/InvocationContext.h +++ b/cpp/source/base/include/InvocationContext.h @@ -81,8 +81,8 @@ struct InvocationContext : public BaseInvocationContext { if (!status.ok() && grpc::StatusCode::DEADLINE_EXCEEDED == status.error_code()) { auto diff = - std::chrono::duration_cast(std::chrono::system_clock::now() - context.deadline()) - .count(); + std::chrono::duration_cast( + std::chrono::system_clock::now() - context.deadline()).count(); SPDLOG_WARN("Asynchronous RPC[{}.{}] timed out, elapsing {}ms, deadline-over-due: {}ms", absl::FormatTime(created_time, absl::UTCTimeZone()), elapsed, diff); } diff --git a/cpp/source/client/ClientManagerImpl.cpp b/cpp/source/client/ClientManagerImpl.cpp index 7d724c7b0..bb1e2e671 100644 --- a/cpp/source/client/ClientManagerImpl.cpp +++ b/cpp/source/client/ClientManagerImpl.cpp @@ -48,6 +48,7 @@ ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_s state_(State::CREATED), callback_thread_pool_(absl::make_unique(std::thread::hardware_concurrency())), with_ssl_(with_ssl) { + certificate_verifier_ = grpc::experimental::ExternalCertificateVerifier::Create(); tls_channel_credential_options_.set_verify_server_certs(false); tls_channel_credential_options_.set_check_call_host(false); @@ -78,7 +79,7 @@ ClientManagerImpl::ClientManagerImpl(std::string resource_namespace, bool with_s */ channel_arguments_.SetInt(GRPC_ARG_ENABLE_RETRIES, 0); - channel_arguments_.SetSslTargetNameOverride("localhost"); + // channel_arguments_.SetSslTargetNameOverride("localhost"); SPDLOG_INFO("ClientManager[ResourceNamespace={}] created", resource_namespace_); } @@ -282,7 +283,7 @@ bool ClientManagerImpl::send(const std::string& target_host, SendMessageRequest& request, SendResultCallback cb) { assert(cb); - SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.DebugString()); + SPDLOG_DEBUG("Prepare to send message to {} asynchronously. Request: {}", target_host, request.ShortDebugString()); RpcClientSharedPtr client = getRpcClient(target_host); // Invocation context will be deleted in its onComplete() method. auto invocation_context = new InvocationContext(); @@ -440,7 +441,7 @@ bool ClientManagerImpl::send(const std::string& target_host, case rmq::Code::MESSAGE_PROPERTY_CONFLICT_WITH_TYPE: { SPDLOG_WARN("Message-property-conflict-with-type: Host={}, Response={}", invocation_context->remote_address, - invocation_context->response.DebugString()); + invocation_context->response.ShortDebugString()); send_result.ec = ErrorCode::MessagePropertyConflictWithType; break; } @@ -482,7 +483,7 @@ RpcClientSharedPtr ClientManagerImpl::getRpcClient(const std::string& target_hos auto search = rpc_clients_.find(target_host); if (search == rpc_clients_.end() || !search->second->ok()) { if (search == rpc_clients_.end()) { - SPDLOG_INFO("Create a RPC client to {}", target_host.data()); + SPDLOG_INFO("Create a RPC client to [{}]", target_host.data()); } else if (!search->second->ok()) { SPDLOG_INFO("Prior RPC client to {} is not OK. Re-create one", target_host); } @@ -549,7 +550,7 @@ void ClientManagerImpl::resolveRoute(const std::string& target_host, std::chrono::milliseconds timeout, const std::function& cb) { SPDLOG_DEBUG("Name server connection URL: {}", target_host); - SPDLOG_DEBUG("Query route request: {}", request.DebugString()); + SPDLOG_DEBUG("Query route request: {}", request.ShortDebugString()); RpcClientSharedPtr client = getRpcClient(target_host, false); if (!client) { SPDLOG_WARN("Failed to create RPC client for name server[host={}]", target_host); diff --git a/cpp/source/client/LogInterceptor.cpp b/cpp/source/client/LogInterceptor.cpp index 770286451..9e6eded0c 100644 --- a/cpp/source/client/LogInterceptor.cpp +++ b/cpp/source/client/LogInterceptor.cpp @@ -52,8 +52,8 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth if (methods->QueryInterceptionHookPoint(grpc::experimental::InterceptionHookPoints::PRE_SEND_INITIAL_METADATA)) { std::multimap* metadata = methods->GetSendInitialMetadata(); if (metadata) { - SPDLOG_DEBUG("[Outbound]Headers of {}: \n{}", client_rpc_info_->method(), - absl::StrJoin(*metadata, "\n", absl::PairFormatter(" --> "))); + SPDLOG_DEBUG("[Outbound]Headers of {}: {}", client_rpc_info_->method(), + absl::StrJoin(*metadata, " ", absl::PairFormatter(" --> "))); } } @@ -73,8 +73,8 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth absl::string_view(it.second.data(), it.second.length())}); } if (!response_headers.empty()) { - SPDLOG_DEBUG("[Inbound]Response Headers of {}:\n{}", client_rpc_info_->method(), - absl::StrJoin(response_headers, "\n", absl::PairFormatter(" --> "))); + SPDLOG_DEBUG("[Inbound]Response Headers of {}: {}", client_rpc_info_->method(), + absl::StrJoin(response_headers, " ", absl::PairFormatter(" --> "))); } else { SPDLOG_DEBUG("[Inbound]Response metadata of {} is empty", client_rpc_info_->method()); } @@ -85,12 +85,12 @@ void LogInterceptor::Intercept(grpc::experimental::InterceptorBatchMethods* meth void* message = methods->GetRecvMessage(); if (message) { auto* response = reinterpret_cast(message); - std::string&& response_text = response->DebugString(); + std::string&& response_text = response->ShortDebugString(); std::size_t limit = 1024; if (response_text.size() <= limit) { - SPDLOG_DEBUG("[Inbound] {}\n{}", client_rpc_info_->method(), response_text); + SPDLOG_DEBUG("[Inbound] {} {}", client_rpc_info_->method(), response_text); } else { - SPDLOG_DEBUG("[Inbound] {}\n{}...", client_rpc_info_->method(), response_text.substr(0, limit)); + SPDLOG_DEBUG("[Inbound] {} {}...", client_rpc_info_->method(), response_text.substr(0, limit)); } } } diff --git a/cpp/source/client/RpcClientImpl.cpp b/cpp/source/client/RpcClientImpl.cpp index 35016c349..d9f102127 100644 --- a/cpp/source/client/RpcClientImpl.cpp +++ b/cpp/source/client/RpcClientImpl.cpp @@ -16,7 +16,6 @@ */ #include "RpcClientImpl.h" -#include #include #include #include @@ -26,7 +25,6 @@ #include "RpcClient.h" #include "TelemetryBidiReactor.h" #include "TlsHelper.h" -#include "absl/time/time.h" ROCKETMQ_NAMESPACE_BEGIN diff --git a/cpp/source/client/SessionImpl.cpp b/cpp/source/client/SessionImpl.cpp index b3f8b73bc..151f1c3f3 100644 --- a/cpp/source/client/SessionImpl.cpp +++ b/cpp/source/client/SessionImpl.cpp @@ -34,7 +34,7 @@ bool SessionImpl::await() { void SessionImpl::syncSettings() { auto ptr = client_.lock(); - SPDLOG_INFO("Sync client settings to {}", rpc_client_->remoteAddress()); + SPDLOG_INFO("Request client settings to {}", rpc_client_->remoteAddress()); TelemetryCommand command; command.mutable_settings()->CopyFrom(ptr->clientSettings()); telemetry_->write(command); diff --git a/cpp/source/client/TelemetryBidiReactor.cpp b/cpp/source/client/TelemetryBidiReactor.cpp index e0a83a28b..27557cf2e 100644 --- a/cpp/source/client/TelemetryBidiReactor.cpp +++ b/cpp/source/client/TelemetryBidiReactor.cpp @@ -16,15 +16,12 @@ */ #include "TelemetryBidiReactor.h" -#include -#include #include #include #include "ClientManager.h" #include "MessageExt.h" #include "Metadata.h" -#include "RpcClient.h" #include "Signature.h" #include "google/protobuf/util/time_util.h" #include "rocketmq/Logger.h" @@ -70,7 +67,7 @@ void TelemetryBidiReactor::OnWriteDone(bool ok) { RemoveHold(); if (!ok) { - SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().DebugString(), peer_address_); + SPDLOG_WARN("Failed to write telemetry command {} to {}", writes_.front().ShortDebugString(), peer_address_); signalClose(); return; } @@ -91,7 +88,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { if (!ok) { // for read stream RemoveHold(); - SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_); + // SPDLOG_WARN("Failed to read from telemetry stream from {}", peer_address_); signalClose(); return; } @@ -103,7 +100,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { } } - SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.DebugString()); + SPDLOG_DEBUG("Read a telemetry command from {}: {}", peer_address_, read_.ShortDebugString()); auto client = client_.lock(); if (!client) { SPDLOG_INFO("Client for {} has destructed", peer_address_); @@ -114,19 +111,20 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { switch (read_.command_case()) { case rmq::TelemetryCommand::kSettings: { auto settings = read_.settings(); - SPDLOG_INFO("Received settings from {}: {}", peer_address_, settings.DebugString()); + SPDLOG_INFO("Receive settings from {}: {}", peer_address_, settings.ShortDebugString()); applySettings(settings); sync_settings_promise_.set_value(true); break; } + case rmq::TelemetryCommand::kRecoverOrphanedTransactionCommand: { - SPDLOG_DEBUG("Receive orphan transaction command: {}", read_.DebugString()); - auto message = client->manager()->wrapMessage(read_.release_verify_message_command()->message()); + SPDLOG_INFO("Receive orphan transaction command: {}", read_.ShortDebugString()); + auto message = client->manager()->wrapMessage( + read_.recover_orphaned_transaction_command().message()); auto raw = const_cast(message.get()); raw->mutableExtension().target_endpoint = peer_address_; raw->mutableExtension().transaction_id = read_.recover_orphaned_transaction_command().transaction_id(); client->recoverOrphanedTransaction(message); - break; } @@ -156,7 +154,7 @@ void TelemetryBidiReactor::OnReadDone(bool ok) { } default: { - SPDLOG_WARN("Unsupported command"); + SPDLOG_WARN("Telemetry command receive unsupported command"); break; } } @@ -291,7 +289,7 @@ void TelemetryBidiReactor::tryWriteNext() { } if (!writes_.empty()) { - SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().DebugString()); + SPDLOG_DEBUG("Writing telemetry command to {}: {}", peer_address_, writes_.front().ShortDebugString()); AddHold(); StartWrite(&(writes_.front())); } diff --git a/cpp/source/client/include/TopicRouteData.h b/cpp/source/client/include/TopicRouteData.h index 807ac811b..aac41f1c1 100644 --- a/cpp/source/client/include/TopicRouteData.h +++ b/cpp/source/client/include/TopicRouteData.h @@ -43,7 +43,7 @@ class TopicRouteData { std::string debugString() const { return absl::StrJoin(message_queues_.begin(), message_queues_.end(), ",", - [](std::string* out, const rmq::MessageQueue& m) { out->append(m.DebugString()); }); + [](std::string* out, const rmq::MessageQueue& m) { out->append(m.ShortDebugString()); }); }; private: diff --git a/cpp/source/log/LoggerImpl.cpp b/cpp/source/log/LoggerImpl.cpp index 6ff39f105..cdd2f473e 100644 --- a/cpp/source/log/LoggerImpl.cpp +++ b/cpp/source/log/LoggerImpl.cpp @@ -131,6 +131,7 @@ Logger& getLogger() { const std::size_t LoggerImpl::DEFAULT_MAX_LOG_FILE_QUANTITY = 16; const std::size_t LoggerImpl::DEFAULT_FILE_SIZE = 1048576 * 256; const char* LoggerImpl::USER_HOME_ENV = "HOME"; -const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n] [%^---%L---%$] [thread %t] %v %@"; +const char* LoggerImpl::DEFAULT_PATTERN = "%Y-%m-%d %H:%M:%S.%e [%^--%L--%$] [%7t] %v %@"; +// const char* LoggerImpl::DEFAULT_PATTERN = "[%Y/%m/%d-%H:%M:%S.%e %z] [%n] [%^---%L---%$] [thread %t] %v %@"; ROCKETMQ_NAMESPACE_END \ No newline at end of file diff --git a/cpp/source/rocketmq/ClientImpl.cpp b/cpp/source/rocketmq/ClientImpl.cpp index 5be559b6d..5fb66877d 100644 --- a/cpp/source/rocketmq/ClientImpl.cpp +++ b/cpp/source/rocketmq/ClientImpl.cpp @@ -19,11 +19,9 @@ #include #include #include -#include #include #include #include -#include #include #include #include @@ -36,16 +34,12 @@ #include "NamingScheme.h" #include "SessionImpl.h" #include "Signature.h" -#include "StdoutHandler.h" #include "UtilAll.h" #include "absl/strings/numbers.h" #include "absl/strings/str_join.h" #include "absl/strings/str_split.h" #include "fmt/format.h" #include "opencensus/stats/stats.h" -#include "rocketmq/Logger.h" -#include "rocketmq/Message.h" -#include "rocketmq/MessageListener.h" #include "spdlog/spdlog.h" ROCKETMQ_NAMESPACE_BEGIN @@ -175,12 +169,14 @@ void ClientImpl::start() { auto telemetry_functor = [ptr]() { std::shared_ptr base = ptr.lock(); if (base) { - SPDLOG_INFO("Sync client settings to servers"); + SPDLOG_DEBUG("Sync client settings to servers"); base->syncClientSettings(); } }; - telemetry_handle_ = client_manager_->getScheduler()->schedule(telemetry_functor, TELEMETRY_TASK_NAME, - std::chrono::minutes(5), std::chrono::minutes(5)); + + telemetry_handle_ = client_manager_->getScheduler()->schedule( + telemetry_functor, TELEMETRY_TASK_NAME, + std::chrono::minutes(5), std::chrono::minutes(5)); auto&& metric_service_endpoint = metricServiceEndpoint(); if (!metric_service_endpoint.empty()) { diff --git a/cpp/source/rocketmq/ProducerImpl.cpp b/cpp/source/rocketmq/ProducerImpl.cpp index 34c5b29c6..9b664d598 100644 --- a/cpp/source/rocketmq/ProducerImpl.cpp +++ b/cpp/source/rocketmq/ProducerImpl.cpp @@ -227,7 +227,10 @@ SendReceipt ProducerImpl::send(MessageConstPtr message, std::error_code& ec) noe auto callback = [&, mtx, cv](const std::error_code& code, const SendReceipt& receipt) mutable { ec = code; SendReceipt& receipt_mut = const_cast(receipt); + send_receipt.target = std::move(receipt_mut.target); + send_receipt.message_id = std::move(receipt_mut.message_id); send_receipt.message = std::move(receipt_mut.message); + send_receipt.transaction_id = std::move(receipt_mut.transaction_id); { absl::MutexLock lk(mtx.get()); completed = true; @@ -354,7 +357,7 @@ void ProducerImpl::sendImpl(std::shared_ptr context) { client_manager_->send(target, metadata, request, callback); } -void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::vector list) { +void ProducerImpl::send0(MessageConstPtr message, const SendCallback& callback, std::vector list) { SendReceipt send_receipt; std::error_code ec; validate(*message, ec); @@ -371,7 +374,8 @@ void ProducerImpl::send0(MessageConstPtr message, SendCallback callback, std::ve return; } - auto context = std::make_shared(shared_from_this(), std::move(message), callback, std::move(list)); + auto context = std::make_shared( + shared_from_this(), std::move(message), callback, std::move(list)); sendImpl(context); } diff --git a/cpp/source/rocketmq/include/ClientImpl.h b/cpp/source/rocketmq/include/ClientImpl.h index d7693962c..25cef46c2 100644 --- a/cpp/source/rocketmq/include/ClientImpl.h +++ b/cpp/source/rocketmq/include/ClientImpl.h @@ -147,6 +147,7 @@ class ClientImpl : virtual public Client { absl::flat_hash_map>> inflight_route_requests_ GUARDED_BY(inflight_route_requests_mtx_); absl::Mutex inflight_route_requests_mtx_ ACQUIRED_BEFORE(topic_route_table_mtx_); // Protects inflight_route_requests_ + static const char* UPDATE_ROUTE_TASK_NAME; std::uint32_t route_update_handle_{0}; diff --git a/cpp/source/rocketmq/include/ProducerImpl.h b/cpp/source/rocketmq/include/ProducerImpl.h index b572f20d6..2c284172d 100644 --- a/cpp/source/rocketmq/include/ProducerImpl.h +++ b/cpp/source/rocketmq/include/ProducerImpl.h @@ -176,7 +176,7 @@ class ProducerImpl : virtual public ClientImpl, public std::enable_shared_from_t void validate(const Message& message, std::error_code& ec); - void send0(MessageConstPtr message, SendCallback callback, std::vector list); + void send0(MessageConstPtr message, const SendCallback& callback, std::vector list); void isolatedEndpoints(absl::flat_hash_set& endpoints) LOCKS_EXCLUDED(isolated_endpoints_mtx_);