From 58d0278ab4dbbf2f7c9ccc5bea671bb44c504cc6 Mon Sep 17 00:00:00 2001 From: Todd Derr Date: Mon, 9 Mar 2020 15:32:13 -0400 Subject: [PATCH 1/4] fix: associate each `Session` with a `Channel` Factor the `SessionPool::ChannelInfo` struct out to a stand-alone object, so we can store a `shared_ptr` to it in each `Session`. This allows us to properly account for `Sessions` that are marked bad or disassociated from the pool. Previously, the per-Channel session count was not being decremented in those cases. Since the `Channel` also holds a `shared_ptr`, we can eliminate the `Session::stub_` member. Fixes #1344 --- google/cloud/spanner/CMakeLists.txt | 1 + google/cloud/spanner/internal/channel.h | 45 +++++++++++++++++ google/cloud/spanner/internal/session.cc | 5 +- google/cloud/spanner/internal/session.h | 11 ++-- google/cloud/spanner/internal/session_pool.cc | 50 ++++++++++++------- google/cloud/spanner/internal/session_pool.h | 20 +++----- google/cloud/spanner/spanner_client.bzl | 1 + 7 files changed, 94 insertions(+), 39 deletions(-) create mode 100644 google/cloud/spanner/internal/channel.h diff --git a/google/cloud/spanner/CMakeLists.txt b/google/cloud/spanner/CMakeLists.txt index b929ecaf..1f46bbe0 100644 --- a/google/cloud/spanner/CMakeLists.txt +++ b/google/cloud/spanner/CMakeLists.txt @@ -126,6 +126,7 @@ add_library( internal/api_client_header.cc internal/api_client_header.h internal/build_info.h + internal/channel.h internal/compiler_info.cc internal/compiler_info.h internal/connection_impl.cc diff --git a/google/cloud/spanner/internal/channel.h b/google/cloud/spanner/internal/channel.h new file mode 100644 index 00000000..c591aadd --- /dev/null +++ b/google/cloud/spanner/internal/channel.h @@ -0,0 +1,45 @@ +// Copyright 2019 Google LLC +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#ifndef GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_CHANNEL_H +#define GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_CHANNEL_H + +#include "google/cloud/spanner/internal/spanner_stub.h" +#include + +namespace google { +namespace cloud { +namespace spanner { +inline namespace SPANNER_CLIENT_NS { +namespace internal { + +/** + * `Channel` represents a single gRPC Channel/Stub. + */ +struct Channel { + /// @p stub must not be nullptr + explicit Channel(std::shared_ptr stub_param) + : stub(std::move(stub_param)) {} + + std::shared_ptr const stub; + int session_count = 0; +}; + +} // namespace internal +} // namespace SPANNER_CLIENT_NS +} // namespace spanner +} // namespace cloud +} // namespace google + +#endif // GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_CHANNEL_H diff --git a/google/cloud/spanner/internal/session.cc b/google/cloud/spanner/internal/session.cc index ba385cda..1145835b 100644 --- a/google/cloud/spanner/internal/session.cc +++ b/google/cloud/spanner/internal/session.cc @@ -21,8 +21,9 @@ inline namespace SPANNER_CLIENT_NS { namespace internal { SessionHolder MakeDissociatedSessionHolder(std::string session_name) { - return SessionHolder(new Session(std::move(session_name), /*stub=*/nullptr), - std::default_delete()); + return SessionHolder( + new Session(std::move(session_name), /*channel=*/nullptr), + std::default_delete()); } } // namespace internal diff --git a/google/cloud/spanner/internal/session.h b/google/cloud/spanner/internal/session.h index e47f44f5..8ab8fee7 100644 --- a/google/cloud/spanner/internal/session.h +++ b/google/cloud/spanner/internal/session.h @@ -15,6 +15,7 @@ #ifndef GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_SESSION_H #define GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_SESSION_H +#include "google/cloud/spanner/internal/channel.h" #include "google/cloud/spanner/internal/spanner_stub.h" #include "google/cloud/spanner/version.h" #include @@ -35,9 +36,9 @@ namespace internal { */ class Session { public: - Session(std::string session_name, std::shared_ptr stub) + Session(std::string session_name, std::shared_ptr channel) : session_name_(std::move(session_name)), - stub_(std::move(stub)), + channel_(std::move(channel)), is_bad_(false) {} // Not copyable or moveable. @@ -53,11 +54,11 @@ class Session { bool is_bad() const { return is_bad_.load(std::memory_order_relaxed); } private: - friend class SessionPool; // for access to stub() - std::shared_ptr stub() const { return stub_; } + friend class SessionPool; // for access to channel() + std::shared_ptr const& channel() const { return channel_; } std::string const session_name_; - std::shared_ptr const stub_; + std::shared_ptr const channel_; std::atomic is_bad_; }; diff --git a/google/cloud/spanner/internal/session_pool.cc b/google/cloud/spanner/internal/session_pool.cc index 9cc4fcd4..5015796a 100644 --- a/google/cloud/spanner/internal/session_pool.cc +++ b/google/cloud/spanner/internal/session_pool.cc @@ -69,7 +69,7 @@ SessionPool::SessionPool(Database db, channels_.reserve(stubs.size()); for (auto& stub : stubs) { - channels_.emplace_back(std::move(stub)); + channels_.push_back(std::make_shared(std::move(stub))); } // `channels_` is never resized after this point. next_dissociated_stub_channel_ = channels_.begin(); @@ -147,6 +147,10 @@ StatusOr SessionPool::Allocate(bool dissociate_from_pool) { sessions_.pop_back(); if (dissociate_from_pool) { --total_sessions_; + auto const& channel = session->channel(); + if (channel) { + --channel->session_count; + } } return {MakeSessionHolder(std::move(session), dissociate_from_pool)}; } @@ -176,10 +180,10 @@ StatusOr SessionPool::Allocate(bool dissociate_from_pool) { // Add `min_sessions` to the pool (plus the one we're going to return), // subject to the `max_sessions_per_channel` cap. - ChannelInfo& channel = *next_channel_for_create_sessions_; - int sessions_to_create = - (std::min)(options_.min_sessions() + 1, - options_.max_sessions_per_channel() - channel.session_count); + auto const& channel = *next_channel_for_create_sessions_; + int sessions_to_create = (std::min)( + options_.min_sessions() + 1, + options_.max_sessions_per_channel() - channel->session_count); auto create_status = CreateSessions(lk, channel, options_.labels(), sessions_to_create); if (!create_status.ok()) { @@ -191,15 +195,18 @@ StatusOr SessionPool::Allocate(bool dissociate_from_pool) { } std::shared_ptr SessionPool::GetStub(Session const& session) { - std::shared_ptr stub = session.stub(); - if (!stub) { - // Sessions that were created for partitioned Reads/Queries do not have - // their own stub; return one to use by round-robining between the channels. - std::unique_lock lk(mu_); - stub = next_dissociated_stub_channel_->stub; - if (++next_dissociated_stub_channel_ == channels_.end()) { - next_dissociated_stub_channel_ = channels_.begin(); - } + auto const& channel = session.channel(); + if (channel) { + return channel->stub; + } + + // Sessions that were created for partitioned Reads/Queries do not have + // their own channel/stub; return a stub to use by round-robining between + // the channels. + std::unique_lock lk(mu_); + auto stub = (*next_dissociated_stub_channel_)->stub; + if (++next_dissociated_stub_channel_ == channels_.end()) { + next_dissociated_stub_channel_ = channels_.begin(); } return stub; } @@ -210,6 +217,10 @@ void SessionPool::Release(std::unique_ptr session) { // Once we have support for background processing, we may want to signal // that to replenish this bad session. --total_sessions_; + auto const& channel = session->channel(); + if (channel) { + --channel->session_count; + } return; } sessions_.push_back(std::move(session)); @@ -224,7 +235,7 @@ void SessionPool::Release(std::unique_ptr session) { // Requires `lk` has locked `mu_` prior to this call. `lk` will be dropped // while the RPC is in progress and then reacquired. Status SessionPool::CreateSessions( - std::unique_lock& lk, ChannelInfo& channel, + std::unique_lock& lk, std::shared_ptr const& channel, std::map const& labels, int num_sessions) { create_in_progress_ = true; lk.unlock(); @@ -233,7 +244,7 @@ Status SessionPool::CreateSessions( request.mutable_session_template()->mutable_labels()->insert(labels.begin(), labels.end()); request.set_session_count(std::int32_t{num_sessions}); - auto const& stub = channel.stub; + auto const& stub = channel->stub; auto response = RetryLoop( retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(), true, @@ -249,12 +260,12 @@ Status SessionPool::CreateSessions( } // Add sessions to the pool and update counters for `channel` and the pool. int sessions_created = response->session_size(); - channel.session_count += sessions_created; + channel->session_count += sessions_created; total_sessions_ += sessions_created; sessions_.reserve(sessions_.size() + sessions_created); for (auto& session : *response->mutable_session()) { sessions_.push_back(google::cloud::internal::make_unique( - std::move(*session.mutable_name()), stub)); + std::move(*session.mutable_name()), channel)); } // Shuffle the pool so we distribute returned sessions across channels. std::shuffle(sessions_.begin(), sessions_.end(), @@ -269,7 +280,8 @@ void SessionPool::UpdateNextChannelForCreateSessions() { // `mu_` must be held by the caller. next_channel_for_create_sessions_ = channels_.begin(); for (auto it = channels_.begin(); it != channels_.end(); ++it) { - if (it->session_count < next_channel_for_create_sessions_->session_count) { + if ((*it)->session_count < + (*next_channel_for_create_sessions_)->session_count) { next_channel_for_create_sessions_ = it; } } diff --git a/google/cloud/spanner/internal/session_pool.h b/google/cloud/spanner/internal/session_pool.h index f4f9442d..ebc3f3a3 100644 --- a/google/cloud/spanner/internal/session_pool.h +++ b/google/cloud/spanner/internal/session_pool.h @@ -17,6 +17,7 @@ #include "google/cloud/spanner/backoff_policy.h" #include "google/cloud/spanner/database.h" +#include "google/cloud/spanner/internal/channel.h" #include "google/cloud/spanner/internal/session.h" #include "google/cloud/spanner/internal/spanner_stub.h" #include "google/cloud/spanner/retry_policy.h" @@ -97,13 +98,6 @@ class SessionPool : public std::enable_shared_from_this { std::shared_ptr GetStub(Session const& session); private: - struct ChannelInfo { - explicit ChannelInfo(std::shared_ptr stub_param) - : stub(std::move(stub_param)) {} - std::shared_ptr const stub; - int session_count = 0; - }; - // Release session back to the pool. void Release(std::unique_ptr session); @@ -116,7 +110,8 @@ class SessionPool : public std::enable_shared_from_this { --num_waiting_for_session_; } - Status CreateSessions(std::unique_lock& lk, ChannelInfo& channel, + Status CreateSessions(std::unique_lock& lk, + std::shared_ptr const& channel, std::map const& labels, int num_sessions); // EXCLUSIVE_LOCKS_REQUIRED(mu_) @@ -146,11 +141,10 @@ class SessionPool : public std::enable_shared_from_this { // `channels_` is guaranteed to be non-empty and will not be resized after // the constructor runs (so the iterators are guaranteed to always be valid). // TODO(#566) replace `vector` with `absl::FixedArray` when available. - std::vector channels_; // GUARDED_BY(mu_) - std::vector::iterator - next_channel_for_create_sessions_; // GUARDED_BY(mu_) - std::vector::iterator - next_dissociated_stub_channel_; // GUARDED_BY(mu_) + using ChannelVec = std::vector>; + ChannelVec channels_; // GUARDED_BY(mu_) + ChannelVec::iterator next_channel_for_create_sessions_; // GUARDED_BY(mu_) + ChannelVec::iterator next_dissociated_stub_channel_; // GUARDED_BY(mu_) }; /** diff --git a/google/cloud/spanner/spanner_client.bzl b/google/cloud/spanner/spanner_client.bzl index a7d463e1..5ec0fe19 100644 --- a/google/cloud/spanner/spanner_client.bzl +++ b/google/cloud/spanner/spanner_client.bzl @@ -35,6 +35,7 @@ spanner_client_hdrs = [ "instance_admin_connection.h", "internal/api_client_header.h", "internal/build_info.h", + "internal/channel.h", "internal/compiler_info.h", "internal/connection_impl.h", "internal/database_admin_logging.h", From d534999c92405c368286e870729e07b93657cde1 Mon Sep 17 00:00:00 2001 From: Todd Derr Date: Mon, 9 Mar 2020 16:29:35 -0400 Subject: [PATCH 2/4] address review comments --- google/cloud/spanner/internal/channel.h | 3 ++- google/cloud/spanner/internal/session.h | 1 - 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/spanner/internal/channel.h b/google/cloud/spanner/internal/channel.h index c591aadd..fd7d3c88 100644 --- a/google/cloud/spanner/internal/channel.h +++ b/google/cloud/spanner/internal/channel.h @@ -16,6 +16,7 @@ #define GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_CHANNEL_H #include "google/cloud/spanner/internal/spanner_stub.h" +#include "google/cloud/spanner/version.h" #include namespace google { @@ -28,7 +29,7 @@ namespace internal { * `Channel` represents a single gRPC Channel/Stub. */ struct Channel { - /// @p stub must not be nullptr + /// @p stub_param must not be nullptr explicit Channel(std::shared_ptr stub_param) : stub(std::move(stub_param)) {} diff --git a/google/cloud/spanner/internal/session.h b/google/cloud/spanner/internal/session.h index 8ab8fee7..ca607cc7 100644 --- a/google/cloud/spanner/internal/session.h +++ b/google/cloud/spanner/internal/session.h @@ -16,7 +16,6 @@ #define GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_SESSION_H #include "google/cloud/spanner/internal/channel.h" -#include "google/cloud/spanner/internal/spanner_stub.h" #include "google/cloud/spanner/version.h" #include #include From a6a2cf13e93c74a79306524969d159fb9d1ee7ea Mon Sep 17 00:00:00 2001 From: Todd Derr Date: Mon, 9 Mar 2020 17:09:32 -0400 Subject: [PATCH 3/4] address review comments --- google/cloud/spanner/internal/channel.h | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/google/cloud/spanner/internal/channel.h b/google/cloud/spanner/internal/channel.h index fd7d3c88..85cf9b87 100644 --- a/google/cloud/spanner/internal/channel.h +++ b/google/cloud/spanner/internal/channel.h @@ -1,4 +1,4 @@ -// Copyright 2019 Google LLC +// Copyright 2020 Google LLC // // Licensed under the Apache License, Version 2.0 (the "License"); // you may not use this file except in compliance with the License. @@ -33,6 +33,10 @@ struct Channel { explicit Channel(std::shared_ptr stub_param) : stub(std::move(stub_param)) {} + // This class is not copyable or movable. + Channel(const Channel&) = delete; + Channel& operator=(const Channel&) = delete; + std::shared_ptr const stub; int session_count = 0; }; From 809d81005ebb43a6c4f30ac0a15e3ef52c6f408a Mon Sep 17 00:00:00 2001 From: Todd Derr Date: Mon, 9 Mar 2020 17:31:06 -0400 Subject: [PATCH 4/4] review comments --- google/cloud/spanner/internal/channel.h | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/google/cloud/spanner/internal/channel.h b/google/cloud/spanner/internal/channel.h index 85cf9b87..32ef5969 100644 --- a/google/cloud/spanner/internal/channel.h +++ b/google/cloud/spanner/internal/channel.h @@ -34,8 +34,8 @@ struct Channel { : stub(std::move(stub_param)) {} // This class is not copyable or movable. - Channel(const Channel&) = delete; - Channel& operator=(const Channel&) = delete; + Channel(Channel const&) = delete; + Channel& operator=(Channel const&) = delete; std::shared_ptr const stub; int session_count = 0;