diff --git a/Cargo.toml b/Cargo.toml
index 33616b5f7b2e4..753871b3a979a 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -744,6 +744,7 @@ sinks-logs = [
   "sinks-vector",
   "sinks-webhdfs",
   "sinks-websocket",
+  "sinks-websocket-server",
 ]
 sinks-metrics = [
   "sinks-appsignal",
@@ -813,6 +814,7 @@ sinks-statsd = ["sinks-utils-udp", "tokio-util/net"]
 sinks-utils-udp = []
 sinks-vector = ["sinks-utils-udp", "dep:tonic", "protobuf-build", "dep:prost"]
 sinks-websocket = ["dep:tokio-tungstenite"]
+sinks-websocket-server = ["dep:tokio-tungstenite", "sources-utils-http-auth", "sources-utils-http-error", "sources-utils-http-prelude"]
 sinks-webhdfs = ["dep:opendal"]
 
 # Identifies that the build is a nightly build
diff --git a/changelog.d/22213_websocket_server_sink.feature.md b/changelog.d/22213_websocket_server_sink.feature.md
new file mode 100644
index 0000000000000..3fc211f51dfb6
--- /dev/null
+++ b/changelog.d/22213_websocket_server_sink.feature.md
@@ -0,0 +1,3 @@
+Add `websocket_server` sink that acts as a websocket server and broadcasts events to all clients.
+
+authors: esensar
diff --git a/src/internal_events/mod.rs b/src/internal_events/mod.rs
index 85bdbad78bef1..12a323214ec6d 100644
--- a/src/internal_events/mod.rs
+++ b/src/internal_events/mod.rs
@@ -125,8 +125,10 @@ mod template;
 mod throttle;
 mod udp;
 mod unix;
-#[cfg(feature = "sinks-websocket")]
+#[cfg(any(feature = "sinks-websocket", feature = "sinks-websocket-server"))]
 mod websocket;
+#[cfg(feature = "sinks-websocket-server")]
+mod websocket_server;
 
 #[cfg(any(
     feature = "sources-file",
@@ -259,8 +261,10 @@ pub(crate) use self::tag_cardinality_limit::*;
 pub(crate) use self::throttle::*;
 #[cfg(unix)]
 pub(crate) use self::unix::*;
-#[cfg(feature = "sinks-websocket")]
+#[cfg(any(feature = "sinks-websocket", feature = "sinks-websocket-server"))]
 pub(crate) use self::websocket::*;
+#[cfg(feature = "sinks-websocket-server")]
+pub(crate) use self::websocket_server::*;
 #[cfg(windows)]
 pub(crate) use self::windows::*;
 pub use self::{
diff --git a/src/internal_events/websocket_server.rs b/src/internal_events/websocket_server.rs
new file mode 100644
index 0000000000000..39fb1ebee5e19
--- /dev/null
+++ b/src/internal_events/websocket_server.rs
@@ -0,0 +1,71 @@
+use std::fmt::Debug;
+
+use futures::channel::mpsc::TrySendError;
+use metrics::{counter, gauge};
+use tokio_tungstenite::tungstenite::Message;
+use vector_lib::internal_event::InternalEvent;
+
+use vector_lib::internal_event::{error_stage, error_type};
+
+#[derive(Debug)]
+pub struct WsListenerConnectionEstablished {
+    pub client_count: usize,
+}
+
+impl InternalEvent for WsListenerConnectionEstablished {
+    fn emit(self) {
+        debug!(message = "Websocket client connected.");
+        counter!("connection_established_total").increment(1);
+        gauge!("active_clients").set(self.client_count as f64);
+    }
+
+    fn name(&self) -> Option<&'static str> {
+        Some("WsListenerConnectionEstablished")
+    }
+}
+
+#[derive(Debug)]
+pub struct WsListenerConnectionShutdown {
+    pub client_count: usize,
+}
+
+impl InternalEvent for WsListenerConnectionShutdown {
+    fn emit(self) {
+        warn!(message = "Client connection closed.");
+        counter!("connection_shutdown_total").increment(1);
+        gauge!("active_clients").set(self.client_count as f64);
+    }
+
+    fn name(&self) -> Option<&'static str> {
+        Some("WsListenerConnectionShutdown")
+    }
+}
+
+#[derive(Debug)]
+pub struct WsListenerSendError {
+    pub error: TrySendError<Message>,
+}
+
+impl InternalEvent for WsListenerSendError {
+    fn emit(self) {
+        error!(
+            message = "WebSocket message send error.",
+            error = %self.error,
+            error_code = "ws_server_connection_error",
+            error_type = error_type::WRITER_FAILED,
+            stage = error_stage::SENDING,
+            internal_log_rate_limit = true,
+        );
+        counter!(
+            "component_errors_total",
+            "error_code" => "ws_server_connection_error",
+            "error_type" => error_type::WRITER_FAILED,
+            "stage" => error_stage::SENDING,
+        )
+        .increment(1);
+    }
+
+    fn name(&self) -> Option<&'static str> {
+        Some("WsListenerConnectionError")
+    }
+}
diff --git a/src/sinks/mod.rs b/src/sinks/mod.rs
index 3cf5613324bbe..784f74b1e6005 100644
--- a/src/sinks/mod.rs
+++ b/src/sinks/mod.rs
@@ -110,6 +110,8 @@ pub mod vector;
 pub mod webhdfs;
 #[cfg(feature = "sinks-websocket")]
 pub mod websocket;
+#[cfg(feature = "sinks-websocket-server")]
+pub mod websocket_server;
 
 pub use vector_lib::{config::Input, sink::VectorSink};
diff --git a/src/sinks/websocket_server/config.rs b/src/sinks/websocket_server/config.rs
new file mode 100644
index 0000000000000..cf082e7d3c946
--- /dev/null
+++ b/src/sinks/websocket_server/config.rs
@@ -0,0 +1,91 @@
+use std::net::SocketAddr;
+
+use vector_lib::codecs::JsonSerializerConfig;
+use vector_lib::configurable::configurable_component;
+
+use crate::{
+    codecs::EncodingConfig,
+    common::http::server_auth::HttpServerAuthConfig,
+    config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext},
+    sinks::{Healthcheck, VectorSink},
+    tls::TlsEnableableConfig,
+};
+
+use super::sink::WebSocketListenerSink;
+
+/// Configuration for the `websocket_server` sink.
+#[configurable_component(sink(
+    "websocket_server",
+    "Deliver observability event data to websocket clients."
+))]
+#[derive(Clone, Debug)]
+pub struct WebSocketListenerSinkConfig {
+    /// The socket address to listen for connections on.
+    ///
+    /// This value _must_ include a port.
+    #[configurable(metadata(docs::examples = "0.0.0.0:80"))]
+    #[configurable(metadata(docs::examples = "localhost:80"))]
+    pub address: SocketAddr,
+
+    #[configurable(derived)]
+    pub tls: Option<TlsEnableableConfig>,
+
+    #[configurable(derived)]
+    pub encoding: EncodingConfig,
+
+    #[configurable(derived)]
+    #[serde(
+        default,
+        deserialize_with = "crate::serde::bool_or_struct",
+        skip_serializing_if = "crate::serde::is_default"
+    )]
+    pub acknowledgements: AcknowledgementsConfig,
+
+    #[configurable(derived)]
+    pub auth: Option<HttpServerAuthConfig>,
+}
+
+impl Default for WebSocketListenerSinkConfig {
+    fn default() -> Self {
+        Self {
+            address: "0.0.0.0:8080".parse().unwrap(),
+            encoding: JsonSerializerConfig::default().into(),
+            tls: None,
+            acknowledgements: Default::default(),
+            auth: None,
+        }
+    }
+}
+
+#[async_trait::async_trait]
+#[typetag::serde(name = "websocket_server")]
+impl SinkConfig for WebSocketListenerSinkConfig {
+    async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
+        let ws_sink = WebSocketListenerSink::new(self.clone(), cx)?;
+
+        Ok((
+            VectorSink::from_event_streamsink(ws_sink),
+            Box::pin(async move { Ok(()) }),
+        ))
+    }
+
+    fn input(&self) -> Input {
+        Input::new(self.encoding.config().input_type())
+    }
+
+    fn acknowledgements(&self) -> &AcknowledgementsConfig {
+        &self.acknowledgements
+    }
+}
+
+impl_generate_config_from_default!(WebSocketListenerSinkConfig);
+
+#[cfg(test)]
+mod test {
+    use super::*;
+
+    #[test]
+    fn generate_config() {
+        crate::test_util::test_generate_config::<WebSocketListenerSinkConfig>();
+    }
+}
diff --git a/src/sinks/websocket_server/mod.rs b/src/sinks/websocket_server/mod.rs
new file mode 100644
index 0000000000000..4c850871271ca
--- /dev/null
+++ b/src/sinks/websocket_server/mod.rs
@@ -0,0 +1,4 @@
+mod config;
+mod sink;
+
+pub use config::WebSocketListenerSinkConfig;
diff --git a/src/sinks/websocket_server/sink.rs b/src/sinks/websocket_server/sink.rs
new file mode 100644
index 0000000000000..eb9b4f7008cfd
--- /dev/null
+++ b/src/sinks/websocket_server/sink.rs
@@ -0,0 +1,400 @@
+use std::{
+    collections::HashMap,
+    net::SocketAddr,
+    sync::{Arc, Mutex},
+};
+
+use async_trait::async_trait;
+use bytes::BytesMut;
+use futures::{
+    channel::mpsc::{unbounded, UnboundedSender},
+    pin_mut,
+    stream::BoxStream,
+    StreamExt,
+};
+use http::StatusCode;
+use tokio::net::TcpStream;
+use tokio_tungstenite::tungstenite::{
+    handshake::server::{ErrorResponse, Request, Response},
+    Message,
+};
+use tokio_util::codec::Encoder as _;
+use tracing::Instrument;
+use vector_lib::{
+    event::{Event, EventStatus},
+    finalization::Finalizable,
+    internal_event::{
+        ByteSize, BytesSent, CountByteSize, EventsSent, InternalEventHandle, Output, Protocol,
+    },
+    sink::StreamSink,
+    tls::{MaybeTlsIncomingStream, MaybeTlsListener, MaybeTlsSettings},
+    EstimatedJsonEncodedSizeOf,
+};
+
+use crate::{
+    codecs::{Encoder, Transformer},
+    common::http::server_auth::HttpServerAuthMatcher,
+    internal_events::{
+        ConnectionOpen, OpenGauge, WsConnectionFailedError, WsListenerConnectionEstablished,
+        WsListenerConnectionShutdown, WsListenerSendError,
+    },
+    sinks::prelude::*,
+};
+
+use super::WebSocketListenerSinkConfig;
+
+pub struct WebSocketListenerSink {
+    peers: Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>,
+    tls: MaybeTlsSettings,
+    transformer: Transformer,
+    encoder: Encoder<()>,
+    address: SocketAddr,
+    auth: Option<HttpServerAuthMatcher>,
+}
+
+impl WebSocketListenerSink {
+    pub fn new(config: WebSocketListenerSinkConfig, cx: SinkContext) -> crate::Result<Self> {
+        let tls = MaybeTlsSettings::from_config(config.tls.as_ref(), true)?;
+        let transformer = config.encoding.transformer();
+        let serializer = config.encoding.build()?;
+        let encoder = Encoder::<()>::new(serializer);
+        let auth = config
+            .auth
+            .map(|config| config.build(&cx.enrichment_tables))
+            .transpose()?;
+        Ok(Self {
+            peers: Arc::new(Mutex::new(HashMap::new())),
+            tls,
+            address: config.address,
+            transformer,
+            encoder,
+            auth,
+        })
+    }
+
+    const fn should_encode_as_binary(&self) -> bool {
+        use vector_lib::codecs::encoding::Serializer::{
+            Avro, Cef, Csv, Gelf, Json, Logfmt, Native, NativeJson, Protobuf, RawMessage, Text,
+        };
+
+        match self.encoder.serializer() {
+            RawMessage(_) | Avro(_) | Native(_) | Protobuf(_) => true,
+            Cef(_) | Csv(_) | Logfmt(_) | Gelf(_) | Json(_) | Text(_) | NativeJson(_) => false,
+        }
+    }
+
+    async fn handle_connections(
+        auth: Option<HttpServerAuthMatcher>,
+        peers: Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>,
+        mut listener: MaybeTlsListener,
+    ) {
+        let open_gauge = OpenGauge::new();
+
+        while let Ok(stream) = listener.accept().await {
+            tokio::spawn(
+                Self::handle_connection(
+                    auth.clone(),
+                    Arc::clone(&peers),
+                    stream,
+                    open_gauge.clone(),
+                )
+                .in_current_span(),
+            );
+        }
+    }
+
+    async fn handle_connection(
+        auth: Option<HttpServerAuthMatcher>,
+        peers: Arc<Mutex<HashMap<SocketAddr, UnboundedSender<Message>>>>,
+        stream: MaybeTlsIncomingStream<TcpStream>,
+        open_gauge: OpenGauge,
+    ) -> Result<(), ()> {
+        let addr = stream.peer_addr();
+        debug!("Incoming TCP connection from: {}", addr);
+
+        let header_callback = |req: &Request, response: Response| {
+            let Some(auth) = auth else {
+                return Ok(response);
+            };
+            match auth.handle_auth(req.headers()) {
+                Ok(_) => Ok(response),
+                Err(message) => {
+                    let mut response = ErrorResponse::default();
+                    *response.status_mut() = StatusCode::UNAUTHORIZED;
+                    *response.body_mut() = Some(message.message().to_string());
+                    debug!("Websocket handshake auth validation failed: {}", message);
+                    Err(response)
+                }
+            }
+        };
+
+        let ws_stream = tokio_tungstenite::accept_hdr_async(stream, header_callback)
+            .await
+            .map_err(|err| {
+                debug!("Error during websocket handshake: {}", err);
+                emit!(WsConnectionFailedError {
+                    error: Box::new(err)
+                })
+            })?;
+
+        let _open_token = open_gauge.open(|count| emit!(ConnectionOpen { count }));
+
+        // Insert the write part of this peer into the peer map.
+        let (tx, rx) = unbounded();
+
+        {
+            let mut peers = peers.lock().unwrap();
+            debug!("WebSocket connection established: {}", addr);
+
+            peers.insert(addr, tx);
+            emit!(WsListenerConnectionEstablished {
+                client_count: peers.len()
+            });
+        }
+
+        let (outgoing, _incoming) = ws_stream.split();
+
+        let forward_data_to_client = rx.map(Ok).forward(outgoing);
+
+        pin_mut!(forward_data_to_client);
+        let _ = forward_data_to_client.await;
+
+        {
+            let mut peers = peers.lock().unwrap();
+            debug!("{} disconnected.", &addr);
+            peers.remove(&addr);
+            emit!(WsListenerConnectionShutdown {
+                client_count: peers.len()
+            });
+        }
+
+        Ok(())
+    }
+}
+
+#[async_trait]
+impl StreamSink<Event> for WebSocketListenerSink {
+    async fn run(mut self: Box<Self>, input: BoxStream<'_, Event>) -> Result<(), ()> {
+        let input = input.fuse().peekable();
+        pin_mut!(input);
+
+        let bytes_sent = register!(BytesSent::from(Protocol("websocket".into())));
+        let events_sent = register!(EventsSent::from(Output(None)));
+        let encode_as_binary = self.should_encode_as_binary();
+
+        let listener = self.tls.bind(&self.address).await.map_err(|_| ())?;
+
+        tokio::spawn(
+            Self::handle_connections(self.auth, Arc::clone(&self.peers), listener)
+                .in_current_span(),
+        );
+
+        while input.as_mut().peek().await.is_some() {
+            let mut event = input.next().await.unwrap();
+            let finalizers = event.take_finalizers();
+
+            self.transformer.transform(&mut event);
+
+            let event_byte_size = event.estimated_json_encoded_size_of();
+
+            let mut bytes = BytesMut::new();
+            match self.encoder.encode(event, &mut bytes) {
+                Ok(()) => {
+                    finalizers.update_status(EventStatus::Delivered);
+
+                    let message = if encode_as_binary {
+                        Message::binary(bytes)
+                    } else {
+                        Message::text(String::from_utf8_lossy(&bytes))
+                    };
+                    let message_len = message.len();
+
+                    let peers = self.peers.lock().unwrap();
+                    let broadcast_recipients = peers.iter().map(|(_, ws_sink)| ws_sink);
+                    for recp in broadcast_recipients {
+                        if let Err(error) = recp.unbounded_send(message.clone()) {
+                            emit!(WsListenerSendError { error });
+                        } else {
+                            events_sent.emit(CountByteSize(1, event_byte_size));
+                            bytes_sent.emit(ByteSize(message_len));
+                        }
+                    }
+                }
+                Err(_) => {
+                    // Error is handled by `Encoder`.
+                    finalizers.update_status(EventStatus::Errored);
+                }
+            };
+        }
+
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use futures::{channel::mpsc::UnboundedReceiver, SinkExt, Stream, StreamExt};
+    use futures_util::stream;
+    use std::future::ready;
+
+    use tokio::{task::JoinHandle, time};
+    use vector_lib::sink::VectorSink;
+
+    use super::*;
+
+    use crate::{
+        event::{Event, LogEvent},
+        test_util::{
+            components::{run_and_assert_sink_compliance, SINK_TAGS},
+            next_addr,
+        },
+    };
+
+    #[tokio::test]
+    async fn test_single_client() {
+        let event = Event::Log(LogEvent::from("foo"));
+
+        let (mut sender, input_events) = build_test_event_channel();
+        let address = next_addr();
+        let port = address.port();
+
+        let websocket_sink = start_websocket_server_sink(
+            WebSocketListenerSinkConfig {
+                address,
+                ..Default::default()
+            },
+            input_events,
+        )
+        .await;
+
+        let client_handle = attach_websocket_client(port, vec![event.clone()]).await;
+        sender.send(event).await.expect("Failed to send.");
+
+        client_handle.await.unwrap();
+        drop(sender);
+        websocket_sink.await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_single_client_late_connect() {
+        let event1 = Event::Log(LogEvent::from("foo1"));
+        let event2 = Event::Log(LogEvent::from("foo2"));
+
+        let (mut sender, input_events) = build_test_event_channel();
+        let address = next_addr();
+        let port = address.port();
+
+        let websocket_sink = start_websocket_server_sink(
+            WebSocketListenerSinkConfig {
+                address,
+                ..Default::default()
+            },
+            input_events,
+        )
+        .await;
+
+        // Send event 1 before the client joins; the client should not receive it.
+        sender.send(event1).await.expect("Failed to send.");
+
+        // Now connect the client.
+        let client_handle = attach_websocket_client(port, vec![event2.clone()]).await;
+
+        // Send event 2; this one should be received by the client.
+        sender.send(event2).await.expect("Failed to send.");
+
+        client_handle.await.unwrap();
+        drop(sender);
+        websocket_sink.await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn test_multiple_clients() {
+        let event = Event::Log(LogEvent::from("foo"));
+
+        let (mut sender, input_events) = build_test_event_channel();
+        let address = next_addr();
+        let port = address.port();
+
+        let websocket_sink = start_websocket_server_sink(
+            WebSocketListenerSinkConfig {
+                address,
+                ..Default::default()
+            },
+            input_events,
+        )
+        .await;
+
+        let client_handle_1 = attach_websocket_client(port, vec![event.clone()]).await;
+        let client_handle_2 = attach_websocket_client(port, vec![event.clone()]).await;
+        sender.send(event).await.expect("Failed to send.");
+
+        client_handle_1.await.unwrap();
+        client_handle_2.await.unwrap();
+        drop(sender);
+        websocket_sink.await.unwrap();
+    }
+
+    #[tokio::test]
+    async fn sink_spec_compliance() {
+        let event = Event::Log(LogEvent::from("foo"));
+
+        let sink = WebSocketListenerSink::new(
+            WebSocketListenerSinkConfig {
+                address: next_addr(),
+                ..Default::default()
+            },
+            SinkContext::default(),
+        )
+        .unwrap();
+
+        run_and_assert_sink_compliance(
+            VectorSink::from_event_streamsink(sink),
+            stream::once(ready(event)),
+            &SINK_TAGS,
+        )
+        .await;
+    }
+
+    async fn start_websocket_server_sink<S>(
+        config: WebSocketListenerSinkConfig,
+        events: S,
+    ) -> JoinHandle<()>
+    where
+        S: Stream<Item = Event> + Send + 'static,
+    {
+        let sink = WebSocketListenerSink::new(config, SinkContext::default()).unwrap();
+
+        let compliance_assertion = tokio::spawn(run_and_assert_sink_compliance(
+            VectorSink::from_event_streamsink(sink),
+            events,
+            &SINK_TAGS,
+        ));
+
+        time::sleep(time::Duration::from_millis(100)).await;
+
+        compliance_assertion
+    }
+
+    async fn attach_websocket_client(port: u16, expected_events: Vec<Event>) -> JoinHandle<()> {
+        let (ws_stream, _) = tokio_tungstenite::connect_async(format!("ws://localhost:{port}"))
+            .await
+            .expect("Client failed to connect.");
+        let (_, rx) = ws_stream.split();
+        tokio::spawn(async move {
+            let events = expected_events.clone();
+            rx.take(events.len())
+                .zip(stream::iter(events))
+                .for_each(|(msg, expected)| async {
+                    let msg_text = msg.unwrap().into_text().unwrap();
+                    let expected = serde_json::to_string(expected.into_log().value()).unwrap();
+                    assert_eq!(expected, msg_text);
+                })
+                .await;
+        })
+    }
+
+    fn build_test_event_channel() -> (UnboundedSender<Event>, UnboundedReceiver<Event>) {
+        let (tx, rx) = futures::channel::mpsc::unbounded();
+        (tx, rx)
+    }
+}
diff --git a/website/content/en/docs/reference/configuration/sinks/websocket_server.md b/website/content/en/docs/reference/configuration/sinks/websocket_server.md
new file mode 100644
index 0000000000000..b30978e23b744
--- /dev/null
+++ b/website/content/en/docs/reference/configuration/sinks/websocket_server.md
@@ -0,0 +1,14 @@
+---
+title: Websocket Server
+description: Deliver observability event data to websocket clients
+component_kind: sink
+layout: component
+tags: ["websocket", "component", "sink"]
+---
+
+{{/*
+This doc is generated using:
+
+1. The template in layouts/docs/component.html
+2. The relevant CUE data in cue/reference/components/...
+*/}}
diff --git a/website/cue/reference/components/sinks/base/websocket_server.cue b/website/cue/reference/components/sinks/base/websocket_server.cue
new file mode 100644
index 0000000000000..ced47c9c1b96d
--- /dev/null
+++ b/website/cue/reference/components/sinks/base/websocket_server.cue
@@ -0,0 +1,501 @@
+package metadata
+
+base: components: sinks: websocket_server: configuration: {
+	acknowledgements: {
+		description: """
+			Controls how acknowledgements are handled for this sink.
+
+			See [End-to-end Acknowledgements][e2e_acks] for more information on how event acknowledgement is handled.
+
+			[e2e_acks]: https://vector.dev/docs/about/under-the-hood/architecture/end-to-end-acknowledgements/
+			"""
+		required: false
+		type: object: options: enabled: {
+			description: """
+				Whether or not end-to-end acknowledgements are enabled.
+
+				When enabled for a sink, any source connected to that sink, where the source supports
+				end-to-end acknowledgements as well, waits for events to be acknowledged by **all
+				connected** sinks before acknowledging them at the source.
+
+				Enabling or disabling acknowledgements at the sink level takes precedence over any global
+				[`acknowledgements`][global_acks] configuration.
+
+				[global_acks]: https://vector.dev/docs/reference/configuration/global-options/#acknowledgements
+				"""
+			required: false
+			type: bool: {}
+		}
+	}
+	address: {
+		description: """
+			The socket address to listen for connections on.
+
+			This value _must_ include a port.
+			"""
+		required: true
+		type: string: examples: ["0.0.0.0:80", "localhost:80"]
+	}
+	auth: {
+		description: "HTTP Basic authentication configuration."
+		required: false
+		type: object: options: {
+			password: {
+				description: "The password for basic authentication."
+				required: true
+				type: string: examples: ["hunter2", "${PASSWORD}"]
+			}
+			username: {
+				description: "The username for basic authentication."
+ required: true + type: string: examples: ["AzureDiamond", "admin"] + } + } + } + encoding: { + description: "Configures how events are encoded into raw bytes." + required: true + type: object: options: { + avro: { + description: "Apache Avro-specific encoder options." + relevant_when: "codec = \"avro\"" + required: true + type: object: options: schema: { + description: "The Avro schema." + required: true + type: string: examples: ["{ \"type\": \"record\", \"name\": \"log\", \"fields\": [{ \"name\": \"message\", \"type\": \"string\" }] }"] + } + } + cef: { + description: "The CEF Serializer Options." + relevant_when: "codec = \"cef\"" + required: true + type: object: options: { + device_event_class_id: { + description: """ + Unique identifier for each event type. Identifies the type of event reported. + The value length must be less than or equal to 1023. + """ + required: true + type: string: {} + } + device_product: { + description: """ + Identifies the product of a vendor. + The part of a unique device identifier. No two products can use the same combination of device vendor and device product. + The value length must be less than or equal to 63. + """ + required: true + type: string: {} + } + device_vendor: { + description: """ + Identifies the vendor of the product. + The part of a unique device identifier. No two products can use the same combination of device vendor and device product. + The value length must be less than or equal to 63. + """ + required: true + type: string: {} + } + device_version: { + description: """ + Identifies the version of the problem. In combination with device product and vendor, it composes the unique id of the device that sends messages. + The value length must be less than or equal to 31. + """ + required: true + type: string: {} + } + extensions: { + description: """ + The collection of key-value pairs. Keys are the keys of the extensions, and values are paths that point to the extension values of a log event. + The event can have any number of key-value pairs in any order. + """ + required: false + type: object: options: "*": { + description: "This is a path that points to the extension value of a log event." + required: true + type: string: {} + } + } + name: { + description: """ + This is a path that points to the human-readable description of a log event. + The value length must be less than or equal to 512. + Equals "cef.name" by default. + """ + required: true + type: string: {} + } + severity: { + description: """ + This is a path that points to the field of a log event that reflects importance of the event. + Reflects importance of the event. + + It must point to a number from 0 to 10. + 0 = Lowest, 10 = Highest. + Equals to "cef.severity" by default. + """ + required: true + type: string: {} + } + version: { + description: """ + CEF Version. Can be either 0 or 1. + Equals to "0" by default. + """ + required: true + type: string: enum: { + V0: "CEF specification version 0.1." + V1: "CEF specification version 1.x." + } + } + } + } + codec: { + description: "The codec to use for encoding events." + required: true + type: string: enum: { + avro: """ + Encodes an event as an [Apache Avro][apache_avro] message. + + [apache_avro]: https://avro.apache.org/ + """ + cef: "Encodes an event as a CEF (Common Event Format) formatted message." + csv: """ + Encodes an event as a CSV message. + + This codec must be configured with fields to encode. + """ + gelf: """ + Encodes an event as a [GELF][gelf] message. 
+ + This codec is experimental for the following reason: + + The GELF specification is more strict than the actual Graylog receiver. + Vector's encoder currently adheres more strictly to the GELF spec, with + the exception that some characters such as `@` are allowed in field names. + + Other GELF codecs such as Loki's, use a [Go SDK][implementation] that is maintained + by Graylog, and is much more relaxed than the GELF spec. + + Going forward, Vector will use that [Go SDK][implementation] as the reference implementation, which means + the codec may continue to relax the enforcement of specification. + + [gelf]: https://docs.graylog.org/docs/gelf + [implementation]: https://github.com/Graylog2/go-gelf/blob/v2/gelf/reader.go + """ + json: """ + Encodes an event as [JSON][json]. + + [json]: https://www.json.org/ + """ + logfmt: """ + Encodes an event as a [logfmt][logfmt] message. + + [logfmt]: https://brandur.org/logfmt + """ + native: """ + Encodes an event in the [native Protocol Buffers format][vector_native_protobuf]. + + This codec is **[experimental][experimental]**. + + [vector_native_protobuf]: https://github.com/vectordotdev/vector/blob/master/lib/vector-core/proto/event.proto + [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs + """ + native_json: """ + Encodes an event in the [native JSON format][vector_native_json]. + + This codec is **[experimental][experimental]**. + + [vector_native_json]: https://github.com/vectordotdev/vector/blob/master/lib/codecs/tests/data/native_encoding/schema.cue + [experimental]: https://vector.dev/highlights/2022-03-31-native-event-codecs + """ + protobuf: """ + Encodes an event as a [Protobuf][protobuf] message. + + [protobuf]: https://protobuf.dev/ + """ + raw_message: """ + No encoding. + + This encoding uses the `message` field of a log event. + + Be careful if you are modifying your log events (for example, by using a `remap` + transform) and removing the message field while doing additional parsing on it, as this + could lead to the encoding emitting empty strings for the given event. + """ + text: """ + Plain text encoding. + + This encoding uses the `message` field of a log event. For metrics, it uses an + encoding that resembles the Prometheus export format. + + Be careful if you are modifying your log events (for example, by using a `remap` + transform) and removing the message field while doing additional parsing on it, as this + could lead to the encoding emitting empty strings for the given event. + """ + } + } + csv: { + description: "The CSV Serializer Options." + relevant_when: "codec = \"csv\"" + required: true + type: object: options: { + capacity: { + description: """ + Set the capacity (in bytes) of the internal buffer used in the CSV writer. + This defaults to a reasonable setting. + """ + required: false + type: uint: default: 8192 + } + delimiter: { + description: "The field delimiter to use when writing CSV." + required: false + type: ascii_char: default: "," + } + double_quote: { + description: """ + Enable double quote escapes. + + This is enabled by default, but it may be disabled. When disabled, quotes in + field data are escaped instead of doubled. + """ + required: false + type: bool: default: true + } + escape: { + description: """ + The escape character to use when writing CSV. + + In some variants of CSV, quotes are escaped using a special escape character + like \\ (instead of escaping quotes by doubling them). 
+ + To use this, `double_quotes` needs to be disabled as well otherwise it is ignored. + """ + required: false + type: ascii_char: default: "\"" + } + fields: { + description: """ + Configures the fields that will be encoded, as well as the order in which they + appear in the output. + + If a field is not present in the event, the output will be an empty string. + + Values of type `Array`, `Object`, and `Regex` are not supported and the + output will be an empty string. + """ + required: true + type: array: items: type: string: {} + } + quote: { + description: "The quote character to use when writing CSV." + required: false + type: ascii_char: default: "\"" + } + quote_style: { + description: "The quoting style to use when writing CSV data." + required: false + type: string: { + default: "necessary" + enum: { + always: "Always puts quotes around every field." + necessary: """ + Puts quotes around fields only when necessary. + They are necessary when fields contain a quote, delimiter, or record terminator. + Quotes are also necessary when writing an empty record + (which is indistinguishable from a record with one empty field). + """ + never: "Never writes quotes, even if it produces invalid CSV data." + non_numeric: """ + Puts quotes around all fields that are non-numeric. + Namely, when writing a field that does not parse as a valid float or integer, + then quotes are used even if they aren't strictly necessary. + """ + } + } + } + } + } + except_fields: { + description: "List of fields that are excluded from the encoded event." + required: false + type: array: items: type: string: {} + } + json: { + description: "Options for the JsonSerializer." + relevant_when: "codec = \"json\"" + required: false + type: object: options: pretty: { + description: "Whether to use pretty JSON formatting." + required: false + type: bool: default: false + } + } + metric_tag_values: { + description: """ + Controls how metric tag values are encoded. + + When set to `single`, only the last non-bare value of tags are displayed with the + metric. When set to `full`, all metric tags are exposed as separate assignments. + """ + relevant_when: "codec = \"json\" or codec = \"text\"" + required: false + type: string: { + default: "single" + enum: { + full: "All tags are exposed as arrays of either string or null values." + single: """ + Tag values are exposed as single strings, the same as they were before this config + option. Tags with multiple values show the last assigned value, and null values + are ignored. + """ + } + } + } + only_fields: { + description: "List of fields that are included in the encoded event." + required: false + type: array: items: type: string: {} + } + protobuf: { + description: "Options for the Protobuf serializer." + relevant_when: "codec = \"protobuf\"" + required: true + type: object: options: { + desc_file: { + description: """ + The path to the protobuf descriptor set file. + + This file is the output of `protoc -o ...` + """ + required: true + type: string: examples: ["/etc/vector/protobuf_descriptor_set.desc"] + } + message_type: { + description: "The name of the message type to use for serializing." + required: true + type: string: examples: ["package.Message"] + } + } + } + timestamp_format: { + description: "Format used for timestamp fields." + required: false + type: string: enum: { + rfc3339: "Represent the timestamp as a RFC 3339 timestamp." + unix: "Represent the timestamp as a Unix timestamp." + unix_float: "Represent the timestamp as a Unix timestamp in floating point." 
+ unix_ms: "Represent the timestamp as a Unix timestamp in milliseconds." + unix_ns: "Represent the timestamp as a Unix timestamp in nanoseconds." + unix_us: "Represent the timestamp as a Unix timestamp in microseconds" + } + } + } + } + tls: { + description: "Configures the TLS options for incoming/outgoing connections." + required: false + type: object: options: { + alpn_protocols: { + description: """ + Sets the list of supported ALPN protocols. + + Declare the supported ALPN protocols, which are used during negotiation with peer. They are prioritized in the order + that they are defined. + """ + required: false + type: array: items: type: string: examples: ["h2"] + } + ca_file: { + description: """ + Absolute path to an additional CA certificate file. + + The certificate must be in the DER or PEM (X.509) format. Additionally, the certificate can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/certificate_authority.crt"] + } + crt_file: { + description: """ + Absolute path to a certificate file used to identify this server. + + The certificate must be in DER, PEM (X.509), or PKCS#12 format. Additionally, the certificate can be provided as + an inline string in PEM format. + + If this is set, and is not a PKCS#12 archive, `key_file` must also be set. + """ + required: false + type: string: examples: ["/path/to/host_certificate.crt"] + } + enabled: { + description: """ + Whether or not to require TLS for incoming or outgoing connections. + + When enabled and used for incoming connections, an identity certificate is also required. See `tls.crt_file` for + more information. + """ + required: false + type: bool: {} + } + key_file: { + description: """ + Absolute path to a private key file used to identify this server. + + The key must be in DER or PEM (PKCS#8) format. Additionally, the key can be provided as an inline string in PEM format. + """ + required: false + type: string: examples: ["/path/to/host_certificate.key"] + } + key_pass: { + description: """ + Passphrase used to unlock the encrypted key file. + + This has no effect unless `key_file` is set. + """ + required: false + type: string: examples: ["${KEY_PASS_ENV_VAR}", "PassWord1"] + } + server_name: { + description: """ + Server name to use when using Server Name Indication (SNI). + + Only relevant for outgoing connections. + """ + required: false + type: string: examples: ["www.example.com"] + } + verify_certificate: { + description: """ + Enables certificate verification. For components that create a server, this requires that the + client connections have a valid client certificate. For components that initiate requests, + this validates that the upstream has a valid certificate. + + If enabled, certificates must not be expired and must be issued by a trusted + issuer. This verification operates in a hierarchical manner, checking that the leaf certificate (the + certificate presented by the client/server) is not only valid, but that the issuer of that certificate is also valid, and + so on until the verification process reaches a root certificate. + + Do NOT set this to `false` unless you understand the risks of not verifying the validity of certificates. + """ + required: false + type: bool: {} + } + verify_hostname: { + description: """ + Enables hostname verification. 
+ + If enabled, the hostname used to connect to the remote host must be present in the TLS certificate presented by + the remote host, either as the Common Name or as an entry in the Subject Alternative Name extension. + + Only relevant for outgoing connections. + + Do NOT set this to `false` unless you understand the risks of not verifying the remote hostname. + """ + required: false + type: bool: {} + } + } + } +} diff --git a/website/cue/reference/components/sinks/websocket_server.cue b/website/cue/reference/components/sinks/websocket_server.cue new file mode 100644 index 0000000000000..13f65a8419ec6 --- /dev/null +++ b/website/cue/reference/components/sinks/websocket_server.cue @@ -0,0 +1,86 @@ +package metadata + +components: sinks: websocket_server: { + _port: 8080 + title: "WebSocket server" + + classes: { + commonly_used: false + delivery: "best_effort" + development: "beta" + egress_method: "stream" + service_providers: [] + stateful: false + } + + features: { + acknowledgements: true + auto_generated: true + healthcheck: enabled: true + send: { + compression: enabled: false + encoding: { + enabled: true + codec: { + enabled: true + enum: ["json", "text"] + } + } + request: enabled: false + tls: { + enabled: true + can_verify_certificate: true + can_verify_hostname: true + enabled_default: false + enabled_by_scheme: true + } + to: { + service: services.websocket_client + interface: { + socket: { + direction: "incoming" + protocols: ["tcp"] + ssl: "optional" + port: _port + } + } + } + } + } + + support: { + targets: { + "aarch64-unknown-linux-gnu": true + "aarch64-unknown-linux-musl": true + "armv7-unknown-linux-gnueabihf": true + "armv7-unknown-linux-musleabihf": true + "x86_64-apple-darwin": true + "x86_64-pc-windows-msv": true + "x86_64-unknown-linux-gnu": true + "x86_64-unknown-linux-musl": true + } + requirements: [] + warnings: [] + notices: [] + } + + input: { + logs: true + metrics: { + counter: true + distribution: true + gauge: true + histogram: true + summary: true + set: true + } + traces: true + } + + telemetry: metrics: { + active_clients: components.sources.internal_metrics.output.metrics.active_clients + open_connections: components.sources.internal_metrics.output.metrics.open_connections + connection_established_total: components.sources.internal_metrics.output.metrics.connection_established_total + connection_shutdown_total: components.sources.internal_metrics.output.metrics.connection_shutdown_total + } +} diff --git a/website/cue/reference/components/sources/internal_metrics.cue b/website/cue/reference/components/sources/internal_metrics.cue index 91641a74cb357..4ed3856d866af 100644 --- a/website/cue/reference/components/sources/internal_metrics.cue +++ b/website/cue/reference/components/sources/internal_metrics.cue @@ -54,6 +54,12 @@ components: sources: internal_metrics: { } // Instance-level "process" metrics + active_clients: { + description: "Number of clients attached to a component." + type: "gauge" + default_namespace: "vector" + tags: _component_tags + } aggregate_events_recorded_total: { description: "The number of events recorded by the aggregate transform." 
type: "counter" diff --git a/website/cue/reference/services/websocket_client.cue b/website/cue/reference/services/websocket_client.cue new file mode 100644 index 0000000000000..4648292970d67 --- /dev/null +++ b/website/cue/reference/services/websocket_client.cue @@ -0,0 +1,8 @@ +package metadata + +services: websocket_client: { + name: "WebSocket client" + thing: "a \(name)" + url: urls.websocket + versions: null +}
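For reference, a minimal Vector configuration exercising the new sink might look like the following. This example is illustrative only and is not part of the diff; the field names mirror `WebSocketListenerSinkConfig` above, and the `my_logs` inputs entry is a placeholder for whatever upstream component feeds the sink.

```yaml
# Hypothetical example; not part of this PR. Field names follow WebSocketListenerSinkConfig.
sinks:
  websocket_out:
    type: "websocket_server"
    inputs: ["my_logs"]       # placeholder id of an upstream source or transform
    address: "0.0.0.0:8080"   # the sink's default listen address
    encoding:
      codec: "json"           # JSON is the default encoding for this sink
```

Any websocket client that connects to the configured address then receives every event the sink processes, encoded with the configured codec; the optional `auth` and `tls` settings shown in the config struct can be added to restrict and secure those connections.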