Skip to content
This repository has been archived by the owner on Dec 8, 2021. It is now read-only.

feat: associate SpannerStub with a Session #1041

Merged
merged 4 commits into from
Nov 13, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions google/cloud/spanner/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ add_library(
internal/range_from_pagination.h
internal/retry_loop.cc
internal/retry_loop.h
internal/session.cc
internal/session.h
internal/session_pool.cc
internal/session_pool.h
Expand Down
74 changes: 38 additions & 36 deletions google/cloud/spanner/internal/connection_impl.cc
Original file line number Diff line number Diff line change
Expand Up @@ -106,11 +106,10 @@ ConnectionImpl::ConnectionImpl(Database db, std::shared_ptr<SpannerStub> stub,
std::unique_ptr<RetryPolicy> retry_policy,
std::unique_ptr<BackoffPolicy> backoff_policy)
: db_(std::move(db)),
stub_(std::move(stub)),
retry_policy_prototype_(std::move(retry_policy)),
backoff_policy_prototype_(std::move(backoff_policy)),
session_pool_(std::make_shared<SessionPool>(
db_, stub_, retry_policy_prototype_->clone(),
db_, std::move(stub), retry_policy_prototype_->clone(),
backoff_policy_prototype_->clone())) {}

RowStream ConnectionImpl::Read(ReadParams params) {
Expand Down Expand Up @@ -328,10 +327,9 @@ RowStream ConnectionImpl::ReadImpl(SessionHolder& session,
request.set_partition_token(*std::move(params.partition_token));
}

auto const& stub = stub_;
// Capture a copy of `stub` to ensure the `shared_ptr<>` remains valid through
// the lifetime of the lambda. Note that the local variable `stub` is a
// reference to avoid increasing refcounts twice, but the capture is by value.
// the lifetime of the lambda.
auto stub = session_pool_->GetStub(*session);
auto factory = [stub, request](std::string const& resume_token) mutable {
request.set_resume_token(resume_token);
auto context = google::cloud::internal::make_unique<grpc::ClientContext>();
Expand Down Expand Up @@ -379,12 +377,13 @@ StatusOr<std::vector<ReadPartition>> ConnectionImpl::PartitionReadImpl(
*request.mutable_key_set() = internal::ToProto(params.keys);
*request.mutable_partition_options() = internal::ToProto(partition_options);

auto stub = session_pool_->GetStub(*session);
auto response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::PartitionReadRequest const& request) {
return stub_->PartitionRead(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::PartitionReadRequest const& request) {
return stub->PartitionRead(context, request);
},
request, __func__);
if (!response.ok()) {
Expand Down Expand Up @@ -452,11 +451,10 @@ ResultType ConnectionImpl::CommonQueryImpl(
if (!prepare_status.ok()) {
return MakeStatusOnlyResult<ResultType>(std::move(prepare_status));
}
// Capture a copy of of these member variables to ensure the `shared_ptr<>`
// remains valid through the lifetime of the lambda. Note that the local
// variables are a reference to avoid increasing refcounts twice, but the
// capture is by value.
auto const& stub = stub_;
// Capture a copy of of these to ensure the `shared_ptr<>` remains valid
// through the lifetime of the lambda. Note that the local variables are a
// reference to avoid increasing refcounts twice, but the capture is by value.
auto stub = session_pool_->GetStub(*session);
auto const& retry_policy = retry_policy_prototype_;
auto const& backoff_policy = backoff_policy_prototype_;

Expand Down Expand Up @@ -512,11 +510,10 @@ StatusOr<ResultType> ConnectionImpl::CommonDmlImpl(
if (!prepare_status.ok()) {
return prepare_status;
}
// Capture a copy of of these member variables to ensure the `shared_ptr<>`
// remains valid through the lifetime of the lambda. Note that the local
// variables are a reference to avoid increasing refcounts twice, but the
// capture is by value.
auto const& stub = stub_;
// Capture a copy of of these to ensure the `shared_ptr<>` remains valid
// through the lifetime of the lambda. Note that the local variables are a
// reference to avoid increasing refcounts twice, but the capture is by value.
auto stub = session_pool_->GetStub(*session);
auto const& retry_policy = retry_policy_prototype_;
auto const& backoff_policy = backoff_policy_prototype_;

Expand Down Expand Up @@ -586,12 +583,13 @@ StatusOr<std::vector<QueryPartition>> ConnectionImpl::PartitionQueryImpl(
std::move(*sql_statement.mutable_param_types());
*request.mutable_partition_options() = internal::ToProto(partition_options);

auto stub = session_pool_->GetStub(*session);
auto response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::PartitionQueryRequest const& request) {
return stub_->PartitionQuery(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::PartitionQueryRequest const& request) {
return stub->PartitionQuery(context, request);
},
request, __func__);
if (!response.ok()) {
Expand Down Expand Up @@ -628,12 +626,13 @@ StatusOr<BatchDmlResult> ConnectionImpl::ExecuteBatchDmlImpl(
*request.add_statements() = internal::ToProto(std::move(sql));
}

auto stub = session_pool_->GetStub(*session);
auto response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::ExecuteBatchDmlRequest const& request) {
return stub_->ExecuteBatchDml(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::ExecuteBatchDmlRequest const& request) {
return stub->ExecuteBatchDml(context, request);
},
request, __func__);
if (!response) {
Expand Down Expand Up @@ -665,12 +664,13 @@ StatusOr<PartitionedDmlResult> ConnectionImpl::ExecutePartitionedDmlImpl(
begin_request.set_session(session->session_name());
*begin_request.mutable_options()->mutable_partitioned_dml() =
spanner_proto::TransactionOptions_PartitionedDml();
auto stub = session_pool_->GetStub(*session);
auto begin_response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::BeginTransactionRequest const& request) {
return stub_->BeginTransaction(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::BeginTransactionRequest const& request) {
return stub->BeginTransaction(context, request);
},
begin_request, __func__);
if (!begin_response) {
Expand All @@ -690,9 +690,9 @@ StatusOr<PartitionedDmlResult> ConnectionImpl::ExecutePartitionedDmlImpl(
auto response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::ExecuteSqlRequest const& request) {
return stub_->ExecuteSql(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::ExecuteSqlRequest const& request) {
return stub->ExecuteSql(context, request);
},
request, __func__);
if (!response) {
Expand Down Expand Up @@ -729,12 +729,13 @@ StatusOr<CommitResult> ConnectionImpl::CommitImpl(
is_idempotent = true;
}

auto stub = session_pool_->GetStub(*session);
auto response = internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
is_idempotent,
[this](grpc::ClientContext& context,
spanner_proto::CommitRequest const& request) {
return stub_->Commit(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::CommitRequest const& request) {
return stub->Commit(context, request);
},
request, __func__);
if (!response) {
Expand Down Expand Up @@ -764,12 +765,13 @@ Status ConnectionImpl::RollbackImpl(SessionHolder& session,
spanner_proto::RollbackRequest request;
request.set_session(session->session_name());
request.set_transaction_id(s.id());
auto stub = session_pool_->GetStub(*session);
return internal::RetryLoop(
retry_policy_prototype_->clone(), backoff_policy_prototype_->clone(),
true,
[this](grpc::ClientContext& context,
spanner_proto::RollbackRequest const& request) {
return stub_->Rollback(context, request);
[&stub](grpc::ClientContext& context,
spanner_proto::RollbackRequest const& request) {
return stub->Rollback(context, request);
},
request, __func__);
}
Expand Down
1 change: 0 additions & 1 deletion google/cloud/spanner/internal/connection_impl.h
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,6 @@ class ConnectionImpl : public Connection {
google::spanner::v1::ExecuteSqlRequest::QueryMode query_mode);

Database db_;
std::shared_ptr<SpannerStub> stub_;
std::shared_ptr<RetryPolicy const> retry_policy_prototype_;
std::shared_ptr<BackoffPolicy const> backoff_policy_prototype_;
std::shared_ptr<SessionPool> session_pool_;
Expand Down
32 changes: 32 additions & 0 deletions google/cloud/spanner/internal/session.cc
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
// Copyright 2019 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include "google/cloud/spanner/internal/session.h"

namespace google {
namespace cloud {
namespace spanner {
inline namespace SPANNER_CLIENT_NS {
namespace internal {

SessionHolder MakeDissociatedSessionHolder(std::string session_name) {
return SessionHolder(new Session(std::move(session_name), /*stub=*/nullptr),
std::default_delete<Session>());
}

} // namespace internal
} // namespace SPANNER_CLIENT_NS
} // namespace spanner
} // namespace cloud
} // namespace google
17 changes: 9 additions & 8 deletions google/cloud/spanner/internal/session.h
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
#ifndef GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_SESSION_H_
#define GOOGLE_CLOUD_CPP_SPANNER_GOOGLE_CLOUD_SPANNER_INTERNAL_SESSION_H_

#include "google/cloud/spanner/internal/spanner_stub.h"
#include "google/cloud/spanner/version.h"
#include <functional>
#include <memory>
Expand All @@ -32,8 +33,8 @@ namespace internal {
*/
class Session {
public:
Session(std::string session_name) noexcept
: session_name_(std::move(session_name)) {}
Session(std::string session_name, std::shared_ptr<SpannerStub> stub) noexcept
: session_name_(std::move(session_name)), stub_(std::move(stub)) {}

// Not copyable or moveable.
Session(Session const&) = delete;
Expand All @@ -44,7 +45,11 @@ class Session {
std::string const& session_name() const { return session_name_; }

private:
std::string session_name_;
friend class SessionPool; // for access to stub()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

PS: Is it worth it to try to hide stub()?

std::shared_ptr<SpannerStub> stub() const { return stub_; }

std::string const session_name_;
std::shared_ptr<SpannerStub> const stub_;
};

/**
Expand All @@ -60,11 +65,7 @@ using SessionHolder = std::unique_ptr<Session, std::function<void(Session*)>>;
* like partitioned operations where the `Session` may be used on multiple
* machines and should not be returned to the pool.
*/
template <typename... Args>
SessionHolder MakeDissociatedSessionHolder(Args&&... args) {
return SessionHolder(new Session(std::forward<Args>(args)...),
std::default_delete<Session>());
}
SessionHolder MakeDissociatedSessionHolder(std::string session_name);

} // namespace internal
} // namespace SPANNER_CLIENT_NS
Expand Down
11 changes: 10 additions & 1 deletion google/cloud/spanner/internal/session_pool.cc
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,15 @@ StatusOr<SessionHolder> SessionPool::Allocate(bool dissociate_from_pool) {
}
}

std::shared_ptr<SpannerStub> SessionPool::GetStub(Session const& session) {
std::shared_ptr<SpannerStub> stub = session.stub();
if (stub) return stub;

// Sessions that were created for partitioned Reads/Queries do not have
// their own stub, so return one to use.
return stub_;
}

void SessionPool::Release(Session* session) {
std::unique_lock<std::mutex> lk(mu_);
bool notify = sessions_.empty();
Expand Down Expand Up @@ -172,7 +181,7 @@ StatusOr<std::vector<std::unique_ptr<Session>>> SessionPool::CreateSessions(
sessions.reserve(response->session_size());
for (auto& session : *response->mutable_session()) {
sessions.push_back(google::cloud::internal::make_unique<Session>(
std::move(*session.mutable_name())));
std::move(*session.mutable_name()), stub_));
}
return {std::move(sessions)};
}
Expand Down
5 changes: 5 additions & 0 deletions google/cloud/spanner/internal/session_pool.h
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,11 @@ class SessionPool : public std::enable_shared_from_this<SessionPool> {
*/
StatusOr<SessionHolder> Allocate(bool dissociate_from_pool = false);

/**
* Return a `SpannerStub` to be used when making calls using `session`.
*/
std::shared_ptr<SpannerStub> GetStub(Session const& session);

private:
/**
* Release session back to the pool.
Expand Down
10 changes: 10 additions & 0 deletions google/cloud/spanner/internal/session_pool_test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ TEST(SessionPool, Allocate) {
auto session = pool->Allocate();
ASSERT_STATUS_OK(session);
EXPECT_EQ((*session)->session_name(), "session1");
EXPECT_EQ(pool->GetStub(**session), mock);
}

TEST(SessionPool, CreateError) {
Expand Down Expand Up @@ -233,6 +234,15 @@ TEST(SessionPool, MaxSessionsBlockUntilRelease) {
t.join();
}

TEST(SessionPool, GetStubForStublessSession) {
auto mock = std::make_shared<spanner_testing::MockSpannerStub>();
auto db = Database("project", "instance", "database");
auto pool = MakeSessionPool(db, mock);
// ensure we get a stub even if we didn't allocate from the pool.
auto session = MakeDissociatedSessionHolder("session_id");
EXPECT_EQ(pool->GetStub(*session), mock);
}

} // namespace
} // namespace internal
} // namespace SPANNER_CLIENT_NS
Expand Down
1 change: 1 addition & 0 deletions google/cloud/spanner/spanner_client.bzl
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ spanner_client_srcs = [
"internal/partial_result_set_resume.cc",
"internal/partial_result_set_source.cc",
"internal/retry_loop.cc",
"internal/session.cc",
"internal/session_pool.cc",
"internal/spanner_stub.cc",
"internal/time.cc",
Expand Down