Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

feat: add some plumbing to enable multiple channels #1050

Merged
merged 3 commits into from
Nov 14, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -314,7 +314,8 @@ class ReadExperiment : public Experiment {
auto options = cs::ConnectionOptions().set_channel_pool_domain(
"task:" + std::to_string(i));
clients.emplace_back(cs::Client(cs::MakeConnection(database, options)));
stubs.emplace_back(cs::internal::CreateDefaultSpannerStub(options));
stubs.emplace_back(
cs::internal::CreateDefaultSpannerStub(options, /*channel_id=*/0));
std::cout << '.' << std::flush;
}
std::cout << " DONE\n";
Expand Down
9 changes: 7 additions & 2 deletions google/cloud/spanner/client.cc
Original file line number Diff line number Diff line change
Expand Up @@ -219,8 +219,13 @@ StatusOr<PartitionedDmlResult> Client::ExecutePartitionedDml(

std::shared_ptr<Connection> MakeConnection(Database const& db,
ConnectionOptions const& options) {
auto stub = internal::CreateDefaultSpannerStub(options);
return internal::MakeConnection(db, std::move(stub));
std::vector<std::shared_ptr<internal::SpannerStub>> stubs;
int num_channels = std::min(options.num_channels(), 1);
stubs.reserve(num_channels);
for (int channel_id = 0; channel_id < num_channels; ++channel_id) {
stubs.push_back(internal::CreateDefaultSpannerStub(options, channel_id));
}
return internal::MakeConnection(db, std::move(stubs));
}

} // namespace SPANNER_CLIENT_NS
Expand Down
1 change: 1 addition & 0 deletions google/cloud/spanner/connection_options.cc
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ ConnectionOptions::ConnectionOptions(
std::shared_ptr<grpc::ChannelCredentials> credentials)
: credentials_(std::move(credentials)),
endpoint_("spanner.googleapis.com"),
num_channels_(1),
user_agent_prefix_(internal::BaseUserAgentPrefix()),
background_threads_factory_([] {
return google::cloud::internal::make_unique<
Expand Down
19 changes: 19 additions & 0 deletions google/cloud/spanner/connection_options.h
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,24 @@ class ConnectionOptions {
/// The endpoint used by clients configured with this object.
std::string const& endpoint() const { return endpoint_; }

/**
* The number of transport channels to create.
*
* Some transports limit the number of simultaneous calls in progress on a
* channel (for gRPC the limit is 100). Increasing the number of channels
* thus increases the number of operations that can be in progress in
* parallel.
*
* The default value is 1. TODO(#307) increase this.
*/
int num_channels() const { return num_channels_; }

/// Set the value for `num_channels()`.
ConnectionOptions& set_num_channels(int num_channels) {
num_channels_ = num_channels;
return *this;
}

/**
* Return whether tracing is enabled for the given @p component.
*
Expand Down Expand Up @@ -177,6 +195,7 @@ class ConnectionOptions {
private:
std::shared_ptr<grpc::ChannelCredentials> credentials_;
std::string endpoint_;
int num_channels_;
std::set<std::string> tracing_components_;
std::string channel_pool_domain_;

Expand Down
9 changes: 9 additions & 0 deletions google/cloud/spanner/connection_options_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,15 @@ TEST(ConnectionOptionsTest, AdminEndpoint) {
EXPECT_EQ("invalid-endpoint", options.endpoint());
}

TEST(ConnectionOptionsTest, NumChannels) {
ConnectionOptions options(grpc::InsecureChannelCredentials());
int num_channels = options.num_channels();
EXPECT_LT(0, num_channels);
num_channels *= 2; // ensure we change it from the default value.
options.set_num_channels(num_channels);
EXPECT_EQ(num_channels, options.num_channels());
}

TEST(ConnectionOptionsTest, Tracing) {
ConnectionOptions options(grpc::InsecureChannelCredentials());
options.enable_tracing("fake-component");
Expand Down
9 changes: 5 additions & 4 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -94,22 +94,23 @@ std::unique_ptr<BackoffPolicy> DefaultConnectionBackoffPolicy() {
}

std::shared_ptr<ConnectionImpl> MakeConnection(
Database db, std::shared_ptr<SpannerStub> stub,
Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy) {
return std::shared_ptr<ConnectionImpl>(
new ConnectionImpl(std::move(db), std::move(stub),
new ConnectionImpl(std::move(db), std::move(stubs),
std::move(retry_policy), std::move(backoff_policy)));
}

ConnectionImpl::ConnectionImpl(Database db, std::shared_ptr<SpannerStub> stub,
ConnectionImpl::ConnectionImpl(Database db,
std::vector<std::shared_ptr<SpannerStub>> stubs,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy)
: db_(std::move(db)),
retry_policy_prototype_(std::move(retry_policy)),
backoff_policy_prototype_(std::move(backoff_policy)),
session_pool_(std::make_shared<SessionPool>(
db_, std::move(stub), retry_policy_prototype_->clone(),
db_, std::move(stubs), retry_policy_prototype_->clone(),
backoff_policy_prototype_->clone())) {}

RowStream ConnectionImpl::Read(ReadParams params) {
Expand Down
10 changes: 5 additions & 5 deletions google/cloud/spanner/internal/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -47,11 +47,11 @@ std::unique_ptr<BackoffPolicy> DefaultConnectionBackoffPolicy();
/**
* Factory method to construct a `ConnectionImpl`.
*
* @note In tests we can use a mock stub and custom (or mock) policies.
* @note In tests we can use mock stubs and custom (or mock) policies.
*/
class ConnectionImpl;
std::shared_ptr<ConnectionImpl> MakeConnection(
Database db, std::shared_ptr<SpannerStub> stub,
Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
std::unique_ptr<RetryPolicy> retry_policy = DefaultConnectionRetryPolicy(),
std::unique_ptr<BackoffPolicy> backoff_policy =
DefaultConnectionBackoffPolicy());
Expand Down Expand Up @@ -82,9 +82,9 @@ class ConnectionImpl : public Connection {
private:
// Only the factory method can construct instances of this class.
friend std::shared_ptr<ConnectionImpl> MakeConnection(
Database, std::shared_ptr<SpannerStub>, std::unique_ptr<RetryPolicy>,
std::unique_ptr<BackoffPolicy>);
ConnectionImpl(Database db, std::shared_ptr<SpannerStub> stub,
Database, std::vector<std::shared_ptr<SpannerStub>>,
std::unique_ptr<RetryPolicy>, std::unique_ptr<BackoffPolicy>);
ConnectionImpl(Database db, std::vector<std::shared_ptr<SpannerStub>> stubs,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy);

Expand Down
Loading