Skip to content

Commit

Permalink
feat: associate SpannerStub with a Session (googleapis/google-clo…
Browse files Browse the repository at this point in the history
…ud-cpp-spanner#1041)

*`SessionPool` manages assigning a `SpannerStub` to each `Session`
*`ConnectionImpl` obtains the stub to use for a call from the `SessionPool` and no longer retains any knowledge of the stub.

This will enable the use of multiple stubs per `Connection`; it's important for a `Session` to remain associated with the stub/channel that created it, otherwise there is a performance penalty.

Part of googleapis/google-cloud-cpp-spanner#307
  • Loading branch information
mr-salty authored Nov 13, 2019
1 parent b4a1652 commit e3615bf
Show file tree
Hide file tree
Showing 9 changed files with 106 additions and 46 deletions.
1 change: 1 addition & 0 deletions google/cloud/spanner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ add_library(
internal/range_from_pagination.h
internal/retry_loop.cc
internal/retry_loop.h
internal/session.cc
internal/session.h
internal/session_pool.cc
internal/session_pool.h
Expand Down
74 changes: 38 additions & 36 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,10 @@ ConnectionImpl::ConnectionImpl(Database db, std::shared_ptr<SpannerStub> stub,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy)
: db_(std::move(db)),
stub_(std::move(stub)),
retry_policy_prototype_(std::move(retry_policy)),
backoff_policy_prototype_(std::move(backoff_policy)),
session_pool_(std::make_shared<SessionPool>(
db_, stub_, retry_policy_prototype_->clone(),
db_, std::move(stub), retry_policy_prototype_->clone(),
backoff_policy_prototype_->clone())) {}

RowStream ConnectionImpl::Read(ReadParams params) {
Expand Down Expand Up @@ -328,10 +327,9 @@ RowStream ConnectionImpl::ReadImpl(SessionHolder& session,
request.set_partition_token(*std::move(params.partition_token));
}

auto const& stub = stub_;
// Capture a copy of `stub` to ensure the `shared_ptr<>` remains valid through
// the lifetime of the lambda. Note that the local variable `stub` is a
// reference to avoid increasing refcounts twice, but the capture is by value.
// the lifetime of the lambda.
auto stub = session_pool_->GetStub(*session);
auto factory = [stub, request](std::string const& resume_token) mutable {
request.set_resume_token(resume_token);
auto context = google::cloud::internal::make_unique<grpc::ClientContext>();
Expand Down Expand Up @@ -379,12 +377,13 @@ StatusOr<std::vector<ReadPartition>> ConnectionImpl::PartitionReadImpl(
*request.mutable_key_set() = internal::ToProto(params.keys);
*request.mutable_partition_options() = internal::ToProto(partition_options);

auto stub = session_pool_->GetStub(*session);
auto response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::PartitionReadRequest const& request) {
return stub_->PartitionRead(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::PartitionReadRequest const& request) {
return stub->PartitionRead(context, request);
},
request, __func__);
if (!response.ok()) {
Expand Down Expand Up @@ -452,11 +451,10 @@ ResultType ConnectionImpl::CommonQueryImpl(
if (!prepare_status.ok()) {
return MakeStatusOnlyResult<ResultType>(std::move(prepare_status));
}
// Capture a copy of of these member variables to ensure the `shared_ptr<>`
// remains valid through the lifetime of the lambda. Note that the local
// variables are a reference to avoid increasing refcounts twice, but the
// capture is by value.
auto const& stub = stub_;
// Capture a copy of of these to ensure the `shared_ptr<>` remains valid
// through the lifetime of the lambda. Note that the local variables are a
// reference to avoid increasing refcounts twice, but the capture is by value.
auto stub = session_pool_->GetStub(*session);
auto const& retry_policy = retry_policy_prototype_;
auto const& backoff_policy = backoff_policy_prototype_;

Expand Down Expand Up @@ -512,11 +510,10 @@ StatusOr<ResultType> ConnectionImpl::CommonDmlImpl(
if (!prepare_status.ok()) {
return prepare_status;
}
// Capture a copy of of these member variables to ensure the `shared_ptr<>`
// remains valid through the lifetime of the lambda. Note that the local
// variables are a reference to avoid increasing refcounts twice, but the
// capture is by value.
auto const& stub = stub_;
// Capture a copy of of these to ensure the `shared_ptr<>` remains valid
// through the lifetime of the lambda. Note that the local variables are a
// reference to avoid increasing refcounts twice, but the capture is by value.
auto stub = session_pool_->GetStub(*session);
auto const& retry_policy = retry_policy_prototype_;
auto const& backoff_policy = backoff_policy_prototype_;

Expand Down Expand Up @@ -586,12 +583,13 @@ StatusOr<std::vector<QueryPartition>> ConnectionImpl::PartitionQueryImpl(
std::move(*sql_statement.mutable_param_types());
*request.mutable_partition_options() = internal::ToProto(partition_options);

auto stub = session_pool_->GetStub(*session);
auto response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::PartitionQueryRequest const& request) {
return stub_->PartitionQuery(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::PartitionQueryRequest const& request) {
return stub->PartitionQuery(context, request);
},
request, __func__);
if (!response.ok()) {
Expand Down Expand Up @@ -628,12 +626,13 @@ StatusOr<BatchDmlResult> ConnectionImpl::ExecuteBatchDmlImpl(
*request.add_statements() = internal::ToProto(std::move(sql));
}

auto stub = session_pool_->GetStub(*session);
auto response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::ExecuteBatchDmlRequest const& request) {
return stub_->ExecuteBatchDml(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::ExecuteBatchDmlRequest const& request) {
return stub->ExecuteBatchDml(context, request);
},
request, __func__);
if (!response) {
Expand Down Expand Up @@ -665,12 +664,13 @@ StatusOr<PartitionedDmlResult> ConnectionImpl::ExecutePartitionedDmlImpl(
begin_request.set_session(session->session_name());
*begin_request.mutable_options()->mutable_partitioned_dml() =
spanner_proto::TransactionOptions_PartitionedDml();
auto stub = session_pool_->GetStub(*session);
auto begin_response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::BeginTransactionRequest const& request) {
return stub_->BeginTransaction(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::BeginTransactionRequest const& request) {
return stub->BeginTransaction(context, request);
},
begin_request, __func__);
if (!begin_response) {
Expand All @@ -690,9 +690,9 @@ StatusOr<PartitionedDmlResult> ConnectionImpl::ExecutePartitionedDmlImpl(
auto response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::ExecuteSqlRequest const& request) {
return stub_->ExecuteSql(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::ExecuteSqlRequest const& request) {
return stub->ExecuteSql(context, request);
},
request, __func__);
if (!response) {
Expand Down Expand Up @@ -729,12 +729,13 @@ StatusOr<CommitResult> ConnectionImpl::CommitImpl(
is_idempotent = true;
}

auto stub = session_pool_->GetStub(*session);
auto response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
is_idempotent,
[this](grpc::ClientContext& context,
spanner_proto::CommitRequest const& request) {
return stub_->Commit(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::CommitRequest const& request) {
return stub->Commit(context, request);
},
request, __func__);
if (!response) {
Expand Down Expand Up @@ -764,12 +765,13 @@ Status ConnectionImpl::RollbackImpl(SessionHolder& session,
spanner_proto::RollbackRequest request;
request.set_session(session->session_name());
request.set_transaction_id(s.id());
auto stub = session_pool_->GetStub(*session);
return internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::RollbackRequest const& request) {
return stub_->Rollback(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::RollbackRequest const& request) {
return stub->Rollback(context, request);
},
request, __func__);
}
Expand Down
1 change: 0 additions & 1 deletion google/cloud/spanner/internal/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ class ConnectionImpl : public Connection {
google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode);

Database db_;
std::shared_ptr<SpannerStub> stub_;
std::shared_ptr<RetryPolicy const> retry_policy_prototype_;
std::shared_ptr<BackoffPolicy const> backoff_policy_prototype_;
std::shared_ptr<SessionPool> session_pool_;
Expand Down
32 changes: 32 additions & 0 deletions google/cloud/spanner/internal/session.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/spanner/internal/session.h"

namespace google {
namespace cloud {
namespace spanner {
inline namespace SPANNER_CLIENT_NS {
namespace internal {

SessionHolder MakeDissociatedSessionHolder(std::string session_name) {
return SessionHolder(new Session(std::move(session_name), /*stub=*/nullptr),
std::default_delete<Session>());
}

} // namespace internal
} // namespace SPANNER_CLIENT_NS
} // namespace spanner
} // namespace cloud
} // namespace google
17 changes: 9 additions & 8 deletions google/cloud/spanner/internal/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_SESSION_H_
#define GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_SESSION_H_

#include "google/cloud/spanner/internal/spanner_stub.h"
#include "google/cloud/spanner/version.h"
#include <functional>
#include <memory>
Expand All @@ -32,8 +33,8 @@ namespace internal {
*/
class Session {
public:
Session(std::string session_name) noexcept
: session_name_(std::move(session_name)) {}
Session(std::string session_name, std::shared_ptr<SpannerStub> stub) noexcept
: session_name_(std::move(session_name)), stub_(std::move(stub)) {}

// Not copyable or moveable.
Session(Session const&) = delete;
Expand All @@ -44,7 +45,11 @@ class Session {
std::string const& session_name() const { return session_name_; }

private:
std::string session_name_;
friend class SessionPool; // for access to stub()
std::shared_ptr<SpannerStub> stub() const { return stub_; }

std::string const session_name_;
std::shared_ptr<SpannerStub> const stub_;
};

/**
Expand All @@ -60,11 +65,7 @@ using SessionHolder = std::unique_ptr<Session, std::function<void(Session*)>>;
* like partitioned operations where the `Session` may be used on multiple
* machines and should not be returned to the pool.
*/
template <typename... Args>
SessionHolder MakeDissociatedSessionHolder(Args&&... args) {
return SessionHolder(new Session(std::forward<Args>(args)...),
std::default_delete<Session>());
}
SessionHolder MakeDissociatedSessionHolder(std::string session_name);

} // namespace internal
} // namespace SPANNER_CLIENT_NS
Expand Down
11 changes: 10 additions & 1 deletion google/cloud/spanner/internal/session_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
}
}

std::shared_ptr<SpannerStub> SessionPool::GetStub(Session const& session) {
std::shared_ptr<SpannerStub> stub = session.stub();
if (stub) return stub;

// Sessions that were created for partitioned Reads/Queries do not have
// their own stub, so return one to use.
return stub_;
}

void SessionPool::Release(Session* session) {
std::unique_lock<std::mutex> lk(mu_);
bool notify = sessions_.empty();
Expand Down Expand Up @@ -172,7 +181,7 @@ StatusOr<std::vector<std::unique_ptr<Session>>> SessionPool::CreateSessions(
sessions.reserve(response->session_size());
for (auto& session : *response->mutable_session()) {
sessions.push_back(google::cloud::internal::make_unique<Session>(
std::move(*session.mutable_name())));
std::move(*session.mutable_name()), stub_));
}
return {std::move(sessions)};
}
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/spanner/internal/session_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
*/
StatusOr<SessionHolder> Allocate(bool dissociate_from_pool = false);

/**
* Return a `SpannerStub` to be used when making calls using `session`.
*/
std::shared_ptr<SpannerStub> GetStub(Session const& session);

private:
/**
* Release session back to the pool.
Expand Down
10 changes: 10 additions & 0 deletions google/cloud/spanner/internal/session_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ TEST(SessionPool, Allocate) {
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
EXPECT_EQ((*session)->session_name(), "session1");
EXPECT_EQ(pool->GetStub(**session), mock);
}

TEST(SessionPool, CreateError) {
Expand Down Expand Up @@ -233,6 +234,15 @@ TEST(SessionPool, MaxSessionsBlockUntilRelease) {
t.join();
}

TEST(SessionPool, GetStubForStublessSession) {
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = Database("project", "instance", "database");
auto pool = MakeSessionPool(db, mock);
// ensure we get a stub even if we didn't allocate from the pool.
auto session = MakeDissociatedSessionHolder("session_id");
EXPECT_EQ(pool->GetStub(*session), mock);
}

} // namespace
} // namespace internal
} // namespace SPANNER_CLIENT_NS
Expand Down
1 change: 1 addition & 0 deletions google/cloud/spanner/spanner_client.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ spanner_client_srcs = [
"internal/partial_result_set_resume.cc",
"internal/partial_result_set_source.cc",
"internal/retry_loop.cc",
"internal/session.cc",
"internal/session_pool.cc",
"internal/spanner_stub.cc",
"internal/time.cc",
Expand Down

0 comments on commit e3615bf

Please sign in to comment.