Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(new source): add initial websocket source #17856

Closed
wants to merge 7 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
81 changes: 81 additions & 0 deletions src/common/backoff.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use std::time::Duration;

// `tokio-retry` crate
// MIT License
// Copyright (c) 2017 Sam Rijs
//
/// A retry strategy driven by exponential back-off.
///
/// The power corresponds to the number of past attempts.
#[derive(Debug, Clone)]
pub struct ExponentialBackoff {
current: u64,
base: u64,
factor: u64,
max_delay: Option<Duration>,
}

impl ExponentialBackoff {
/// Constructs a new exponential back-off strategy,
/// given a base duration in milliseconds.
///
/// The resulting duration is calculated by taking the base to the `n`-th power,
/// where `n` denotes the number of past attempts.
pub const fn from_millis(base: u64) -> ExponentialBackoff {
ExponentialBackoff {
current: base,
base,
factor: 1u64,
max_delay: None,
}
}

/// A multiplicative factor that will be applied to the retry delay.
///
/// For example, using a factor of `1000` will make each delay in units of seconds.
///
/// Default factor is `1`.
pub const fn factor(mut self, factor: u64) -> ExponentialBackoff {
self.factor = factor;
self
}

/// Apply a maximum delay. No retry delay will be longer than this `Duration`.
pub const fn max_delay(mut self, duration: Duration) -> ExponentialBackoff {
self.max_delay = Some(duration);
self
}

/// Resents the exponential back-off strategy to its initial state.
pub fn reset(&mut self) {
self.current = self.base;
}
}

impl Iterator for ExponentialBackoff {
type Item = Duration;

fn next(&mut self) -> Option<Duration> {
// set delay duration by applying factor
let duration = if let Some(duration) = self.current.checked_mul(self.factor) {
Duration::from_millis(duration)
} else {
Duration::from_millis(std::u64::MAX)
};

// check if we reached max delay
if let Some(ref max_delay) = self.max_delay {
if duration > *max_delay {
return Some(*max_delay);
}
}

if let Some(next) = self.current.checked_mul(self.base) {
self.current = next;
} else {
self.current = std::u64::MAX;
}

Some(duration)
}
}
5 changes: 5 additions & 0 deletions src/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ pub(crate) mod sqs;

#[cfg(any(feature = "sources-aws_s3", feature = "sinks-aws_s3"))]
pub(crate) mod s3;

pub mod websocket;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
pub mod websocket;
#[cfg(any(feature = "sources-websocket", feature = "sinks-websocket"))]
pub mod websocket;


pub(crate) mod backoff;
pub(crate) mod ping;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💬 suggestion: ‏This one should arguably collapsed into the websocket module here. I understand the reasoning to make it available since it's generic enough, but it's easy enough to do if we need that later but otherwise it's just lost compilation time for any config not utilizing a websocket component.

29 changes: 29 additions & 0 deletions src/common/ping.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
use std::{
task::{Context, Poll},
time::Duration,
};

use tokio::time;

pub struct PingInterval {
interval: Option<time::Interval>,
}

impl PingInterval {
pub fn new(period: Option<u64>) -> Self {
Self {
interval: period.map(|period| time::interval(Duration::from_secs(period))),
}
}

pub fn poll_tick(&mut self, cx: &mut Context<'_>) -> Poll<time::Instant> {
match self.interval.as_mut() {
Some(interval) => interval.poll_tick(cx),
None => Poll::Pending,
}
}

pub async fn tick(&mut self) -> time::Instant {
std::future::poll_fn(|cx| self.poll_tick(cx)).await
}
}
154 changes: 154 additions & 0 deletions src/common/websocket.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,154 @@
use std::{fmt::Debug, net::SocketAddr, time::Duration};

use snafu::{ResultExt, Snafu};
use tokio::{net::TcpStream, time};
use tokio_tungstenite::{
client_async_with_config,
tungstenite::{
client::{uri_mode, IntoClientRequest},
error::{Error as WsError, ProtocolError, UrlError},
handshake::client::Request as WsRequest,
protocol::WebSocketConfig,
stream::Mode as UriMode,
},
WebSocketStream as WsStream,
};

use crate::{
common::backoff::ExponentialBackoff,
dns,
http::Auth,
internal_events::{WsConnectionEstablished, WsConnectionFailedError},
tls::{MaybeTlsSettings, MaybeTlsStream, TlsError},
};

#[derive(Debug, Snafu)]
#[snafu(visibility(pub))]
pub enum WebSocketError {
#[snafu(display("Creating WebSocket client failed: {}", source))]
CreateFailed { source: WsError },
#[snafu(display("Connect error: {}", source))]
ConnectError { source: TlsError },
#[snafu(display("Unable to resolve DNS: {}", source))]
DnsError { source: dns::DnsError },
#[snafu(display("No addresses returned."))]
NoAddresses,
}

#[derive(Clone)]
pub(crate) struct WebSocketConnector {
uri: String,
host: String,
port: u16,
tls: MaybeTlsSettings,
auth: Option<Auth>,
}

impl WebSocketConnector {
pub(crate) fn new(
uri: String,
tls: MaybeTlsSettings,
auth: Option<Auth>,
) -> Result<Self, WebSocketError> {
let request = (&uri).into_client_request().context(CreateFailedSnafu)?;
let (host, port) = Self::extract_host_and_port(&request).context(CreateFailedSnafu)?;

Ok(Self {
uri,
host,
port,
tls,
auth,
})
}

fn extract_host_and_port(request: &WsRequest) -> Result<(String, u16), WsError> {
let host = request
.uri()
.host()
.ok_or(WsError::Url(UrlError::NoHostName))?
.to_string();
let mode = uri_mode(request.uri())?;
let port = request.uri().port_u16().unwrap_or(match mode {
UriMode::Tls => 443,
UriMode::Plain => 80,
});

Ok((host, port))
}

const fn fresh_backoff() -> ExponentialBackoff {
ExponentialBackoff::from_millis(2)
.factor(250)
.max_delay(Duration::from_secs(60))
}

async fn tls_connect(&self) -> Result<MaybeTlsStream<TcpStream>, WebSocketError> {
let ip = dns::Resolver
.lookup_ip(self.host.clone())
.await
.context(DnsSnafu)?
.next()
.ok_or(WebSocketError::NoAddresses)?;

let addr = SocketAddr::new(ip, self.port);
self.tls
.connect(&self.host, &addr)
.await
.context(ConnectSnafu)
}

async fn connect(&self) -> Result<WsStream<MaybeTlsStream<TcpStream>>, WebSocketError> {
let mut request = (&self.uri)
.into_client_request()
.context(CreateFailedSnafu)?;

if let Some(auth) = &self.auth {
auth.apply(&mut request);
}

let maybe_tls = self.tls_connect().await?;

let ws_config = WebSocketConfig {
max_send_queue: None, // don't buffer messages
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💭 thought: ‏If I recall, this will be the merge conflict to resolve

..Default::default()
};

let (ws_stream, _response) = client_async_with_config(request, maybe_tls, Some(ws_config))
.await
.context(CreateFailedSnafu)?;

Ok(ws_stream)
}

pub(crate) async fn connect_backoff(&self) -> WsStream<MaybeTlsStream<TcpStream>> {
let mut backoff = Self::fresh_backoff();
loop {
match self.connect().await {
Ok(ws_stream) => {
emit!(WsConnectionEstablished {});
return ws_stream;
}
Err(error) => {
emit!(WsConnectionFailedError {
error: Box::new(error)
});
time::sleep(backoff.next().unwrap()).await;
}
}
}
}

pub(crate) async fn healthcheck(&self) -> crate::Result<()> {
self.connect().await.map(|_| ()).map_err(Into::into)
}
}

pub(crate) const fn is_closed(error: &WsError) -> bool {
matches!(
error,
WsError::ConnectionClosed
| WsError::AlreadyClosed
| WsError::Protocol(ProtocolError::ResetWithoutClosingHandshake)
)
}
3 changes: 2 additions & 1 deletion src/config/enterprise.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use super::{
SourceOuter, TransformOuter,
};
use crate::{
common::backoff::ExponentialBackoff,
common::datadog::{get_api_base_endpoint, get_base_domain_region, Region},
conditions::AnyCondition,
http::{HttpClient, HttpError},
Expand All @@ -29,7 +30,7 @@ use crate::{
default_site, logs::DatadogLogsConfig, metrics::DatadogMetricsConfig,
DatadogCommonConfig,
},
util::{http::RequestConfig, retries::ExponentialBackoff},
util::http::RequestConfig,
},
sources::{
host_metrics::{Collector, HostMetricsConfig},
Expand Down
21 changes: 21 additions & 0 deletions src/internal_events/websocket.rs
Original file line number Diff line number Diff line change
Expand Up @@ -89,3 +89,24 @@ impl InternalEvent for WsConnectionError {
Some("WsConnectionError")
}
}

pub struct WsMessageReceived {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💬 suggestion: ‏This should adhere more closely to the component spec (https://github.com/vectordotdev/vector/blob/master/docs/specs/component.md)

, the properties of count and byte_size are missing.

Additionally, the source should emit https://github.com/vectordotdev/vector/blob/master/docs/specs/component.md#componentbytesreceived
, which the protocol should be websocket.

That might make sense to have as a separate event, because (this is another thing and isn't directly related to your changes but) I think the websocket sink might not be emitting the EventsReceived , in which case it could use this WsEventReceived. But the sink is using the run_and_assert_compliance_ unit test helper that should be validating that 🤔 hmm so that might not be valid.

See the HttpEventsReceived and HttpBytesReceived, for reference.

pub url: String,
}

impl InternalEvent for WsMessageReceived {
fn emit(self) {
trace!(
message = "Event received.",
url = %self.url,
);
counter!(
"component_received_events_total", 1,
"uri" => self.url.clone(),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🔨 warning: ‏I believe this clone() isn't necessary. If you haven't already, definitely run make check-clippy, good practice to do before each push, as that is checked in CI.

);
}

fn name(&self) -> Option<&'static str> {
Some("WsMessageReceived")
}
}
1 change: 1 addition & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ pub mod async_read;
pub mod aws;
#[allow(unreachable_pub)]
pub mod codecs;
#[allow(unreachable_pub)]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

💬 suggestion: ‏I think we'd typically prefer to resolve this by using pub(crate) on the affected items.

pub(crate) mod common;
pub mod encoding_transcode;
pub mod enrichment_tables;
Expand Down
Loading