-
Notifications
You must be signed in to change notification settings - Fork 388
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
impl(pubsublite): resumable async streaming read write rpc #8233
impl(pubsublite): resumable async streaming read write rpc #8233
Conversation
…sync_streaming_read_write_rpc
…sync_streaming_read_write_rpc
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
Google Cloud Build Logs
ℹ️ NOTE: Kokoro logs are linked from "Details" below. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I am sorry, but this is still too large for me to grok. Could we break it down further? For example, what if we implement only Start()
and Shutdown()
?
I added a few comments, but they are fairly superficial. I have not even looked at the tests.
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h
Outdated
Show resolved
Hide resolved
Google Cloud Build Logs
ℹ️ NOTE: Kokoro logs are linked from "Details" below. |
Google Cloud Build Logs
ℹ️ NOTE: Kokoro logs are linked from "Details" below. |
Google Cloud Build Logs
ℹ️ NOTE: Kokoro logs are linked from "Details" below. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think there are a few outstanding questions left. I am using Reviewable to track them since it does a better job than GitHub when you get to these many things to talk about.
Reviewed 5 of 8 files at r2, 2 of 5 files at r7, 1 of 7 files at r8, 6 of 8 files at r10, 1 of 3 files at r13, 9 of 15 files at r15, 7 of 8 files at r18, 1 of 1 files at r19, all commit messages.
Reviewable status: all files reviewed, 128 unresolved discussions (waiting on @18suresha and @dpcollins-google)
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 234 at r2 (raw file):
Previously, dpcollins-google wrote…
this future needs to be protected in some way. it is out of band for the lifecycle of this object, and doesn't hold a reference to this object.
I think all state for this class unfortunately needs to be encapsulated in a "struct State" with a shared_ptr.
Alternatively, 1) use a public base of std::enable_shared_from_this and 2) create a wrapper implementation that delegates to a shared_ptr owned version and 3) friend the factory function and make constructors private.
What did you decide w.r.t. the lifecycle of this class?
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 57 at r10 (raw file):
using AsyncSleeper = std::function<future<void>(std::chrono::duration<double>)>; using RetryPolicyFactory = std::function<std::unique_ptr<RetryPolicy>()>;
That is not how we create policies in all the other libraries. What is the motivation to have a different mechanism to create policies?
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 60 at r10 (raw file):
template <typename RequestType, typename ResponseType> class ResumableAsyncStreamingReadWriteRpc {
This looks awfully similar to google::cloud::internal::AsyncStreamingReadWriteRpc<>
. What is the motivation to have a new interface? Can we just use the existing interface and maybe not use the methods that are not needed? Or make them trivial?
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 142 at r10 (raw file):
GCP_LOG(WARNING) << "`Finish` must be called and finished before object " "goes out of scope."; assert(false);
Please use GCP_LOG(FATAL)
instead of assert()
and use those very sparingly. Crashing customer code is not something we should do unless there is no other option.
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 35 at r14 (raw file):
Previously, dpcollins-google wrote…
We don't expect these retry policies to be user-specifyable. Instead, we will define the retry policies for different stream types internally.
Not for this PR: that would be different from all the other services. I expect users should be able to specify the policies controlling the initial connection.
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 141 at r14 (raw file):
Previously, dpcollins-google wrote…
This is a sanity check assert in debug mode, hence why it's a LOG(WARNING) when not in NDEBUG.
This is the equivalent of
LOG(DFATAL)
Be aware that there is no guarantee the application is being build with the same -DNDEBUG
(or whatever other macro) as the library. The library could be built in release mode and the application in debug mode or vice-versa. Generally, we do not rely on assert()
or GCP_LOG(FATAL)
. If there is a possible error condition, return an error. Don't be surprised if I go through this code and rip out all the assert()
statements.
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 206 at r14 (raw file):
write_reinit_done_.emplace(); return write_reinit_done_->get_future().then( [](future<void>) { return make_ready_future(false); });
You can just return false
from a .then()
, but it is unclear to me why you need this.
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 389 at r16 (raw file):
return write_reinit_done.then( [](future<void>) { return make_ready_future(false); });
I think you can just do return false;
in this lambda
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 413 at r16 (raw file):
Previously, dpcollins-google wrote…
For complex reasons. This should have been fixed per above, but doesn't look like it was.
Ping
.gitignore, line 25 at r10 (raw file):
.idea/ cmake-build-*/ .clwb/
nit: this should be in a separate CL.
google/cloud/pubsublite/CMakeLists.txt, line 188 at r10 (raw file):
INTERFACE ${CMAKE_CURRENT_SOURCE_DIR}/mocks/mock_admin_connection.h ${CMAKE_CURRENT_SOURCE_DIR}/internal/mock_async_reader_writer.h ${CMAKE_CURRENT_SOURCE_DIR}/internal/mock_backoff_policy.h
Typically we do not expose mocks for our own use in the *_mocks
libraries, which are intended for customers to use. I suspect the retry and backoff policy mocks really belong in a *_testing
library.
google/cloud/pubsublite/internal/futures.h, line 26 at r14 (raw file):
template <class T> struct ChainFutureImpl {
Can you add some comments explaining what this is for or why it is needed?
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 413 at r16 (raw file): Previously, coryan (Carlos O'Ryan) wrote…
Restructured to no longer have this assertion |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 127 unresolved discussions (waiting on @18suresha, @coryan, and @dpcollins-google)
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 57 at r10 (raw file):
Previously, coryan (Carlos O'Ryan) wrote…
That is not how we create policies in all the other libraries. What is the motivation to have a different mechanism to create policies?
Other libraries use a template to accept any kind of RetryPolicy as RetryPolicy does not have clone() in its interface (just in TraitBasedRetryPolicy, which is itself a template). This makes it possible to construct retry policies without needing to add an additional template parameter.
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 60 at r10 (raw file):
Previously, coryan (Carlos O'Ryan) wrote…
This looks awfully similar to
google::cloud::internal::AsyncStreamingReadWriteRpc<>
. What is the motivation to have a new interface? Can we just use the existing interface and maybe not use the methods that are not needed? Or make them trivial?
Sorry, didn't we discuss this already in the brainstorming doc? Is this comment old?
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 142 at r10 (raw file):
Previously, coryan (Carlos O'Ryan) wrote…
Please use
GCP_LOG(FATAL)
instead ofassert()
and use those very sparingly. Crashing customer code is not something we should do unless there is no other option.
See above: this is not supposed to crash customer code if not built in debug mode, just LOG(WARNING).
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 35 at r14 (raw file):
Previously, coryan (Carlos O'Ryan) wrote…
Not for this PR: that would be different from all the other services. I expect users should be able to specify the policies controlling the initial connection.
This is expected- we don't want users to be able to reconfigure which errors are considered retryable or to retry immediately on failures and potentially overload the backend even more than they would otherwise.
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 141 at r14 (raw file):
Don't be surprised if I go through this code and rip out all the assert() statements.
We're not relying on the assert() statements for correctness- they're intended to be sanity checks for tests that the state of optionals is what we expect.
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 206 at r14 (raw file):
Previously, coryan (Carlos O'Ryan) wrote…
You can just
return false
from a.then()
, but it is unclear to me why you need this.
You need to delay the Write() future on reinitialization completing so they can retry the Write() immediately. Basically, this avoids thrashing when in the retry loop when compared to the alternative of returning false
immediately.
The lifecycle constraints are satisfied by the semantics of this class. |
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 60 at r10 (raw file): Previously, dpcollins-google wrote…
Stale comment, please ignore. |
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 206 at r14 (raw file): Previously, dpcollins-google wrote…
How is |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 124 unresolved discussions (waiting on @18suresha, @coryan, and @dpcollins-google)
google/cloud/pubsublite/CMakeLists.txt, line 188 at r10 (raw file):
Previously, coryan (Carlos O'Ryan) wrote…
Typically we do not expose mocks for our own use in the
*_mocks
libraries, which are intended for customers to use. I suspect the retry and backoff policy mocks really belong in a*_testing
library.
stale comment?
#8441
google/cloud/pubsublite/internal/futures.h, line 26 at r14 (raw file):
Previously, coryan (Carlos O'Ryan) wrote…
Can you add some comments explaining what this is for or why it is needed?
This is resolved here #8441 right?
google/cloud/pubsublite/CMakeLists.txt, line 188 at r10 (raw file): Previously, 18suresha (Adarsh Suresh) wrote…
Yes, stale. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewable status: all files reviewed, 123 unresolved discussions (waiting on @18suresha, @coryan, and @dpcollins-google)
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 206 at r14 (raw file):
Previously, coryan (Carlos O'Ryan) wrote…
How is
some_future.then([](auto) { return false; });
different fromsome_future.then([](auto) { return make_ready_future(false); });
Both returnfuture<bool>
and in both cases it is satisfied as soon as (and no earlier than) whensome_future
is satisfied?
Sorry, I thought by it is unclear to me why you need this.
the this
was chaining off of write_reinit_done. Agreed that some_future.then([](auto) { return false; });
is preferred.
google/cloud/pubsublite/internal/futures.h, line 26 at r14 (raw file): Previously, 18suresha (Adarsh Suresh) wrote…
Yes, stale. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Sorry about the stale comments. I think there are only a couple of things around make_ready_future
left?
Reviewable status: all files reviewed, 118 unresolved discussions (waiting on @18suresha, @coryan, and @dpcollins-google)
Google Cloud Build Logs
ℹ️ NOTE: Kokoro logs are linked from "Details" below. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Reviewed 1 of 1 files at r20, 1 of 1 files at r21, all commit messages.
Reviewable status: all files reviewed, 114 unresolved discussions (waiting on @18suresha and @dpcollins-google)
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 57 at r10 (raw file):
Previously, dpcollins-google wrote…
Other libraries use a template to accept any kind of RetryPolicy as RetryPolicy does not have clone() in its interface (just in TraitBasedRetryPolicy, which is itself a template). This makes it possible to construct retry policies without needing to add an additional template parameter.
Other libraries fix the retry error codes that are allowed, and define something from TraitsBasedRetryPolicy
to enforce their use for each service. I am not too worried at the API at this level, we can do whatever you need. But the API exposed to the application developers need to match what we expose in other libraries. We can discuss that in detail when we get there.
google/cloud/pubsublite/internal/resumable_async_streaming_read_write_rpc.h, line 35 at r14 (raw file):
Previously, dpcollins-google wrote…
This is expected- we don't want users to be able to reconfigure which errors are considered retryable or to retry immediately on failures and potentially overload the backend even more than they would otherwise.
I understand the desire to control these things. In general, the C++ libraries offer good retry and backoff policies by default, and we recommend that customers use them. But we do allow changes. If folks want to change the number of retry attempts (or the maximum duration of the retries) or want to tweak the maximum backoff then we will let them. Again, we can discuss this later.
Google Cloud Build Logs
ℹ️ NOTE: Kokoro logs are linked from "Details" below. |
Implements
Read
andWrite
of a generic resumable bidi rpc async stream following #8450.This change isdata:image/s3,"s3://crabby-images/d0bb7/d0bb7f7625ca5bf5c3cf7a2b7a514cf841ab8395" alt="Reviewable"