Update event stream #6853
Conversation
Hello davidMcneil! Thanks for the pull request! Here is what will happen next: […] Thank you for contributing!

This resolves #6740 by adding the […]
davidMcneil force-pushed from f299d31 to 60df8c6
Unfortunately, we were not able to avoid forking the […] The fork can be found here.
davidMcneil force-pushed from 35e4fdb to d137ddc
Looks good overall... I had a couple observations / documentation tweaks, though. I think we may need to adjust how we're handling subjects, too.
Nice work!
components/sup/src/event.rs
Outdated
```diff
         trace!("About to queue an event: {:?}", event);
-        if let Err(e) = self.0.unbounded_send(event) {
+        if let Err(e) = self.0.try_send(event) {
             error!("Failed to queue event: {:?}", e);
```
Probably worth documenting here that if we fill up the channel (because we're not currently connected to the NATS server), then we'll drop additional messages on the floor here, because `try_send` will return an error.

Actually, it'd be good to use `TrySendError::is_full()` to get more information in our error logging.
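A minimal sketch of what that could look like, assuming the sender (`self.0` in the PR) is a `futures` 0.1 `mpsc::Sender`; the free function, its types, and the log messages are illustrative, not the PR's actual code:

```rust
use futures::sync::mpsc::Sender;
use log::error;

/// Queue an event for publishing.
///
/// NOTE: if the channel is full (e.g. because we are not currently
/// connected to the NATS server), additional events are intentionally
/// dropped on the floor here; `try_send` returns an error rather than
/// blocking.
fn queue_event(tx: &mut Sender<Vec<u8>>, event: Vec<u8>) {
    if let Err(e) = tx.try_send(event) {
        if e.is_full() {
            error!("Event channel full; dropping event");
        } else {
            error!("Failed to queue event: {:?}", e);
        }
    }
}
```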
The current error message we give when `try_send` fails due to a full channel is `Failed to queue event: send failed because channel is full`. Is there something more you would like to see?
Nope, that's good!
I think a comment in the code about dropping messages would still be useful as documentation of intent.
components/sup/src/event/stream.rs
Outdated
```rust
        Ok(())
    });

    Runtime::new().expect("Couldn't create event stream runtime!")
```
It's probably worth noting here: the reason all this was initially running in a thread in the first place is that the `nitox` library had an issue where it didn't play well with other futures on its reactor. To work around that, I just put it off on its own reactor on a separate thread.

Since `rust-nats` presumably doesn't have that issue, we could theoretically move all this into running directly on the Supervisor's main reactor. If we were to do that, however, we'd need to do it in such a way that we could cleanly shut it down when the Supervisor needs to come down, or we'd run into the same underlying issue that was behind #6712 and fixed by #6717.

(It's perfectly fine to leave it as is; I'm just thinking it would be good to leave a comment here to ensure that any well-meaning refactoring engineer who comes after us knows what the trade-offs are.)
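For context, a minimal sketch of the pattern being described, i.e. giving the event stream its own single-threaded Tokio 0.1 reactor on a dedicated thread; the function name and the placeholder future are illustrative, not the Supervisor's actual code:

```rust
use std::thread;

use futures::{future, Future};
use tokio::runtime::current_thread::Runtime;

/// Run the event stream on its own reactor in a dedicated thread so it
/// cannot interfere with (or be blocked by) the Supervisor's main runtime.
fn spawn_event_stream_thread() -> thread::JoinHandle<()> {
    thread::Builder::new()
        .name("event-stream".to_string())
        .spawn(|| {
            let mut runtime =
                Runtime::new().expect("Couldn't create event stream runtime!");
            // In the real code this would be the NATS publishing future; a
            // trivial placeholder is used here so the sketch stands alone.
            runtime.block_on(future::ok::<(), ()>(()))
                   .expect("Event stream future failed");
        })
        .expect("Couldn't spawn event stream thread!")
}
```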
These commits here and here address this.

It seems like we need a more robust and ergonomic solution for ensuring all futures are stopped before shutting down. This is outside the scope of this PR, but just to get some ideas out there: what if we made a "wrapper" that wraps two runtimes? This "wrapper" could have a `spawn_divergent` (or something similar, to indicate that the future does not end) call. This would spawn the future on a runtime that calls `shutdown_now` instead of `shutdown_on_idle` when we shut down.

I'm not sold on this solution, but it would be nice to not have to keep a handle for every divergent future. I wonder how other Tokio projects handle correctly ending unbounded futures. Thoughts?
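To make the idea concrete, a rough, hypothetical sketch of the "two runtimes" wrapper against the Tokio 0.1 runtime API; the `DualRuntime` type and the `spawn_divergent` name are invented for illustration and are not an existing API or code from this PR:

```rust
use futures::Future;
use tokio::runtime::Runtime;

/// Hypothetical wrapper around two runtimes: one for futures that finish on
/// their own, and one for "divergent" futures that never complete.
struct DualRuntime {
    finite:    Runtime,
    divergent: Runtime,
}

impl DualRuntime {
    fn new() -> std::io::Result<Self> {
        Ok(DualRuntime { finite:    Runtime::new()?,
                         divergent: Runtime::new()?, })
    }

    /// Spawn a future that is expected to complete on its own.
    fn spawn<F>(&mut self, future: F)
        where F: Future<Item = (), Error = ()> + Send + 'static
    {
        self.finite.spawn(future);
    }

    /// Spawn a future that never ends (e.g. an event-publishing loop).
    fn spawn_divergent<F>(&mut self, future: F)
        where F: Future<Item = (), Error = ()> + Send + 'static
    {
        self.divergent.spawn(future);
    }

    /// Drain the finite work, then forcibly stop the divergent work.
    fn shutdown(self) {
        self.finite.shutdown_on_idle().wait().unwrap();
        self.divergent.shutdown_now().wait().unwrap();
    }
}
```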
Neat 😄
I'm not sure what the best way forward is for this. I like the "handle" approach, since it's explicit, but it does require a little bookkeeping. I haven't seen other approaches for this, though (which is what motivated that handle solution initially).
Your "two Runtimes" approach is also an interesting one, and is worth digging into, I think.
As long as the code in this PR doesn't get us back into a 0.83.0 bug situation, I'm 👍 on merging it.
components/sup/src/event/stream.rs
Outdated
```diff
@@ -11,7 +11,7 @@ use tokio::{prelude::Stream,
             runtime::current_thread::Runtime};

 /// All messages are published under this subject.
-const HABITAT_SUBJECT: &str = "habitat";
+const HABITAT_SUBJECT: &str = "habitat.event.healthcheck";
```
We send out more events than just healthchecks, though... if we're going to have different subjects per event, we'll need to handle that a little differently.
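Purely as an illustration of the kind of change that might imply (the function name and subject scheme below are hypothetical and would ultimately depend on what Automate expects):

```rust
/// Derive the NATS subject from the event type instead of publishing
/// everything under a single constant subject.
fn subject_for(event_type: &str) -> String {
    // e.g. "healthcheck" -> "habitat.event.healthcheck"
    format!("habitat.event.{}", event_type)
}
```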
Thanks for catching this! I will follow up with the Automate team and see what they expect.
davidMcneil force-pushed from e62b6ef to b9d80a2
Once the event subject situation is resolved, I'm 👍
@davidMcneil If a user sets […], Automate may be unavailable because of an upgrade, failure, etc., and users may still want to start the Supervisor & services. It sounds like setting the value to […]

@ericcalabretta Regardless of the value of […]

@davidMcneil That's perfect, thanks for the clarifications.
Resolves #6761 and #6740.

This PR first addresses #6740 using the existing `natsio` and `nitox` NATS clients. The PR then removes those clients and switches to using a non-streaming client, `rust-nats`. In initial testing, `rust-nats` and non-streaming in general appear much more reliable. However, testing was done on a standalone NATS server instead of directly with Automate. This is blocked on Automate supporting a plain NATS connection.

There are several TODOs in this PR. Those should be straightforward to address once we verify this is the direction we want to go for our NATS client. #6770 is the spike to evaluate the NATS clients.