[#3989] YBClient should wait for all sync operations to complete upon shutdown

Summary:
In the YSQL layer we could call YBCPgGetCatalogMasterVersion and then close the client immediately, while the synchronous master RPC it issued was still running.
This results in a heap-use-after-free error:

```
[ts-1]     #0 0x7f006c05b5a5 in std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> >::basic_string(std::__1::basic_string<char, std::__1::char_traits<char>, std::__1::allocator<char> > const&) (/opt/yb-build/thirdparty/yugabyte-db-thirdparty-v20191209181439-7fc63d1583-centos/installed/asan/libcxx/lib/libc++.so.1+0x2065a5)
[ts-1]     #1 0x7f007d8cc059 in yb::HostPort::HostPort(yb::HostPort const&) /nfusr/centos-gcp-cloud/jenkins-slave-150/jenkins/jenkins-github-yugabyte-db-centos-master-clang-asan-758/build/asan-clang-dynamic-ninja/../../src/yb/util/net/net_util.h:49:7
[ts-1]     #2 0x7f007d8f61b7 in yb::client::YBClient::Data::leader_master_hostport() const /nfusr/centos-gcp-cloud/jenkins-slave-150/jenkins/jenkins-github-yugabyte-db-centos-master-clang-asan-758/build/asan-clang-dynamic-ninja/../../src/yb/client/client-internal.cc:1825:10
[ts-1]     #3 0x7f007d9306a5 in yb::Status yb::client::YBClient::Data::SyncLeaderMasterRpc<yb::master::GetYsqlCatalogConfigRequestPB, yb::master::GetYsqlCatalogConfigResponsePB>(std::__1::chrono::time_point<yb::CoarseMonoClock, std::__1::chrono::duration<long long, std::__1::ratio<1l, 1000000000l> > >, yb::client::YBClient*, yb::master::GetYsqlCatalogConfigRequestPB const&, yb::master::GetYsqlCatalogConfigResponsePB*, int*, char const*, std::__1::function<yb::Status (yb::master::MasterServiceProxy*, yb::master::GetYsqlCatalogConfigRequestPB const&, yb::master::GetYsqlCatalogConfigResponsePB*, yb::rpc::RpcController*)> const&) /nfusr/centos-gcp-cloud/jenkins-slave-150/jenkins/jenkins-github-yugabyte-db-centos-master-clang-asan-758/build/asan-clang-dynamic-ninja/../../src/yb/client/client-internal.cc:192:40
[ts-1]     #4 0x7f007d8abc33 in yb::client::YBClient::GetYsqlCatalogMasterVersion(unsigned long*) /nfusr/centos-gcp-cloud/jenkins-slave-150/jenkins/jenkins-github-yugabyte-db-centos-master-clang-asan-758/build/asan-clang-dynamic-ninja/../../src/yb/client/client.cc:654:3
[ts-1]     #5 0x7f00800a14f5 in yb::pggate::PgSession::GetCatalogMasterVersion(unsigned long*) /nfusr/centos-gcp-cloud/jenkins-slave-150/jenkins/jenkins-github-yugabyte-db-centos-master-clang-asan-758/build/asan-clang-dynamic-ninja/../../src/yb/yql/pggate/pg_session.cc:457:19
[ts-1]     #6 0x7f008007be7c in yb::pggate::PgApiImpl::GetCatalogMasterVersion(unsigned long*) /nfusr/centos-gcp-cloud/jenkins-slave-150/jenkins/jenkins-github-yugabyte-db-centos-master-clang-asan-758/build/asan-clang-dynamic-ninja/../../src/yb/yql/pggate/pggate.cc:342:23
[ts-1]     #7 0x7f0080067f27 in YBCPgGetCatalogMasterVersion /nfusr/centos-gcp-cloud/jenkins-slave-150/jenkins/jenkins-github-yugabyte-db-centos-master-clang-asan-758/build/asan-clang-dynamic-ninja/../../src/yb/yql/pggate/ybc_pggate.cc:179:29
```

The stack trace shows SyncLeaderMasterRpc still reading client state (leader_master_hostport()) after the client had been torn down. Fixed by forcing YBClient to wait for all in-flight sync operations to complete before shutdown finishes.
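
To make the fix concrete, below is a minimal, self-contained sketch of the coordination pattern the patch introduces. The names (`Client`, `SyncRequest`, `Guard`) are simplified stand-ins for illustration, not the actual YBClient code:

```cpp
#include <atomic>
#include <chrono>
#include <iostream>
#include <thread>

using namespace std::chrono_literals;

class Client {
 public:
  // Stand-in for YBClient::Data::SyncLeaderMasterRpc: each synchronous request
  // registers itself for the duration of the call and bails out once shutdown
  // has started.
  bool SyncRequest() {
    running_sync_requests_.fetch_add(1);
    // RAII decrement covering every exit path, like the ScopeExit in the patch.
    struct Guard {
      std::atomic<int>& counter;
      ~Guard() { counter.fetch_sub(1); }
    } guard{running_sync_requests_};

    if (closing_.load(std::memory_order_acquire)) {
      return false;  // Mirrors STATUS(Aborted, "Client is shutting down").
    }
    std::this_thread::sleep_for(50ms);  // Simulated master RPC.
    return true;
  }

  // Stand-in for StartShutdown: new requests now fail fast.
  void StartShutdown() { closing_.store(true, std::memory_order_release); }

  // Stand-in for CompleteShutdown: block until in-flight requests drain, so
  // tearing down client state afterwards cannot race with a running request.
  void CompleteShutdown() {
    while (running_sync_requests_.load(std::memory_order_acquire) != 0) {
      std::this_thread::sleep_for(100ms);
    }
  }

 private:
  std::atomic<bool> closing_{false};
  std::atomic<int> running_sync_requests_{0};
};

int main() {
  Client client;
  std::thread worker([&] { client.SyncRequest(); });
  client.StartShutdown();
  client.CompleteShutdown();  // After this, no request is touching client state.
  worker.join();
  std::cout << "shut down cleanly" << std::endl;
}
```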

Test Plan: ybd asan --gtest_filter PgLibPqTest.TxnConflictsForColocatedTables

Reviewers: dmitry

Reviewed By: dmitry

Subscribers: ybase

Differential Revision: https://phabricator.dev.yugabyte.com/D8178
spolitov committed Mar 25, 2020
1 parent 4f44fbf commit e9f9924
Showing 6 changed files with 77 additions and 55 deletions.
85 changes: 55 additions & 30 deletions src/yb/client/client-internal.cc
```diff
@@ -73,6 +73,7 @@
 #include "yb/util/flags.h"
 #include "yb/util/flag_tags.h"
 #include "yb/util/net/net_util.h"
+#include "yb/util/scope_exit.h"
 #include "yb/util/thread_restrictions.h"
 
 using namespace std::literals;
@@ -147,14 +148,23 @@ Status RetryFunc(
 
 template <class ReqClass, class RespClass>
 Status YBClient::Data::SyncLeaderMasterRpc(
-    CoarseTimePoint deadline, YBClient* client, const ReqClass& req, RespClass* resp,
+    CoarseTimePoint deadline, const ReqClass& req, RespClass* resp,
     int* num_attempts, const char* func_name,
     const std::function<Status(MasterServiceProxy*, const ReqClass&, RespClass*, RpcController*)>&
         func) {
+  running_sync_requests_.fetch_add(1, std::memory_order_acquire);
+  auto se = ScopeExit([this] {
+    running_sync_requests_.fetch_sub(1, std::memory_order_acquire);
+  });
+
   DSCHECK(deadline != CoarseTimePoint(), InvalidArgument, "Deadline is not set");
   CoarseTimePoint start_time;
 
   while (true) {
+    if (closing_.load(std::memory_order_acquire)) {
+      return STATUS(Aborted, "Client is shutting down");
+    }
+
     RpcController rpc;
 
     // Have we already exceeded our deadline?
@@ -173,7 +183,7 @@ Status YBClient::Data::SyncLeaderMasterRpc(
     // leader master and retry before the overall deadline expires.
     //
     // TODO: KUDU-683 tracks cleanup for this.
-    auto rpc_deadline = now + client->default_rpc_timeout();
+    auto rpc_deadline = now + default_rpc_timeout_;
     rpc.set_deadline(std::min(rpc_deadline, deadline));
 
     if (num_attempts != nullptr) {
@@ -191,9 +201,9 @@ Status YBClient::Data::SyncLeaderMasterRpc(
           << "Unable to send the request " << req.GetTypeName() << " (" << req.ShortDebugString()
           << ") to leader Master (" << leader_master_hostport().ToString()
           << "): " << s;
-      if (client->IsMultiMaster()) {
+      if (IsMultiMaster()) {
         YB_LOG_EVERY_N_SECS(INFO, 1) << "Determining the new leader Master and retrying...";
-        WARN_NOT_OK(SetMasterServerProxy(client, deadline),
+        WARN_NOT_OK(SetMasterServerProxy(deadline),
                     "Unable to determine the new leader Master");
       }
       continue;
@@ -206,9 +216,9 @@ Status YBClient::Data::SyncLeaderMasterRpc(
           << "Unable to send the request (" << req.ShortDebugString()
           << ") to leader Master (" << leader_master_hostport().ToString()
           << "): " << s.ToString();
-      if (client->IsMultiMaster()) {
+      if (IsMultiMaster()) {
         YB_LOG_EVERY_N_SECS(INFO, 1) << "Determining the new leader Master and retrying...";
-        WARN_NOT_OK(SetMasterServerProxy(client, deadline),
+        WARN_NOT_OK(SetMasterServerProxy(deadline),
                     "Unable to determine the new leader Master");
       }
       continue;
@@ -223,9 +233,9 @@ Status YBClient::Data::SyncLeaderMasterRpc(
     if (s.ok() && resp->has_error()) {
       if (resp->error().code() == MasterErrorPB::NOT_THE_LEADER ||
          resp->error().code() == MasterErrorPB::CATALOG_MANAGER_NOT_INITIALIZED) {
-        if (client->IsMultiMaster()) {
+        if (IsMultiMaster()) {
           YB_LOG_EVERY_N_SECS(INFO, 1) << "Determining the new leader Master and retrying...";
-          WARN_NOT_OK(SetMasterServerProxy(client, deadline),
+          WARN_NOT_OK(SetMasterServerProxy(deadline),
                       "Unable to determine the new leader Master");
         }
         continue;
@@ -241,7 +251,7 @@
     using yb::master::RequestTypePB; \
     using yb::master::ResponseTypePB; \
     template Status YBClient::Data::SyncLeaderMasterRpc( \
-        CoarseTimePoint deadline, YBClient* client, const RequestTypePB& req, \
+        CoarseTimePoint deadline, const RequestTypePB& req, \
         ResponseTypePB* resp, int* num_attempts, const char* func_name, \
         const std::function<Status( \
             MasterServiceProxy*, const RequestTypePB&, ResponseTypePB*, RpcController*)>& \
@@ -442,7 +452,7 @@ Status YBClient::Data::CreateTable(YBClient* client,
 
   int attempts = 0;
   Status s = SyncLeaderMasterRpc<CreateTableRequestPB, CreateTableResponsePB>(
-      deadline, client, req, &resp, &attempts, "CreateTable", &MasterServiceProxy::CreateTable);
+      deadline, req, &resp, &attempts, "CreateTable", &MasterServiceProxy::CreateTable);
   // Set the table id even if there was an error. This is useful when the error is IsAlreadyPresent
   // so that we can wait for the existing table to be available to receive requests.
   *table_id = resp.table_id();
@@ -540,7 +550,6 @@ Status YBClient::Data::IsCreateTableInProgress(YBClient* client,
   const Status s =
       SyncLeaderMasterRpc<IsCreateTableDoneRequestPB, IsCreateTableDoneResponsePB>(
           deadline,
-          client,
           req,
           &resp,
           nullptr /* num_attempts */,
@@ -587,8 +596,7 @@ Status YBClient::Data::DeleteTable(YBClient* client,
   }
   req.set_is_index_table(is_index_table);
   const Status s = SyncLeaderMasterRpc<DeleteTableRequestPB, DeleteTableResponsePB>(
-      deadline, client, req, &resp,
-      &attempts, "DeleteTable", &MasterServiceProxy::DeleteTable);
+      deadline, req, &resp, &attempts, "DeleteTable", &MasterServiceProxy::DeleteTable);
 
   // Handle special cases based on resp.error().
   if (resp.has_error()) {
@@ -634,7 +642,6 @@ Status YBClient::Data::IsDeleteTableInProgress(YBClient* client,
   const Status s =
       SyncLeaderMasterRpc<IsDeleteTableDoneRequestPB, IsDeleteTableDoneResponsePB>(
           deadline,
-          client,
           req,
           &resp,
           nullptr /* num_attempts */,
@@ -675,7 +682,7 @@ Status YBClient::Data::TruncateTables(YBClient* client,
     req.add_table_ids(table_id);
   }
   RETURN_NOT_OK((SyncLeaderMasterRpc<TruncateTableRequestPB, TruncateTableResponsePB>(
-      deadline, client, req, &resp, nullptr /* num_attempts */, "TruncateTable",
+      deadline, req, &resp, nullptr /* num_attempts */, "TruncateTable",
      &MasterServiceProxy::TruncateTable)));
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
@@ -702,7 +709,7 @@ Status YBClient::Data::IsTruncateTableInProgress(YBClient* client,
 
   req.set_table_id(table_id);
   RETURN_NOT_OK((SyncLeaderMasterRpc<IsTruncateTableDoneRequestPB, IsTruncateTableDoneResponsePB>(
-      deadline, client, req, &resp, nullptr /* num_attempts */, "IsTruncateTableDone",
+      deadline, req, &resp, nullptr /* num_attempts */, "IsTruncateTableDone",
      &MasterServiceProxy::IsTruncateTableDone)));
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
@@ -728,7 +735,6 @@ Status YBClient::Data::AlterNamespace(YBClient* client,
   Status s =
       SyncLeaderMasterRpc<AlterNamespaceRequestPB, AlterNamespaceResponsePB>(
           deadline,
-          client,
           req,
           &resp,
           nullptr /* num_attempts */,
@@ -748,7 +754,6 @@ Status YBClient::Data::AlterTable(YBClient* client,
   Status s =
       SyncLeaderMasterRpc<AlterTableRequestPB, AlterTableResponsePB>(
           deadline,
-          client,
           req,
           &resp,
           nullptr /* num_attempts */,
@@ -785,7 +790,6 @@ Status YBClient::Data::IsAlterTableInProgress(YBClient* client,
   Status s =
       SyncLeaderMasterRpc<IsAlterTableDoneRequestPB, IsAlterTableDoneResponsePB>(
           deadline,
-          client,
           req,
           &resp,
           nullptr /* num_attempts */,
@@ -827,7 +831,7 @@ Status YBClient::Data::FlushTable(YBClient* client,
   }
   req.set_is_compaction(is_compaction);
   RETURN_NOT_OK((SyncLeaderMasterRpc<FlushTablesRequestPB, FlushTablesResponsePB>(
-      deadline, client, req, &resp, &attempts, "FlushTables", &MasterServiceProxy::FlushTables)));
+      deadline, req, &resp, &attempts, "FlushTables", &MasterServiceProxy::FlushTables)));
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
   }
@@ -853,7 +857,7 @@ Status YBClient::Data::IsFlushTableInProgress(YBClient* client,
 
   req.set_flush_request_id(flush_id);
   RETURN_NOT_OK((SyncLeaderMasterRpc<IsFlushTablesDoneRequestPB, IsFlushTablesDoneResponsePB>(
-      deadline, client, req, &resp, nullptr /* num_attempts */, "IsFlushTableDone",
+      deadline, req, &resp, nullptr /* num_attempts */, "IsFlushTableDone",
       &MasterServiceProxy::IsFlushTablesDone)));
   if (resp.has_error()) {
     return StatusFromPB(resp.error().status());
@@ -1033,7 +1037,6 @@ ClientMasterRpc::~ClientMasterRpc() {
 
 void ClientMasterRpc::ResetLeaderMasterAndRetry() {
   client_->data_->SetMasterServerProxyAsync(
-      client_,
       retrier().deadline(),
       false /* skip_resolution */,
       Bind(&ClientMasterRpc::NewLeaderMasterDeterminedCb,
@@ -1592,16 +1595,14 @@ void YBClient::Data::LeaderMasterDetermined(const Status& status,
   }
 }
 
-Status YBClient::Data::SetMasterServerProxy(YBClient* client,
-                                            CoarseTimePoint deadline,
+Status YBClient::Data::SetMasterServerProxy(CoarseTimePoint deadline,
                                             bool skip_resolution) {
   Synchronizer sync;
-  SetMasterServerProxyAsync(client, deadline, skip_resolution, sync.AsStatusCallback());
+  SetMasterServerProxyAsync(deadline, skip_resolution, sync.AsStatusCallback());
   return sync.Wait();
 }
 
-void YBClient::Data::SetMasterServerProxyAsync(YBClient* client,
-                                               CoarseTimePoint deadline,
+void YBClient::Data::SetMasterServerProxyAsync(CoarseTimePoint deadline,
                                                bool skip_resolution,
                                                const StatusCallback& cb) {
   DCHECK(deadline != CoarseTimePoint::max());
@@ -1638,7 +1639,7 @@ void YBClient::Data::SetMasterServerProxyAsync(YBClient* client,
   // Finding a new master involves a fan-out RPC to each master. A single
   // RPC timeout's worth of time should be sufficient, though we'll use
   // the provided deadline if it's sooner.
-  auto leader_master_deadline = CoarseMonoClock::Now() + client->default_rpc_timeout();
+  auto leader_master_deadline = CoarseMonoClock::Now() + default_rpc_timeout_;
   auto actual_deadline = std::min(deadline, leader_master_deadline);
 
   // This ensures that no more than one GetLeaderMasterRpc is in
@@ -1791,7 +1792,7 @@ Status YBClient::Data::SetReplicationInfo(
   GetMasterClusterConfigRequestPB get_req;
   GetMasterClusterConfigResponsePB get_resp;
   Status s = SyncLeaderMasterRpc<GetMasterClusterConfigRequestPB, GetMasterClusterConfigResponsePB>(
-      deadline, client, get_req, &get_resp, nullptr /* num_attempts */, "GetMasterClusterConfig",
+      deadline, get_req, &get_resp, nullptr /* num_attempts */, "GetMasterClusterConfig",
       &MasterServiceProxy::GetMasterClusterConfig);
   RETURN_NOT_OK(s);
   if (get_resp.has_error()) {
@@ -1808,7 +1809,7 @@ Status YBClient::Data::SetReplicationInfo(
 
   // Try to update it on the live cluster.
   s = SyncLeaderMasterRpc<ChangeMasterClusterConfigRequestPB, ChangeMasterClusterConfigResponsePB>(
-      deadline, client, change_req, &change_resp, nullptr /* num_attempts */,
+      deadline, change_req, &change_resp, nullptr /* num_attempts */,
       "ChangeMasterClusterConfig", &MasterServiceProxy::ChangeMasterClusterConfig);
   RETURN_NOT_OK(s);
   if (change_resp.has_error()) {
@@ -1838,5 +1839,29 @@ void YBClient::Data::UpdateLatestObservedHybridTime(uint64_t hybrid_time) {
   latest_observed_hybrid_time_.StoreMax(hybrid_time);
 }
 
+void YBClient::Data::StartShutdown() {
+  closing_.store(true, std::memory_order_release);
+}
+
+bool YBClient::Data::IsMultiMaster() {
+  std::lock_guard<simple_spinlock> l(master_server_addrs_lock_);
+  if (master_server_addrs_.size() > 1) {
+    return true;
+  }
+  // For single entry case, check if it is a list of host/ports.
+  std::vector<Endpoint> addrs;
+  const auto status = ParseAddressList(master_server_addrs_[0],
+                                       yb::master::kMasterDefaultPort,
+                                       &addrs);
+  return status.ok() && (addrs.size() > 1);
+}
+
+void YBClient::Data::CompleteShutdown() {
+  while (running_sync_requests_.load(std::memory_order_acquire)) {
+    YB_LOG_EVERY_N_SECS(INFO, 5) << "Waiting sync requests to finish";
+    std::this_thread::sleep_for(100ms);
+  }
+}
+
 } // namespace client
 } // namespace yb
```
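
For context on the guard used in `SyncLeaderMasterRpc` above: `ScopeExit` (from the newly included `yb/util/scope_exit.h`) is an RAII helper that runs a callback when the enclosing scope unwinds, which is what guarantees `running_sync_requests_` is decremented on every return path out of the function. A generic sketch of such a helper follows; `ScopeExitGuard` is a hypothetical name for illustration, not YugabyteDB's actual implementation:

```cpp
#include <utility>

// Generic scope-exit helper: stores a callback and invokes it on destruction.
template <class F>
class ScopeExitGuard {
 public:
  explicit ScopeExitGuard(F f) : f_(std::move(f)) {}
  ~ScopeExitGuard() { f_(); }

  // Non-copyable so the callback cannot run twice.
  ScopeExitGuard(const ScopeExitGuard&) = delete;
  ScopeExitGuard& operator=(const ScopeExitGuard&) = delete;

 private:
  F f_;
};

// Usage, relying on C++17 class template argument deduction:
//   ScopeExitGuard guard([this] { running_sync_requests_.fetch_sub(1); });
```
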
18 changes: 13 additions & 5 deletions src/yb/client/client-internal.h
```diff
@@ -241,8 +241,7 @@ class YBClient::Data {
   // Invokes 'cb' with the appropriate status when finished.
   //
   // Works with both a distributed and non-distributed configuration.
-  void SetMasterServerProxyAsync(YBClient* client,
-                                 CoarseTimePoint deadline,
+  void SetMasterServerProxyAsync(CoarseTimePoint deadline,
                                  bool skip_resolution,
                                  const StatusCallback& cb);
 
@@ -253,8 +252,7 @@ class YBClient::Data {
   //
   // TODO (KUDU-492): Get rid of this method and re-factor the client
   // to lazily initialize 'master_proxy_'.
-  CHECKED_STATUS SetMasterServerProxy(YBClient* client,
-                                      CoarseTimePoint deadline,
+  CHECKED_STATUS SetMasterServerProxy(CoarseTimePoint deadline,
                                       bool skip_resolution = false);
 
   std::shared_ptr<master::MasterServiceProxy> master_proxy() const;
@@ -296,11 +294,17 @@ class YBClient::Data {
   // the resulting Status.
   template <class ReqClass, class RespClass>
   CHECKED_STATUS SyncLeaderMasterRpc(
-      CoarseTimePoint deadline, YBClient* client, const ReqClass& req, RespClass* resp,
+      CoarseTimePoint deadline, const ReqClass& req, RespClass* resp,
       int* num_attempts, const char* func_name,
       const std::function<Status(
           master::MasterServiceProxy*, const ReqClass&, RespClass*, rpc::RpcController*)>& func);
 
+  bool IsMultiMaster();
+
+  void StartShutdown();
+
+  void CompleteShutdown();
+
   rpc::Messenger* messenger_ = nullptr;
   std::unique_ptr<rpc::Messenger> messenger_holder_;
   std::unique_ptr<rpc::ProxyCache> proxy_cache_;
@@ -350,6 +354,10 @@ class YBClient::Data {
 
   AtomicInt<uint64_t> latest_observed_hybrid_time_;
 
+  std::atomic<bool> closing_{false};
+
+  std::atomic<int> running_sync_requests_{0};
+
   // Cloud info indicating placement information of client.
   CloudInfoPB cloud_info_pb_;
 
```
21 changes: 5 additions & 16 deletions src/yb/client/client.cc
```diff
@@ -182,7 +182,6 @@ using std::shared_ptr;
     Status s = data_->SyncLeaderMasterRpc<BOOST_PP_CAT(method, RequestPB), \
                                           BOOST_PP_CAT(method, ResponsePB)>( \
         deadline, \
-        this, \
         req, \
         &resp, \
         nullptr, \
@@ -357,7 +356,7 @@ Status YBClientBuilder::DoBuild(rpc::Messenger* messenger, std::unique_ptr<YBCli
   // time around.
   auto deadline = CoarseMonoClock::Now() + c->default_admin_operation_timeout();
   RETURN_NOT_OK_PREPEND(
-      c->data_->SetMasterServerProxy(c.get(), deadline, data_->skip_master_leader_resolution_),
+      c->data_->SetMasterServerProxy(deadline, data_->skip_master_leader_resolution_),
       "Could not locate the leader master");
 
   c->data_->meta_cache_.reset(new MetaCache(c.get()));
@@ -403,6 +402,7 @@ YBClient::~YBClient() {
 }
 
 void YBClient::Shutdown() {
+  data_->StartShutdown();
   if (data_->messenger_holder_) {
     data_->messenger_holder_->Shutdown();
   }
@@ -412,6 +412,7 @@ void YBClient::Shutdown() {
   if (data_->cb_threadpool_) {
     data_->cb_threadpool_->Shutdown();
   }
+  data_->CompleteShutdown();
 }
 
 std::unique_ptr<YBTableCreator> YBClient::NewTableCreator() {
@@ -1227,7 +1228,7 @@ Status YBClient::ListMasters(CoarseTimePoint deadline, std::vector<std::string>*
 
 Result<HostPort> YBClient::RefreshMasterLeaderAddress() {
   auto deadline = CoarseMonoClock::Now() + default_admin_operation_timeout();
-  RETURN_NOT_OK(data_->SetMasterServerProxy(this, deadline));
+  RETURN_NOT_OK(data_->SetMasterServerProxy(deadline));
 
   return GetMasterLeaderAddress();
 }
@@ -1354,19 +1355,7 @@ shared_ptr<YBSession> YBClient::NewSession() {
 }
 
 bool YBClient::IsMultiMaster() const {
-  std::lock_guard<simple_spinlock> l(data_->master_server_addrs_lock_);
-  if (data_->master_server_addrs_.size() > 1) {
-    return true;
-  }
-  // For single entry case, check if it is a list of host/ports.
-  vector<Endpoint> addrs;
-  const auto status = ParseAddressList(data_->master_server_addrs_[0],
-                                       yb::master::kMasterDefaultPort,
-                                       &addrs);
-  if (!status.ok()) {
-    return false;
-  }
-  return addrs.size() > 1;
+  return data_->IsMultiMaster();
 }
 
 Result<int> YBClient::NumTabletsForUserTable(TableType table_type) {
```
1 change: 0 additions & 1 deletion src/yb/client/meta_cache.cc
```diff
@@ -596,7 +596,6 @@ void LookupRpc::SendRpc() {
 
 void LookupRpc::ResetMasterLeaderAndRetry() {
   client()->data_->SetMasterServerProxyAsync(
-      client(),
       retrier().deadline(),
       false /* skip_resolution */,
       Bind(&LookupRpc::NewLeaderMasterDeterminedCb,
```