Skip to content

Commit

Permalink
fixes to stream auth
Browse files Browse the repository at this point in the history
  • Loading branch information
dpcollins-google committed Dec 3, 2021
1 parent 76ae62f commit 80edd64
Showing 1 changed file with 17 additions and 12 deletions.
29 changes: 17 additions & 12 deletions google/cloud/internal/async_read_write_stream_auth.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#include "google/cloud/version.h"
#include <functional>
#include <memory>
#include <thread>
#include <mutex>

namespace google {
namespace cloud {
Expand All @@ -47,14 +47,8 @@ class AsyncStreamingReadWriteRpcAuth

future<bool> Start() override {
using Result = StatusOr<std::unique_ptr<grpc::ClientContext>>;

std::unique_ptr<grpc::ClientContext> context;
{
std::lock_guard<std::mutex> g{state_->mu};
context = std::move(state_->initial_context);
}
auto weak = std::weak_ptr<SharedState>(state_);
return auth_->AsyncConfigureContext(std::move(context))
return auth_->AsyncConfigureContext(state_->ReleaseInitialContext())
.then([weak](future<Result> f) mutable {
if (auto state = weak.lock()) return state->OnStart(f.get());
return make_ready_future(false);
Expand Down Expand Up @@ -83,7 +77,18 @@ class AsyncStreamingReadWriteRpcAuth

private:
struct SharedState {
SharedState(StreamFactory factory, std::unique_ptr<grpc::ClientContext> initial_context) : factory(std::move(factory)), initial_context(std::move(initial_context)), stream(absl::make_unique<AsyncStreamingReadWriteRpcError<Request, Response>>(Status(StatusCode::kFailedPrecondition, "Stream is not yet started."))) {}
SharedState(StreamFactory factory,
std::unique_ptr<grpc::ClientContext> initial_context)
: factory(std::move(factory)),
initial_context(std::move(initial_context)),
stream(absl::make_unique<
AsyncStreamingReadWriteRpcError<Request, Response>>(
Status(StatusCode::kInternal, "Stream is not yet started."))) {}

std::unique_ptr<grpc::ClientContext> ReleaseInitialContext() {
std::lock_guard<std::mutex> g{mu};
return std::move(initial_context);
}

future<bool> OnStart(StatusOr<std::unique_ptr<grpc::ClientContext>> context) {
std::lock_guard<std::mutex> g{mu};
Expand Down Expand Up @@ -111,15 +116,15 @@ class AsyncStreamingReadWriteRpcAuth
stream->Cancel();
}

const StreamFactory factory;
StreamFactory const factory;
std::mutex mu;
std::unique_ptr<grpc::ClientContext> initial_context; // ABSL_GUARDED_BY(mu)
std::unique_ptr<AsyncStreamingReadWriteRpc<Request, Response>> stream; // ABSL_GUARDED_BY(mu)
bool cancelled = false; // ABSL_GUARDED_BY(mu)
};

const std::shared_ptr<GrpcAuthenticationStrategy> auth_;
const std::shared_ptr<SharedState> state_;
std::shared_ptr<GrpcAuthenticationStrategy> const auth_;
std::shared_ptr<SharedState> const state_;
};

} // namespace internal
Expand Down

0 comments on commit 80edd64

Please sign in to comment.