feat(new source): add initial websocket source #17856
Hi @torrefatto, thanks for your proposed new integration! Just an FYI, per https://github.com/vectordotdev/vector/blob/master/CONTRIBUTING.md#new-sources-sinks-and-transforms , we will begin by proceeding through that checklist prior to reviewing the code. No need to write up answers to the checklist questions at this stage; I will inquire about anything in this PR's comment thread.
Where possible, we like to avoid adding integrations that are specific to services (though obviously there have been cases of that historically). I would ask: what elements particularly in the source configuration for the
Relatedly, we hope to someday soon have a plugin system whereby some integrations could be maintained outside of the main Vector repo. That could be one possibility for a future
Nice work!
I will help out with this post completion of the aforementioned checklist. Regarding our checklist-
We may reach out with follow-up queries in the coming days. |
Hi @neuronull!
Sorry! I totally overlooked that one.
The protocol is the same (
We think it could be pretty painful. Of course, we would provide an adequate free-of-charge robot account to perform integration tests, if the second
That would be awesome! Is there any roadmap, any plan in which the team commits to implementing such plugin system?
Thanks! And again, sorry for missing it.
I for sure can commit to maintain the
I am looking forward to them! Thanks again! |
No worries! There is no realistic way to make sure everyone is aware of it.
Just hypothesizing here to make sure I understand the pain: could the same thing be accomplished by having some kind of shell script that queried that other API to retrieve the opaque ID, and generate the vector configuration for the
Having a means to integration-test the prospective
This has been an aspiration for a while now. It's not tracked on a public roadmap, but it is definitely something we want to do; it's mostly a question of how soon. I can say we are not planning on it in the next 3 months.
Super! This helps. |
Hi @torrefatto ! We had an internal discussion on this and wanted to lay out some more topics to consider:
Thanks! |
Hi! Sorry for taking so long to reply. The heat wave hit hard 🥵
We run a serverless platform: we basically let people run containers on our servers. We
Our systems hold a backlog of the whole data and we expose in the API a
This touches another part of why we would like to also include a
This would really require something more elaborate than the
I get your point, but I might also add that our API is really not much more than a
Again, sorry for the late reply. Let me know what you think of the picture I
Thanks again! |
Hey @torrefatto ! Thanks for providing all those details, that really helps us frame it, and also helps us understand better the value of a
This is an interesting development. We have had a solid demand for a
Would that mean the
In the meantime, I will share these new details with the team. Thanks! |
One other thing to follow up on:
Curious if there was any traction on a commitment at the company level to maintain this/these sources? |
Hi @neuronull!
Exactly!
I talked with @bchatelard and he confirmed that @koyeb is willing to commit to maintaining these sources, should they be accepted upstream 💪 |
Hi @torrefatto, wanted to convey an update: we are still finalizing input from stakeholders but we're pretty confident that we would accept this new source, and the following ones. 🎉 I'll be taking a look at your code in this PR for some initial feedback. |
That's awesome @neuronull! I see that I have a conflict. Would you like me to rebase or merge from master? |
Merging is preferred to keep the commit history and make reviews easier (reviewers can just review new changes). When the PR merges it'll be squashed down to one commit. |
In the same vein, avoiding force-pushing is greatly appreciated 🙏 |
Thanks for extracting shared code with the sink! Overall this looks pretty good. I did a first pass, and while it looks like I made a lot of suggestions, none of them require massive changes.
You mentioned unit tests in the comment, and yes, those will be needed. You can probably do something reciprocal to the websocket sink. We have some assert_source_compliance test helpers you can grep for; you can run the source with this wrapper and it validates that the correct internal telemetry is emitted to adhere to the component specification.
Beyond that it's ideal to cover a happy path and as many error paths as reasonable.
I think this component is OK without an integration test.
@@ -30,6 +30,7 @@ pub mod unix;
mod unix_datagram;
#[cfg(all(unix, feature = "sources-utils-net-unix"))]
mod unix_stream;
pub mod websocket;
💬 suggestion: We'll want to gate this on a new feature defined in the Cargo.toml, sources-websocket.
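To illustrate what the gating amounts to at the use site, here is a minimal sketch. Only the feature name sources-websocket comes from the review; the module body and the compiled_sources helper are hypothetical, and the feature itself would still need to be declared in Cargo.toml.

```rust
// Sketch only: assumes a `sources-websocket` feature is declared in Cargo.toml.
// The module contents and `compiled_sources` are hypothetical illustrations.

// Compiled only when the `sources-websocket` feature is enabled.
#[cfg(feature = "sources-websocket")]
pub mod websocket {
    pub fn name() -> &'static str {
        "websocket"
    }
}

/// Lists the names of the gated sources that made it into this build.
#[allow(unused_mut)]
pub fn compiled_sources() -> Vec<&'static str> {
    let mut sources = Vec::new();
    // This statement disappears entirely when the feature is off,
    // so builds without the feature pay no compile-time cost for the module.
    #[cfg(feature = "sources-websocket")]
    sources.push(websocket::name());
    sources
}
```

With the feature off, the module is not compiled at all, which is the point of the suggestion.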
@@ -17,3 +17,8 @@ pub(crate) mod sqs;

#[cfg(any(feature = "sources-aws_s3", feature = "sinks-aws_s3"))]
pub(crate) mod s3;

pub mod websocket;
-pub mod websocket;
+#[cfg(any(feature = "sources-websocket", feature = "sinks-websocket"))]
+pub mod websocket;
pub mod websocket;

pub(crate) mod backoff;
pub(crate) mod ping;
💬 suggestion: This one should arguably be collapsed into the websocket module here. I understand the reasoning for making it available, since it's generic enough, but that is easy enough to do later if we need it; otherwise it's just lost compilation time for any config not utilizing a websocket component.
let maybe_tls = self.tls_connect().await?;

let ws_config = WebSocketConfig {
max_send_queue: None, // don't buffer messages
💭 thought: If I recall, this will be the merge conflict to resolve
@@ -89,3 +89,24 @@ impl InternalEvent for WsConnectionError {
Some("WsConnectionError")
}
}

pub struct WsMessageReceived {
💬 suggestion: This should adhere more closely to the component spec (https://github.com/vectordotdev/vector/blob/master/docs/specs/component.md); the properties count and byte_size are missing.
Additionally, the source should emit https://github.com/vectordotdev/vector/blob/master/docs/specs/component.md#componentbytesreceived , for which the protocol should be websocket.
That might make sense to have as a separate event, because (this is another thing and isn't directly related to your changes) I think the websocket sink might not be emitting the EventsReceived, in which case it could use this WsEventReceived. But the sink is using the run_and_assert_compliance_ unit test helper that should be validating that 🤔 hmm, so that might not be valid.
See the HttpEventsReceived and HttpBytesReceived, for reference.
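As a rough sketch of the split being suggested, the two events could look something like the following. This does not use Vector's real InternalEvent trait or the metrics crate: Counters is a hypothetical stand-in registry, the struct names are illustrative, and the metric names are written as I recall them from the component spec, so they should be checked against the linked document.

```rust
use std::collections::HashMap;

/// Hypothetical stand-in for a metrics registry (not Vector's actual API).
#[derive(Default)]
pub struct Counters(pub HashMap<String, u64>);

impl Counters {
    pub fn add(&mut self, name: &str, value: u64) {
        *self.0.entry(name.to_string()).or_insert(0) += value;
    }

    pub fn get(&self, name: &str) -> u64 {
        self.0.get(name).copied().unwrap_or(0)
    }
}

/// Illustrative events-received event carrying `count` and `byte_size`.
pub struct WsEventsReceived {
    pub count: usize,
    pub byte_size: usize,
}

/// Illustrative bytes-received event tagged with the protocol.
pub struct WsBytesReceived<'a> {
    pub byte_size: usize,
    pub protocol: &'a str,
}

impl WsEventsReceived {
    pub fn emit(&self, counters: &mut Counters) {
        // Assumed metric names; verify against the component spec.
        counters.add("component_received_events_total", self.count as u64);
        counters.add("component_received_event_bytes_total", self.byte_size as u64);
    }
}

impl WsBytesReceived<'_> {
    pub fn emit(&self, counters: &mut Counters) {
        // The protocol tag is folded into the key to keep the stand-in simple.
        let key = format!("component_received_bytes_total{{protocol={}}}", self.protocol);
        counters.add(&key, self.byte_size as u64);
    }
}
```

Keeping the two events separate means the bytes event can be emitted once per frame read off the socket, while the events one is emitted after decoding, when the event count is known.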
}).map_err(|err| {error!("Failed to process binary message: {}", err);}) {
Ok(_) => Ok(()),
Err(e) => {
error!("Failed to send binary message: {:?}", e);
💬 suggestion: I think these error cases should have an internal event structure that is emitted and increments component_errors_total (https://github.com/vectordotdev/vector/blob/master/docs/specs/instrumentation.md#Error)
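A minimal sketch of that idea, under stated assumptions: the struct name WsReceiveError, its fields, and the static counter are all hypothetical stand-ins, and the log line is returned rather than written with error! so the example stays dependency-free. The real implementation would go through Vector's internal event machinery.

```rust
use std::sync::atomic::{AtomicU64, Ordering};

/// Hypothetical stand-in for the `component_errors_total` counter.
pub static COMPONENT_ERRORS_TOTAL: AtomicU64 = AtomicU64::new(0);

/// Illustrative error event; the field names are assumptions for this sketch.
pub struct WsReceiveError<'a> {
    pub error: &'a str,
    pub error_type: &'a str,
    pub stage: &'a str,
}

impl WsReceiveError<'_> {
    /// Increments the counter and returns the line that would be logged
    /// at error level (returned instead of logged to avoid a logging crate).
    pub fn emit(&self) -> String {
        COMPONENT_ERRORS_TOTAL.fetch_add(1, Ordering::Relaxed);
        format!(
            "Failed to process websocket message. error={} error_type={} stage={}",
            self.error, self.error_type, self.stage
        )
    }
}
```

Each ad hoc error! call in the match arms would then become a single emit of an event like this, which keeps the log message and the counter increment in one place.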
}).map(|evt| async {
handle_text_message(&mut out.clone(), evt, config.uri.clone()).await
}).ok_or(())?.await;
Ok::<(), ()>(())
}).map_err(|err| {error!("Failed to process binary message: {}", err);}) {
Ok(_) => Ok(()),
Err(e) => {
error!("Failed to send binary message: {:?}", e);
Ok(())
}
🥜 nitpick: this is a bit ugly from a readability perspective. Extracting the Message::Binary case to a helper function might help a little bit, but I can't help but wonder if there is a cleaner way to write this.
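One possible shape for that extraction, as a sketch rather than a drop-in replacement: the Message enum below is a simplified, hypothetical stand-in for the websocket library's message type, and binary_to_text and handle_message are illustrative names. The idea is that the helper returns a Result and the caller decides once how to report failures, instead of nesting map_err and match in the arm.

```rust
/// Simplified, hypothetical stand-in for the websocket message type.
pub enum Message {
    Text(String),
    Binary(Vec<u8>),
    Close,
}

/// Helper for the binary case: decodes the payload as UTF-8 text.
pub fn binary_to_text(payload: Vec<u8>) -> Result<String, String> {
    String::from_utf8(payload).map_err(|e| format!("Failed to process binary message: {e}"))
}

/// Returns the text to forward downstream, if any. Errors are returned to the
/// caller, which can emit a single error event instead of logging in place.
pub fn handle_message(msg: Message) -> Result<Option<String>, String> {
    match msg {
        Message::Text(text) => Ok(Some(text)),
        Message::Binary(payload) => binary_to_text(payload).map(Some),
        Message::Close => Ok(None),
    }
}
```

The match arms stay one line each, and the error handling moves to whoever calls handle_message.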
Ok(Message::Frame(_)) => {
warn!("Unsupported message type received: frame");
Ok(())
},
Do you know what scenario(s) this might occur in?
},

Ok(Message::Close(_)) => {
info!("Received message: connection closed from server");
💭 thought: Since we are emitting the WsConnectionShutdown below, which logs a warn, I think we should remove this one. WDYT?
async fn handle_text_message<'a>(
out: &mut SourceSender,
msg: WebSocketEvent<'a>,
endpoint: String,
💬 suggestion: I think the endpoint can be passed in as a reference to avoid the clone, and the WsMessageReceived can specify the lifetime like
pub uri: &'a str,
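Spelled out a bit more, the borrowed version might look like this sketch. The byte_size field and the describe_message function are hypothetical, added only to make the example self-contained; the uri field with its lifetime is the part taken from the suggestion.

```rust
/// Illustrative event shape with a borrowed URI instead of an owned String.
pub struct WsMessageReceived<'a> {
    pub uri: &'a str,
    pub byte_size: usize, // hypothetical extra field for the example
}

/// Takes the endpoint by reference, so the caller does not need to clone
/// its String for every message. `describe_message` is a hypothetical name.
pub fn describe_message<'a>(payload: &str, endpoint: &'a str) -> WsMessageReceived<'a> {
    WsMessageReceived {
        uri: endpoint,
        byte_size: payload.len(),
    }
}
```

The caller would then pass &config.uri rather than config.uri.clone(), and the event simply borrows from the config for as long as it lives.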
Hi! I just wanted to say that I am working on the PR. Thanks for the useful comments, @neuronull! @jszwedko don't worry, I won't force-push :pinky-promise: |
Hi team! Sorry for being silent for so long. The summer took quite some time away 🌴 🍹 |
very excited about the generic websocket source which would be amazing to have (esp if it handles reconnecting and such properly) <3 |
Hi @torrefatto! Wondering if you are still planning to work on this? We would also love to see this source added! |
Hi @yalinglee Apologies for the long silence and thanks for reanimating this conversation. I am still willing to work on this, but I am unfortunately not able to do so during working hours anymore (priorities changed at $DAYJOB). I have to fit this into my scarce free time. The first thing that I need to do is to update this PR with the upstream changes that have happened so far. I will try to update you by the end of next week. |
@torrefatto That's totally understandable! I was just curious about the status of this PR so no pressure! And really appreciate you using your precious free time to work on this! |
Thank you for your contribution to Vector! To keep the repository tidy and focused, we are closing this PR due to inactivity. We greatly appreciate the time and effort you've put into this PR. If you'd like to continue working on it, we encourage you to re-open the PR and we would be delighted to review it again. Before re-opening, please use |
Hi, and thanks for this nice software!
At koyeb we are interested in using vector to allow our users to forward the logs of their applications towards external destinations. Our API for receiving logs is exposed as a websocket source. Our plan is to add support for a generic websocket source, and then, in a following PR, add support for a custom koyeb source in order to make it easier for our users to configure vector for use with our API, in the same spirit as the heroku_logs source. Do you think this is an acceptable plan?
I noticed that vector has an open issue (#6491) to track the addition of a websocket source, and some preliminary (although rough) work was done in this closed PR.
I did not start from there, as the fork from which it started was quite old. I tried instead to look at other existing sources and draw from there.
That said, Rust is not my primary language and I'd really like some guidance in order to improve the code currently submitted. I know that the tests (unit and integration) are missing. Can you point me to some code that you deem a good example for these? What else is missing?
Let me know your thoughts. And thanks again!