Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

fix: associate each Session with a Channel #1346

Merged
merged 4 commits into from
Mar 9, 2020
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions google/cloud/spanner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -126,6 +126,7 @@ add_library(
internal/api_client_header.cc
internal/api_client_header.h
internal/build_info.h
internal/channel.h
internal/compiler_info.cc
internal/compiler_info.h
internal/connection_impl.cc
Expand Down
45 changes: 45 additions & 0 deletions google/cloud/spanner/internal/channel.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_CHANNEL_H
#define GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_CHANNEL_H

#include "google/cloud/spanner/internal/spanner_stub.h"
#include <memory>

namespace google {
namespace cloud {
namespace spanner {
inline namespace SPANNER_CLIENT_NS {
devbww marked this conversation as resolved.
Show resolved Hide resolved
namespace internal {

/**
* `Channel` represents a single gRPC Channel/Stub.
*/
struct Channel {
/// @p stub must not be nullptr
devbww marked this conversation as resolved.
Show resolved Hide resolved
explicit Channel(std::shared_ptr<SpannerStub> stub_param)
: stub(std::move(stub_param)) {}

std::shared_ptr<SpannerStub> const stub;
int session_count = 0;
};

} // namespace internal
} // namespace SPANNER_CLIENT_NS
} // namespace spanner
} // namespace cloud
} // namespace google

#endif // GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_CHANNEL_H
5 changes: 3 additions & 2 deletions google/cloud/spanner/internal/session.cc
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@ inline namespace SPANNER_CLIENT_NS {
namespace internal {

SessionHolder MakeDissociatedSessionHolder(std::string session_name) {
return SessionHolder(new Session(std::move(session_name), /*stub=*/nullptr),
std::default_delete<Session>());
return SessionHolder(
new Session(std::move(session_name), /*channel=*/nullptr),
std::default_delete<Session>());
}

} // namespace internal
Expand Down
11 changes: 6 additions & 5 deletions google/cloud/spanner/internal/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_SESSION_H
#define GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_SESSION_H

#include "google/cloud/spanner/internal/channel.h"
#include "google/cloud/spanner/internal/spanner_stub.h"
devbww marked this conversation as resolved.
Show resolved Hide resolved
#include "google/cloud/spanner/version.h"
#include <atomic>
Expand All @@ -35,9 +36,9 @@ namespace internal {
*/
class Session {
public:
Session(std::string session_name, std::shared_ptr<SpannerStub> stub)
Session(std::string session_name, std::shared_ptr<Channel> channel)
: session_name_(std::move(session_name)),
stub_(std::move(stub)),
channel_(std::move(channel)),
is_bad_(false) {}

// Not copyable or moveable.
Expand All @@ -53,11 +54,11 @@ class Session {
bool is_bad() const { return is_bad_.load(std::memory_order_relaxed); }

private:
friend class SessionPool; // for access to stub()
std::shared_ptr<SpannerStub> stub() const { return stub_; }
friend class SessionPool; // for access to channel()
std::shared_ptr<Channel> const& channel() const { return channel_; }

std::string const session_name_;
std::shared_ptr<SpannerStub> const stub_;
std::shared_ptr<Channel> const channel_;
std::atomic<bool> is_bad_;
};

Expand Down
50 changes: 31 additions & 19 deletions google/cloud/spanner/internal/session_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ SessionPool::SessionPool(Database db,

channels_.reserve(stubs.size());
for (auto& stub : stubs) {
channels_.emplace_back(std::move(stub));
channels_.push_back(std::make_shared<Channel>(std::move(stub)));
}
// `channels_` is never resized after this point.
next_dissociated_stub_channel_ = channels_.begin();
Expand Down Expand Up @@ -147,6 +147,10 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
sessions_.pop_back();
if (dissociate_from_pool) {
--total_sessions_;
auto const& channel = session->channel();
if (channel) {
--channel->session_count;
}
}
return {MakeSessionHolder(std::move(session), dissociate_from_pool)};
}
Expand Down Expand Up @@ -176,10 +180,10 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {

// Add `min_sessions` to the pool (plus the one we're going to return),
// subject to the `max_sessions_per_channel` cap.
ChannelInfo& channel = *next_channel_for_create_sessions_;
int sessions_to_create =
(std::min)(options_.min_sessions() + 1,
options_.max_sessions_per_channel() - channel.session_count);
auto const& channel = *next_channel_for_create_sessions_;
int sessions_to_create = (std::min)(
options_.min_sessions() + 1,
options_.max_sessions_per_channel() - channel->session_count);
auto create_status =
CreateSessions(lk, channel, options_.labels(), sessions_to_create);
if (!create_status.ok()) {
Expand All @@ -191,15 +195,18 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
}

std::shared_ptr<SpannerStub> SessionPool::GetStub(Session const& session) {
std::shared_ptr<SpannerStub> stub = session.stub();
if (!stub) {
// Sessions that were created for partitioned Reads/Queries do not have
// their own stub; return one to use by round-robining between the channels.
std::unique_lock<std::mutex> lk(mu_);
stub = next_dissociated_stub_channel_->stub;
if (++next_dissociated_stub_channel_ == channels_.end()) {
next_dissociated_stub_channel_ = channels_.begin();
}
auto const& channel = session.channel();
if (channel) {
return channel->stub;
}

// Sessions that were created for partitioned Reads/Queries do not have
// their own channel/stub; return a stub to use by round-robining between
// the channels.
std::unique_lock<std::mutex> lk(mu_);
auto stub = (*next_dissociated_stub_channel_)->stub;
if (++next_dissociated_stub_channel_ == channels_.end()) {
next_dissociated_stub_channel_ = channels_.begin();
}
return stub;
}
Expand All @@ -210,6 +217,10 @@ void SessionPool::Release(std::unique_ptr<Session> session) {
// Once we have support for background processing, we may want to signal
// that to replenish this bad session.
--total_sessions_;
auto const& channel = session->channel();
if (channel) {
--channel->session_count;
}
return;
}
sessions_.push_back(std::move(session));
Expand All @@ -224,7 +235,7 @@ void SessionPool::Release(std::unique_ptr<Session> session) {
// Requires `lk` has locked `mu_` prior to this call. `lk` will be dropped
// while the RPC is in progress and then reacquired.
Status SessionPool::CreateSessions(
std::unique_lock<std::mutex>& lk, ChannelInfo& channel,
std::unique_lock<std::mutex>& lk, std::shared_ptr<Channel> const& channel,
std::map<std::string, std::string> const& labels, int num_sessions) {
create_in_progress_ = true;
lk.unlock();
Expand All @@ -233,7 +244,7 @@ Status SessionPool::CreateSessions(
request.mutable_session_template()->mutable_labels()->insert(labels.begin(),
labels.end());
request.set_session_count(std::int32_t{num_sessions});
auto const& stub = channel.stub;
auto const& stub = channel->stub;
auto response = RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
Expand All @@ -249,12 +260,12 @@ Status SessionPool::CreateSessions(
}
// Add sessions to the pool and update counters for `channel` and the pool.
int sessions_created = response->session_size();
channel.session_count += sessions_created;
channel->session_count += sessions_created;
total_sessions_ += sessions_created;
sessions_.reserve(sessions_.size() + sessions_created);
for (auto& session : *response->mutable_session()) {
sessions_.push_back(google::cloud::internal::make_unique<Session>(
std::move(*session.mutable_name()), stub));
std::move(*session.mutable_name()), channel));
}
// Shuffle the pool so we distribute returned sessions across channels.
std::shuffle(sessions_.begin(), sessions_.end(),
Expand All @@ -269,7 +280,8 @@ void SessionPool::UpdateNextChannelForCreateSessions() {
// `mu_` must be held by the caller.
next_channel_for_create_sessions_ = channels_.begin();
for (auto it = channels_.begin(); it != channels_.end(); ++it) {
if (it->session_count < next_channel_for_create_sessions_->session_count) {
if ((*it)->session_count <
(*next_channel_for_create_sessions_)->session_count) {
next_channel_for_create_sessions_ = it;
}
}
Expand Down
20 changes: 7 additions & 13 deletions google/cloud/spanner/internal/session_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

#include "google/cloud/spanner/backoff_policy.h"
#include "google/cloud/spanner/database.h"
#include "google/cloud/spanner/internal/channel.h"
#include "google/cloud/spanner/internal/session.h"
#include "google/cloud/spanner/internal/spanner_stub.h"
#include "google/cloud/spanner/retry_policy.h"
Expand Down Expand Up @@ -97,13 +98,6 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
std::shared_ptr<SpannerStub> GetStub(Session const& session);

private:
struct ChannelInfo {
explicit ChannelInfo(std::shared_ptr<SpannerStub> stub_param)
: stub(std::move(stub_param)) {}
std::shared_ptr<SpannerStub> const stub;
int session_count = 0;
};

// Release session back to the pool.
void Release(std::unique_ptr<Session> session);

Expand All @@ -116,7 +110,8 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
--num_waiting_for_session_;
}

Status CreateSessions(std::unique_lock<std::mutex>& lk, ChannelInfo& channel,
Status CreateSessions(std::unique_lock<std::mutex>& lk,
std::shared_ptr<Channel> const& channel,
std::map<std::string, std::string> const& labels,
int num_sessions); // EXCLUSIVE_LOCKS_REQUIRED(mu_)

Expand Down Expand Up @@ -146,11 +141,10 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
// `channels_` is guaranteed to be non-empty and will not be resized after
// the constructor runs (so the iterators are guaranteed to always be valid).
// TODO(#566) replace `vector` with `absl::FixedArray` when available.
std::vector<ChannelInfo> channels_; // GUARDED_BY(mu_)
std::vector<ChannelInfo>::iterator
next_channel_for_create_sessions_; // GUARDED_BY(mu_)
std::vector<ChannelInfo>::iterator
next_dissociated_stub_channel_; // GUARDED_BY(mu_)
using ChannelVec = std::vector<std::shared_ptr<Channel>>;
ChannelVec channels_; // GUARDED_BY(mu_)
ChannelVec::iterator next_channel_for_create_sessions_; // GUARDED_BY(mu_)
ChannelVec::iterator next_dissociated_stub_channel_; // GUARDED_BY(mu_)
};

/**
Expand Down
1 change: 1 addition & 0 deletions google/cloud/spanner/spanner_client.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ spanner_client_hdrs = [
"instance_admin_connection.h",
"internal/api_client_header.h",
"internal/build_info.h",
"internal/channel.h",
"internal/compiler_info.h",
"internal/connection_impl.h",
"internal/database_admin_logging.h",
Expand Down