diff --git a/src/sinks/gcp/mod.rs b/src/sinks/gcp/mod.rs index 022482490fa53..5a7f64b84c076 100644 --- a/src/sinks/gcp/mod.rs +++ b/src/sinks/gcp/mod.rs @@ -6,7 +6,7 @@ use vector_config::configurable_component; pub mod chronicle_unstructured; pub mod cloud_storage; pub mod pubsub; -pub mod stackdriver_logs; +pub mod stackdriver; pub mod stackdriver_metrics; /// A monitored resource. diff --git a/src/sinks/gcp/stackdriver/logs/config.rs b/src/sinks/gcp/stackdriver/logs/config.rs new file mode 100644 index 0000000000000..71adf3d0ef778 --- /dev/null +++ b/src/sinks/gcp/stackdriver/logs/config.rs @@ -0,0 +1,278 @@ +//! Configuration for the `gcp_stackdriver_logs` sink. + +use crate::{ + gcp::{GcpAuthConfig, GcpAuthenticator, Scope}, + http::HttpClient, + schema, + sinks::{ + gcs_common::config::healthcheck_response, + prelude::*, + util::{ + http::{http_response_retry_logic, HttpService}, + BoxedRawValue, RealtimeSizeBasedDefaultBatchSettings, + }, + }, +}; +use http::{Request, Uri}; +use hyper::Body; +use lookup::lookup_v2::ConfigValuePath; +use snafu::Snafu; +use std::collections::HashMap; +use vrl::value::Kind; + +use super::{ + encoder::StackdriverLogsEncoder, request_builder::StackdriverLogsRequestBuilder, + service::StackdriverLogsServiceRequestBuilder, sink::StackdriverLogsSink, +}; + +#[derive(Debug, Snafu)] +enum HealthcheckError { + #[snafu(display("Resource not found"))] + NotFound, +} + +/// Configuration for the `gcp_stackdriver_logs` sink. +#[configurable_component(sink( + "gcp_stackdriver_logs", + "Deliver logs to GCP's Cloud Operations suite." +))] +#[derive(Clone, Debug, Default)] +#[serde(deny_unknown_fields)] +pub(super) struct StackdriverConfig { + #[serde(skip, default = "default_endpoint")] + pub(super) endpoint: String, + + #[serde(flatten)] + pub(super) log_name: StackdriverLogName, + + /// The log ID to which to publish logs. + /// + /// This is a name you create to identify this log stream. 
+ pub(super) log_id: Template, + + /// The monitored resource to associate the logs with. + pub(super) resource: StackdriverResource, + + /// The field of the log event from which to take the outgoing log’s `severity` field. + /// + /// The named field is removed from the log event if present, and must be either an integer + /// between 0 and 800 or a string containing one of the [severity level names][sev_names] (case + /// is ignored) or a common prefix such as `err`. + /// + /// If no severity key is specified, the severity of outgoing records is set to 0 (`DEFAULT`). + /// + /// See the [GCP Stackdriver Logging LogSeverity description][logsev_docs] for more details on + /// the value of the `severity` field. + /// + /// [sev_names]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity + /// [logsev_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity + #[configurable(metadata(docs::examples = "severity"))] + pub(super) severity_key: Option, + + #[serde(flatten)] + pub(super) auth: GcpAuthConfig, + + #[configurable(derived)] + #[serde( + default, + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + pub(super) encoding: Transformer, + + #[configurable(derived)] + #[serde(default)] + pub(super) batch: BatchConfig, + + #[configurable(derived)] + #[serde(default)] + pub(super) request: TowerRequestConfig, + + #[configurable(derived)] + pub(super) tls: Option, + + #[configurable(derived)] + #[serde( + default, + deserialize_with = "crate::serde::bool_or_struct", + skip_serializing_if = "crate::serde::skip_serializing_if_default" + )] + acknowledgements: AcknowledgementsConfig, +} + +pub(super) fn default_endpoint() -> String { + "https://logging.googleapis.com/v2/entries:write".to_string() +} + +// 10MB limit for entries.write: https://cloud.google.com/logging/quotas#api-limits +const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000; + +/// Logging locations. 
+#[configurable_component] +#[derive(Clone, Debug, Derivative)] +#[derivative(Default)] +pub(super) enum StackdriverLogName { + /// The billing account ID to which to publish logs. + /// + /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. + #[serde(rename = "billing_account_id")] + #[configurable(metadata(docs::examples = "012345-6789AB-CDEF01"))] + BillingAccount(String), + + /// The folder ID to which to publish logs. + /// + /// See the [Google Cloud Platform folder documentation][folder_docs] for more details. + /// + /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. + /// + /// [folder_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-folders + #[serde(rename = "folder_id")] + #[configurable(metadata(docs::examples = "My Folder"))] + Folder(String), + + /// The organization ID to which to publish logs. + /// + /// This would be the identifier assigned to your organization on Google Cloud Platform. + /// + /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. + #[serde(rename = "organization_id")] + #[configurable(metadata(docs::examples = "622418129737"))] + Organization(String), + + /// The project ID to which to publish logs. + /// + /// See the [Google Cloud Platform project management documentation][project_docs] for more details. + /// + /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. + /// + /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects + #[derivative(Default)] + #[serde(rename = "project_id")] + #[configurable(metadata(docs::examples = "vector-123456"))] + Project(String), +} + +/// A monitored resource. +/// +/// Monitored resources in GCP allow associating logs and metrics specifically with native resources +/// within Google Cloud Platform. 
This takes the form of a "type" field which identifies the +/// resource, and a set of type-specific labels to uniquely identify a resource of that type. +/// +/// See [Monitored resource types][mon_docs] for more information. +/// +/// [mon_docs]: https://cloud.google.com/monitoring/api/resources +// TODO: this type is specific to the stackdriver logs sink because it allows for template-able +// label values, but we should consider replacing `sinks::gcp::GcpTypedResource` with this so both +// the stackdriver metrics _and_ logs sink can have template-able label values, and less duplication +#[configurable_component] +#[derive(Clone, Debug, Default)] +pub(super) struct StackdriverResource { + /// The monitored resource type. + /// + /// For example, the type of a Compute Engine VM instance is `gce_instance`. + /// See the [Google Cloud Platform monitored resource documentation][gcp_resources] for + /// more details. + /// + /// [gcp_resources]: https://cloud.google.com/monitoring/api/resources + #[serde(rename = "type")] + pub(super) type_: String, + /// Type-specific labels. 
+ #[serde(flatten)] + #[configurable(metadata(docs::additional_props_description = "A type-specific label."))] + #[configurable(metadata(docs::examples = "label_examples()"))] + pub(super) labels: HashMap, +} + +fn label_examples() -> HashMap { + let mut example = HashMap::new(); + example.insert("instanceId".to_string(), "Twilight".to_string()); + example.insert("zone".to_string(), "{{ zone }}".to_string()); + example +} + +impl_generate_config_from_default!(StackdriverConfig); + +#[async_trait::async_trait] +#[typetag::serde(name = "gcp_stackdriver_logs")] +impl SinkConfig for StackdriverConfig { + async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { + let auth = self.auth.build(Scope::LoggingWrite).await?; + + let request_builder = StackdriverLogsRequestBuilder { + encoder: StackdriverLogsEncoder::new( + self.encoding.clone(), + self.log_id.clone(), + self.log_name.clone(), + self.resource.clone(), + self.severity_key.clone(), + ), + }; + + let batch_settings = self + .batch + .validate()? + .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)? 
+ .into_batcher_settings()?; + + let request_limits = self.request.unwrap_with( + &TowerRequestConfig::default() + .rate_limit_duration_secs(1) + .rate_limit_num(1000), + ); + + let tls_settings = TlsSettings::from_options(&self.tls)?; + let client = HttpClient::new(tls_settings, cx.proxy())?; + + let uri: Uri = self.endpoint.parse()?; + + let stackdriver_logs_service_request_builder = StackdriverLogsServiceRequestBuilder { + uri: uri.clone(), + auth: auth.clone(), + }; + + let service = HttpService::new(client.clone(), stackdriver_logs_service_request_builder); + + let service = ServiceBuilder::new() + .settings(request_limits, http_response_retry_logic()) + .service(service); + + let sink = StackdriverLogsSink::new(service, batch_settings, request_builder); + + let healthcheck = healthcheck(client, auth.clone(), uri).boxed(); + + auth.spawn_regenerate_token(); + + Ok((VectorSink::from_event_streamsink(sink), healthcheck)) + } + + fn input(&self) -> Input { + let requirement = + schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp()); + + Input::log().with_schema_requirement(requirement) + } + + fn acknowledgements(&self) -> &AcknowledgementsConfig { + &self.acknowledgements + } +} + +async fn healthcheck(client: HttpClient, auth: GcpAuthenticator, uri: Uri) -> crate::Result<()> { + let entries: Vec = Vec::new(); + let events = serde_json::json!({ "entries": entries }); + + let body = crate::serde::json::to_bytes(&events).unwrap().freeze(); + + let mut request = Request::post(uri) + .header("Content-Type", "application/json") + .body(body) + .unwrap(); + + auth.apply(&mut request); + + let request = request.map(Body::from); + + let response = client.send(request).await?; + + healthcheck_response(response, HealthcheckError::NotFound.into()) +} diff --git a/src/sinks/gcp/stackdriver/logs/encoder.rs b/src/sinks/gcp/stackdriver/logs/encoder.rs new file mode 100644 index 0000000000000..00888cf35997f --- /dev/null +++ 
b/src/sinks/gcp/stackdriver/logs/encoder.rs @@ -0,0 +1,188 @@ +//! Encoding for the `gcp_stackdriver_logs` sink. + +use std::{collections::HashMap, io}; + +use bytes::BytesMut; +use lookup::lookup_v2::ConfigValuePath; +use serde_json::{json, to_vec, Map}; +use vrl::path::PathPrefix; + +use crate::{ + sinks::{prelude::*, util::encoding::Encoder as SinkEncoder}, + template::TemplateRenderingError, +}; + +use super::config::{StackdriverLogName, StackdriverResource}; + +#[derive(Clone, Debug)] +pub(super) struct StackdriverLogsEncoder { + transformer: Transformer, + log_id: Template, + log_name: StackdriverLogName, + resource: StackdriverResource, + severity_key: Option, +} + +impl StackdriverLogsEncoder { + /// Creates a new `StackdriverLogsEncoder`. + pub(super) const fn new( + transformer: Transformer, + log_id: Template, + log_name: StackdriverLogName, + resource: StackdriverResource, + severity_key: Option, + ) -> Self { + Self { + transformer, + log_id, + log_name, + resource, + severity_key, + } + } + + pub(super) fn encode_event(&self, event: Event) -> Option { + let mut labels = HashMap::with_capacity(self.resource.labels.len()); + for (key, template) in &self.resource.labels { + let value = template + .render_string(&event) + .map_err(|error| { + emit!(crate::internal_events::TemplateRenderingError { + error, + field: Some("resource.labels"), + drop_event: true, + }); + }) + .ok()?; + labels.insert(key.clone(), value); + } + let log_name = self + .log_name(&event) + .map_err(|error| { + emit!(crate::internal_events::TemplateRenderingError { + error, + field: Some("log_id"), + drop_event: true, + }); + }) + .ok()?; + + let mut log = event.into_log(); + let severity = self + .severity_key + .as_ref() + .and_then(|key| log.remove((PathPrefix::Event, &key.0))) + .map(remap_severity) + .unwrap_or_else(|| 0.into()); + + let mut event = Event::Log(log); + self.transformer.transform(&mut event); + + let log = event.into_log(); + + let mut entry = 
Map::with_capacity(5); + entry.insert("logName".into(), json!(log_name)); + entry.insert("jsonPayload".into(), json!(log)); + entry.insert("severity".into(), json!(severity)); + entry.insert( + "resource".into(), + json!({ + "type": self.resource.type_, + "labels": labels, + }), + ); + + // If the event contains a timestamp, send it in the main message so gcp can pick it up. + if let Some(timestamp) = log.get_timestamp() { + entry.insert("timestamp".into(), json!(timestamp)); + } + + Some(json!(entry)) + } + + fn log_name(&self, event: &Event) -> Result { + use StackdriverLogName::*; + + let log_id = self.log_id.render_string(event)?; + + Ok(match &self.log_name { + BillingAccount(acct) => format!("billingAccounts/{}/logs/{}", acct, log_id), + Folder(folder) => format!("folders/{}/logs/{}", folder, log_id), + Organization(org) => format!("organizations/{}/logs/{}", org, log_id), + Project(project) => format!("projects/{}/logs/{}", project, log_id), + }) + } +} + +pub(super) fn remap_severity(severity: Value) -> Value { + let n = match severity { + Value::Integer(n) => n - n % 100, + Value::Bytes(s) => { + let s = String::from_utf8_lossy(&s); + match s.parse::() { + Ok(n) => (n - n % 100) as i64, + Err(_) => match s.to_uppercase() { + s if s.starts_with("EMERG") || s.starts_with("FATAL") => 800, + s if s.starts_with("ALERT") => 700, + s if s.starts_with("CRIT") => 600, + s if s.starts_with("ERR") || s == "ER" => 500, + s if s.starts_with("WARN") => 400, + s if s.starts_with("NOTICE") => 300, + s if s.starts_with("INFO") => 200, + s if s.starts_with("DEBUG") || s.starts_with("TRACE") => 100, + s if s.starts_with("DEFAULT") => 0, + _ => { + warn!( + message = "Unknown severity value string, using DEFAULT.", + value = %s, + internal_log_rate_limit = true + ); + 0 + } + }, + } + } + value => { + warn!( + message = "Unknown severity value type, using DEFAULT.", + ?value, + internal_log_rate_limit = true + ); + 0 + } + }; + Value::Integer(n) +} + +impl SinkEncoder> for 
StackdriverLogsEncoder { + fn encode_input( + &self, + events: Vec, + writer: &mut dyn io::Write, + ) -> io::Result<(usize, GroupedCountByteSize)> { + let mut byte_size = telemetry().create_request_count_byte_size(); + let mut n_events = events.len(); + let mut body = BytesMut::new(); + + let mut entries = Vec::with_capacity(n_events); + for event in &events { + let size = event.estimated_json_encoded_size_of(); + if let Some(data) = self.encode_event(event.clone()) { + byte_size.add_event(event, size); + entries.push(data) + } else { + // encode_event() emits the `TemplateRenderingError` internal event, + // which emits an `EventsDropped`, so no need to here. + n_events -= 1; + } + } + + let events = json!({ "entries": entries }); + + body.extend(&to_vec(&events)?); + + let body = body.freeze(); + + write_all(writer, n_events, body.as_ref()).map(|()| (body.len(), byte_size)) + } +} diff --git a/src/sinks/gcp/stackdriver/logs/mod.rs b/src/sinks/gcp/stackdriver/logs/mod.rs new file mode 100644 index 0000000000000..5f5ca2cf03ae5 --- /dev/null +++ b/src/sinks/gcp/stackdriver/logs/mod.rs @@ -0,0 +1,14 @@ +//! The GCP Stackdriver Logs [`vector_core::sink::VectorSink`]. +//! +//! This module contains the [`vector_core::sink::VectorSink`] instance that is responsible for +//! taking a stream of [`vector_core::event::Event`]s and forwarding them to the GCP +//! Stackdriver service. + +mod config; +mod encoder; +mod request_builder; +mod service; +mod sink; + +#[cfg(test)] +mod tests; diff --git a/src/sinks/gcp/stackdriver/logs/request_builder.rs b/src/sinks/gcp/stackdriver/logs/request_builder.rs new file mode 100644 index 0000000000000..25de168cbe3ac --- /dev/null +++ b/src/sinks/gcp/stackdriver/logs/request_builder.rs @@ -0,0 +1,47 @@ +//! `RequestBuilder` implementation for the `gcp_stackdriver_logs` sink. 
+ +use bytes::Bytes; +use std::io; + +use crate::sinks::{prelude::*, util::http::HttpRequest}; + +use super::encoder::StackdriverLogsEncoder; + +pub(super) struct StackdriverLogsRequestBuilder { + pub(super) encoder: StackdriverLogsEncoder, +} + +impl RequestBuilder> for StackdriverLogsRequestBuilder { + type Metadata = EventFinalizers; + type Events = Vec; + type Encoder = StackdriverLogsEncoder; + type Payload = Bytes; + type Request = HttpRequest; + type Error = io::Error; + + fn compression(&self) -> Compression { + Compression::None + } + + fn encoder(&self) -> &Self::Encoder { + &self.encoder + } + + fn split_input( + &self, + mut events: Vec, + ) -> (Self::Metadata, RequestMetadataBuilder, Self::Events) { + let finalizers = events.take_finalizers(); + let builder = RequestMetadataBuilder::from_events(&events); + (finalizers, builder, events) + } + + fn build_request( + &self, + metadata: Self::Metadata, + request_metadata: RequestMetadata, + payload: EncodeResult, + ) -> Self::Request { + HttpRequest::new(payload.into_payload(), metadata, request_metadata) + } +} diff --git a/src/sinks/gcp/stackdriver/logs/service.rs b/src/sinks/gcp/stackdriver/logs/service.rs new file mode 100644 index 0000000000000..ee0a3d86e223f --- /dev/null +++ b/src/sinks/gcp/stackdriver/logs/service.rs @@ -0,0 +1,25 @@ +//! Service implementation for the `gcp_stackdriver_logs` sink. 
+ +use bytes::Bytes; +use http::{Request, Uri}; + +use crate::{gcp::GcpAuthenticator, sinks::util::http::HttpServiceRequestBuilder}; + +#[derive(Debug, Clone)] +pub(super) struct StackdriverLogsServiceRequestBuilder { + pub(super) uri: Uri, + pub(super) auth: GcpAuthenticator, +} + +impl HttpServiceRequestBuilder for StackdriverLogsServiceRequestBuilder { + fn build(&self, body: Bytes) -> Request { + let mut request = Request::post(self.uri.clone()) + .header("Content-Type", "application/json") + .body(body) + .unwrap(); + + self.auth.apply(&mut request); + + request + } +} diff --git a/src/sinks/gcp/stackdriver/logs/sink.rs b/src/sinks/gcp/stackdriver/logs/sink.rs new file mode 100644 index 0000000000000..c15ed0f342ce6 --- /dev/null +++ b/src/sinks/gcp/stackdriver/logs/sink.rs @@ -0,0 +1,77 @@ +//! Implementation of the `gcp_stackdriver_logs` sink. + +use crate::sinks::{ + prelude::*, + util::http::{HttpJsonBatchSizer, HttpRequest}, +}; + +use super::request_builder::StackdriverLogsRequestBuilder; + +pub(super) struct StackdriverLogsSink { + service: S, + batch_settings: BatcherSettings, + request_builder: StackdriverLogsRequestBuilder, +} + +impl StackdriverLogsSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + /// Creates a new `StackdriverLogsSink`. + pub(super) const fn new( + service: S, + batch_settings: BatcherSettings, + request_builder: StackdriverLogsRequestBuilder, + ) -> Self { + Self { + service, + batch_settings, + request_builder, + } + } + + async fn run_inner(self: Box, input: BoxStream<'_, Event>) -> Result<(), ()> { + input + // Batch the input stream with size calculation based on the estimated encoded json size + .batched( + self.batch_settings + .into_item_size_config(HttpJsonBatchSizer), + ) + // Build requests with no concurrency limit. 
+ .request_builder(None, self.request_builder) + // Filter out any errors that occurred in the request building. + .filter_map(|request| async move { + match request { + Err(error) => { + emit!(SinkRequestBuildError { error }); + None + } + Ok(req) => Some(req), + } + }) + // Generate the driver that will send requests and handle retries, + // event finalization, and logging/internal metric reporting. + .into_driver(self.service) + .run() + .await + } +} + +#[async_trait::async_trait] +impl StreamSink for StackdriverLogsSink +where + S: Service + Send + 'static, + S::Future: Send + 'static, + S::Response: DriverResponse + Send + 'static, + S::Error: std::fmt::Debug + Into + Send, +{ + async fn run( + self: Box, + input: futures_util::stream::BoxStream<'_, Event>, + ) -> Result<(), ()> { + self.run_inner(input).await + } +} diff --git a/src/sinks/gcp/stackdriver/logs/tests.rs b/src/sinks/gcp/stackdriver/logs/tests.rs new file mode 100644 index 0000000000000..ee56c7d1c5424 --- /dev/null +++ b/src/sinks/gcp/stackdriver/logs/tests.rs @@ -0,0 +1,297 @@ +//! Unit tests for the `gcp_stackdriver_logs` sink. 
+ +use bytes::Bytes; +use chrono::{TimeZone, Utc}; +use futures::{future::ready, stream}; +use http::Uri; +use indoc::indoc; +use lookup::lookup_v2::ConfigValuePath; +use serde::Deserialize; +use std::collections::HashMap; + +use crate::{ + config::{GenerateConfig, SinkConfig, SinkContext}, + event::{LogEvent, Value}, + gcp::GcpAuthenticator, + sinks::{ + gcp::stackdriver::logs::{ + config::StackdriverLogName, encoder::remap_severity, + service::StackdriverLogsServiceRequestBuilder, + }, + prelude::*, + util::{encoding::Encoder as _, http::HttpServiceRequestBuilder}, + }, + test_util::{ + components::{run_and_assert_sink_compliance, HTTP_SINK_TAGS}, + http::{always_200_response, spawn_blackhole_http_server}, + }, +}; + +use super::{ + config::{default_endpoint, StackdriverConfig, StackdriverResource}, + encoder::StackdriverLogsEncoder, +}; + +#[test] +fn generate_config() { + crate::test_util::test_generate_config::(); +} + +#[tokio::test] +async fn component_spec_compliance() { + let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; + + let config = StackdriverConfig::generate_config().to_string(); + let mut config = StackdriverConfig::deserialize(toml::de::ValueDeserializer::new(&config)) + .expect("config should be valid"); + + // If we don't override the credentials path/API key, it tries to directly call out to the Google Instance + // Metadata API, which we clearly don't have in unit tests. 
:) + config.auth.credentials_path = None; + config.auth.api_key = Some("fake".to_string().into()); + config.endpoint = mock_endpoint.to_string(); + + let context = SinkContext::default(); + let (sink, _healthcheck) = config.build(context).await.unwrap(); + + let event = Event::Log(LogEvent::from("simple message")); + run_and_assert_sink_compliance(sink, stream::once(ready(event)), &HTTP_SINK_TAGS).await; +} + +#[test] +fn encode_valid() { + let mut transformer = Transformer::default(); + transformer + .set_except_fields(Some(vec![ + "anumber".into(), + "node_id".into(), + "log_id".into(), + ])) + .unwrap(); + + let encoder = StackdriverLogsEncoder::new( + transformer, + Template::try_from("{{ log_id }}").unwrap(), + StackdriverLogName::Project("project".to_owned()), + StackdriverResource { + type_: "generic_node".to_owned(), + labels: HashMap::from([ + ( + "namespace".to_owned(), + Template::try_from("office").unwrap(), + ), + ( + "node_id".to_owned(), + Template::try_from("{{ node_id }}").unwrap(), + ), + ]), + }, + Some(ConfigValuePath::try_from("anumber".to_owned()).unwrap()), + ); + + let log = [ + ("message", "hello world"), + ("anumber", "100"), + ("node_id", "10.10.10.1"), + ("log_id", "testlogs"), + ] + .iter() + .copied() + .collect::(); + let json = encoder.encode_event(Event::from(log)).unwrap(); + assert_eq!( + json, + serde_json::json!({ + "logName":"projects/project/logs/testlogs", + "jsonPayload":{"message":"hello world"}, + "severity":100, + "resource":{ + "type":"generic_node", + "labels":{"namespace":"office","node_id":"10.10.10.1"} + } + }) + ); +} + +#[test] +fn encode_inserts_timestamp() { + let transformer = Transformer::default(); + + let encoder = StackdriverLogsEncoder::new( + transformer, + Template::try_from("testlogs").unwrap(), + StackdriverLogName::Project("project".to_owned()), + StackdriverResource { + type_: "generic_node".to_owned(), + labels: HashMap::from([( + "namespace".to_owned(), + Template::try_from("office").unwrap(), + 
)]), + }, + Some(ConfigValuePath::try_from("anumber".to_owned()).unwrap()), + ); + + let mut log = LogEvent::default(); + log.insert("message", Value::Bytes("hello world".into())); + log.insert("anumber", Value::Bytes("100".into())); + log.insert( + "timestamp", + Value::Timestamp( + Utc.with_ymd_and_hms(2020, 1, 1, 12, 30, 0) + .single() + .expect("invalid timestamp"), + ), + ); + + let json = encoder.encode_event(Event::from(log)).unwrap(); + assert_eq!( + json, + serde_json::json!({ + "logName":"projects/project/logs/testlogs", + "jsonPayload":{"message":"hello world","timestamp":"2020-01-01T12:30:00Z"}, + "severity":100, + "resource":{ + "type":"generic_node", + "labels":{"namespace":"office"}}, + "timestamp":"2020-01-01T12:30:00Z" + }) + ); +} + +#[test] +fn severity_remaps_strings() { + for &(s, n) in &[ + ("EMERGENCY", 800), // Handles full upper case + ("EMERG", 800), // Handles abbreviations + ("FATAL", 800), // Handles highest alternate + ("alert", 700), // Handles lower case + ("CrIt1c", 600), // Handles mixed case and suffixes + ("err404", 500), // Handles lower case and suffixes + ("warnings", 400), + ("notice", 300), + ("info", 200), + ("DEBUG2", 100), // Handles upper case and suffixes + ("trace", 100), // Handles lowest alternate + ("nothing", 0), // Maps unknown terms to DEFAULT + ("123", 100), // Handles numbers in strings + ("-100", 0), // Maps negatives to DEFAULT + ] { + assert_eq!( + remap_severity(s.into()), + Value::Integer(n), + "remap_severity({:?}) != {}", + s, + n + ); + } +} + +#[tokio::test] +async fn correct_request() { + let uri: Uri = default_endpoint().parse().unwrap(); + + let transformer = Transformer::default(); + let encoder = StackdriverLogsEncoder::new( + transformer, + Template::try_from("testlogs").unwrap(), + StackdriverLogName::Project("project".to_owned()), + StackdriverResource { + type_: "generic_node".to_owned(), + labels: HashMap::from([( + "namespace".to_owned(), + Template::try_from("office").unwrap(), + )]), + }, 
+ None, + ); + + let log1 = [("message", "hello")].iter().copied().collect::(); + let log2 = [("message", "world")].iter().copied().collect::(); + + let events = vec![Event::from(log1), Event::from(log2)]; + + let mut writer = Vec::new(); + let (_, _) = encoder.encode_input(events, &mut writer).unwrap(); + + let body = Bytes::copy_from_slice(&writer); + + let stackdriver_logs_service_request_builder = StackdriverLogsServiceRequestBuilder { + uri: uri.clone(), + auth: GcpAuthenticator::None, + }; + + let request = stackdriver_logs_service_request_builder.build(body); + let (parts, body) = request.into_parts(); + let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); + + assert_eq!( + &parts.uri.to_string(), + "https://logging.googleapis.com/v2/entries:write" + ); + assert_eq!( + json, + serde_json::json!({ + "entries": [ + { + "logName": "projects/project/logs/testlogs", + "severity": 0, + "jsonPayload": { + "message": "hello" + }, + "resource": { + "type": "generic_node", + "labels": { + "namespace": "office" + } + } + }, + { + "logName": "projects/project/logs/testlogs", + "severity": 0, + "jsonPayload": { + "message": "world" + }, + "resource": { + "type": "generic_node", + "labels": { + "namespace": "office" + } + } + } + ] + }) + ); +} + +#[tokio::test] +async fn fails_missing_creds() { + let config: StackdriverConfig = toml::from_str(indoc! {r#" + project_id = "project" + log_id = "testlogs" + resource.type = "generic_node" + resource.namespace = "office" + "#}) + .unwrap(); + if config.build(SinkContext::default()).await.is_ok() { + panic!("config.build failed to error"); + } +} + +#[test] +fn fails_invalid_log_names() { + toml::from_str::(indoc! {r#" + log_id = "testlogs" + resource.type = "generic_node" + resource.namespace = "office" + "#}) + .expect_err("Config parsing failed to error with missing ids"); + + toml::from_str::(indoc! 
{r#" + project_id = "project" + folder_id = "folder" + log_id = "testlogs" + resource.type = "generic_node" + resource.namespace = "office" + "#}) + .expect_err("Config parsing failed to error with extraneous ids"); +} diff --git a/src/sinks/gcp/stackdriver/mod.rs b/src/sinks/gcp/stackdriver/mod.rs new file mode 100644 index 0000000000000..dff3ee587ba14 --- /dev/null +++ b/src/sinks/gcp/stackdriver/mod.rs @@ -0,0 +1 @@ +mod logs; diff --git a/src/sinks/gcp/stackdriver_logs.rs b/src/sinks/gcp/stackdriver_logs.rs deleted file mode 100644 index 6cf277405fafd..0000000000000 --- a/src/sinks/gcp/stackdriver_logs.rs +++ /dev/null @@ -1,687 +0,0 @@ -use std::collections::HashMap; - -use bytes::Bytes; -use futures::{FutureExt, SinkExt}; -use http::{Request, Uri}; -use hyper::Body; -use lookup::lookup_v2::ConfigValuePath; -use serde_json::{json, map}; -use snafu::Snafu; -use vector_config::configurable_component; -use vrl::path::PathPrefix; -use vrl::value::Kind; - -use crate::{ - codecs::Transformer, - config::{AcknowledgementsConfig, Input, SinkConfig, SinkContext}, - event::{Event, Value}, - gcp::{GcpAuthConfig, GcpAuthenticator, Scope}, - http::HttpClient, - schema, - sinks::{ - gcs_common::config::healthcheck_response, - util::{ - http::{BatchedHttpSink, HttpEventEncoder, HttpSink}, - BatchConfig, BoxedRawValue, JsonArrayBuffer, RealtimeSizeBasedDefaultBatchSettings, - TowerRequestConfig, - }, - Healthcheck, VectorSink, - }, - template::{Template, TemplateRenderingError}, - tls::{TlsConfig, TlsSettings}, -}; - -#[derive(Debug, Snafu)] -enum HealthcheckError { - #[snafu(display("Resource not found"))] - NotFound, -} - -/// Configuration for the `gcp_stackdriver_logs` sink. -#[configurable_component(sink( - "gcp_stackdriver_logs", - "Deliver logs to GCP's Cloud Operations suite." 
-))] -#[derive(Clone, Debug, Default)] -#[serde(deny_unknown_fields)] -pub struct StackdriverConfig { - #[serde(skip, default = "default_endpoint")] - endpoint: String, - - #[serde(flatten)] - pub log_name: StackdriverLogName, - - /// The log ID to which to publish logs. - /// - /// This is a name you create to identify this log stream. - pub log_id: Template, - - /// The monitored resource to associate the logs with. - pub resource: StackdriverResource, - - /// The field of the log event from which to take the outgoing log’s `severity` field. - /// - /// The named field is removed from the log event if present, and must be either an integer - /// between 0 and 800 or a string containing one of the [severity level names][sev_names] (case - /// is ignored) or a common prefix such as `err`. - /// - /// If no severity key is specified, the severity of outgoing records is set to 0 (`DEFAULT`). - /// - /// See the [GCP Stackdriver Logging LogSeverity description][logsev_docs] for more details on - /// the value of the `severity` field. 
- /// - /// [sev_names]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity - /// [logsev_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity - #[configurable(metadata(docs::examples = "severity"))] - pub severity_key: Option, - - #[serde(flatten)] - pub auth: GcpAuthConfig, - - #[configurable(derived)] - #[serde( - default, - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - pub encoding: Transformer, - - #[configurable(derived)] - #[serde(default)] - pub batch: BatchConfig, - - #[configurable(derived)] - #[serde(default)] - pub request: TowerRequestConfig, - - #[configurable(derived)] - pub tls: Option, - - #[configurable(derived)] - #[serde( - default, - deserialize_with = "crate::serde::bool_or_struct", - skip_serializing_if = "crate::serde::skip_serializing_if_default" - )] - acknowledgements: AcknowledgementsConfig, -} - -fn default_endpoint() -> String { - "https://logging.googleapis.com/v2/entries:write".to_string() -} - -#[derive(Clone, Debug)] -struct StackdriverSink { - config: StackdriverConfig, - auth: GcpAuthenticator, - severity_key: Option, - uri: Uri, -} - -// 10MB limit for entries.write: https://cloud.google.com/logging/quotas#api-limits -const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000; - -/// Logging locations. -#[configurable_component] -#[derive(Clone, Debug, Derivative)] -#[derivative(Default)] -pub enum StackdriverLogName { - /// The billing account ID to which to publish logs. - /// - /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. - #[serde(rename = "billing_account_id")] - #[configurable(metadata(docs::examples = "012345-6789AB-CDEF01"))] - BillingAccount(String), - - /// The folder ID to which to publish logs. - /// - /// See the [Google Cloud Platform folder documentation][folder_docs] for more details. 
- /// - /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. - /// - /// [folder_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-folders - #[serde(rename = "folder_id")] - #[configurable(metadata(docs::examples = "My Folder"))] - Folder(String), - - /// The organization ID to which to publish logs. - /// - /// This would be the identifier assigned to your organization on Google Cloud Platform. - /// - /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. - #[serde(rename = "organization_id")] - #[configurable(metadata(docs::examples = "622418129737"))] - Organization(String), - - /// The project ID to which to publish logs. - /// - /// See the [Google Cloud Platform project management documentation][project_docs] for more details. - /// - /// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set. - /// - /// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects - #[derivative(Default)] - #[serde(rename = "project_id")] - #[configurable(metadata(docs::examples = "vector-123456"))] - Project(String), -} - -/// A monitored resource. -/// -/// Monitored resources in GCP allow associating logs and metrics specifically with native resources -/// within Google Cloud Platform. This takes the form of a "type" field which identifies the -/// resource, and a set of type-specific labels to uniquely identify a resource of that type. -/// -/// See [Monitored resource types][mon_docs] for more information. 
-/// -/// [mon_docs]: https://cloud.google.com/monitoring/api/resources -// TODO: this type is specific to the stackdrivers log sink because it allows for template-able -// label values, but we should consider replacing `sinks::gcp::GcpTypedResource` with this so both -// the stackdriver metrics _and_ logs sink can have template-able label values, and less duplication -#[configurable_component] -#[derive(Clone, Debug, Default)] -pub struct StackdriverResource { - /// The monitored resource type. - /// - /// For example, the type of a Compute Engine VM instance is `gce_instance`. - /// See the [Google Cloud Platform monitored resource documentation][gcp_resources] for - /// more details. - /// - /// [gcp_resources]: https://cloud.google.com/monitoring/api/resources - #[serde(rename = "type")] - pub type_: String, - - /// Type-specific labels. - #[serde(flatten)] - #[configurable(metadata(docs::additional_props_description = "A type-specific label."))] - #[configurable(metadata(docs::examples = "label_examples()"))] - pub labels: HashMap<String, Template>, -} - -fn label_examples() -> HashMap<String, String> { - let mut example = HashMap::new(); - example.insert("instanceId".to_string(), "Twilight".to_string()); - example.insert("zone".to_string(), "{{ zone }}".to_string()); - example -} - -impl_generate_config_from_default!(StackdriverConfig); - -#[async_trait::async_trait] -#[typetag::serde(name = "gcp_stackdriver_logs")] -impl SinkConfig for StackdriverConfig { - async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> { - let auth = self.auth.build(Scope::LoggingWrite).await?; - - let batch = self - .batch - .validate()? - .limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
- .into_batch_settings()?; - let request = self.request.unwrap_with( - &TowerRequestConfig::default() - .rate_limit_duration_secs(1) - .rate_limit_num(1000), - ); - let tls_settings = TlsSettings::from_options(&self.tls)?; - let client = HttpClient::new(tls_settings, cx.proxy())?; - - let sink = StackdriverSink { - config: self.clone(), - auth: auth.clone(), - severity_key: self.severity_key.clone(), - uri: self.endpoint.parse().unwrap(), - }; - - let healthcheck = healthcheck(client.clone(), sink.clone()).boxed(); - auth.spawn_regenerate_token(); - let sink = BatchedHttpSink::new( - sink, - JsonArrayBuffer::new(batch.size), - request, - batch.timeout, - client, - ) - .sink_map_err(|error| error!(message = "Fatal gcp_stackdriver_logs sink error.", %error)); - - #[allow(deprecated)] - Ok((VectorSink::from_event_sink(sink), healthcheck)) - } - - fn input(&self) -> Input { - let requirement = - schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp()); - - Input::log().with_schema_requirement(requirement) - } - - fn acknowledgements(&self) -> &AcknowledgementsConfig { - &self.acknowledgements - } -} - -struct StackdriverEventEncoder { - config: StackdriverConfig, - severity_key: Option<ConfigValuePath>, -} - -impl HttpEventEncoder<serde_json::Value> for StackdriverEventEncoder { - fn encode_event(&mut self, event: Event) -> Option<serde_json::Value> { - let mut labels = HashMap::with_capacity(self.config.resource.labels.len()); - for (key, template) in &self.config.resource.labels { - let value = template - .render_string(&event) - .map_err(|error| { - emit!(crate::internal_events::TemplateRenderingError { - error, - field: Some("resource.labels"), - drop_event: true, - }); - }) - .ok()?; - labels.insert(key.clone(), value); - } - let log_name = self - .config - .log_name(&event) - .map_err(|error| { - emit!(crate::internal_events::TemplateRenderingError { - error, - field: Some("log_id"), - drop_event: true, - }); - }) - .ok()?; - - let mut log = event.into_log(); - let severity = self - .severity_key
- .as_ref() - .and_then(|key| log.remove((PathPrefix::Event, &key.0))) - .map(remap_severity) - .unwrap_or_else(|| 0.into()); - - let mut event = Event::Log(log); - self.config.encoding.transform(&mut event); - - let log = event.into_log(); - - let mut entry = map::Map::with_capacity(5); - entry.insert("logName".into(), json!(log_name)); - entry.insert("jsonPayload".into(), json!(log)); - entry.insert("severity".into(), json!(severity)); - entry.insert( - "resource".into(), - json!({ - "type": self.config.resource.type_, - "labels": labels, - }), - ); - - // If the event contains a timestamp, send it in the main message so gcp can pick it up. - if let Some(timestamp) = log.get_timestamp() { - entry.insert("timestamp".into(), json!(timestamp)); - } - - Some(json!(entry)) - } -} - -#[async_trait::async_trait] -impl HttpSink for StackdriverSink { - type Input = serde_json::Value; - type Output = Vec<BoxedRawValue>; - type Encoder = StackdriverEventEncoder; - - fn build_encoder(&self) -> Self::Encoder { - StackdriverEventEncoder { - config: self.config.clone(), - severity_key: self.severity_key.clone(), - } - } - - async fn build_request(&self, events: Self::Output) -> crate::Result<Request<Bytes>> { - let events = serde_json::json!({ "entries": events }); - - let body = crate::serde::json::to_bytes(&events).unwrap().freeze(); - - let mut request = Request::post(self.uri.clone()) - .header("Content-Type", "application/json") - .body(body) - .unwrap(); - self.auth.apply(&mut request); - - Ok(request) - } -} - -fn remap_severity(severity: Value) -> Value { - let n = match severity { - Value::Integer(n) => n - n % 100, - Value::Bytes(s) => { - let s = String::from_utf8_lossy(&s); - match s.parse::<usize>() { - Ok(n) => (n - n % 100) as i64, - Err(_) => match s.to_uppercase() { - s if s.starts_with("EMERG") || s.starts_with("FATAL") => 800, - s if s.starts_with("ALERT") => 700, - s if s.starts_with("CRIT") => 600, - s if s.starts_with("ERR") || s == "ER" => 500, - s if s.starts_with("WARN") => 400, - s if
s.starts_with("NOTICE") => 300, - s if s.starts_with("INFO") => 200, - s if s.starts_with("DEBUG") || s.starts_with("TRACE") => 100, - s if s.starts_with("DEFAULT") => 0, - _ => { - warn!( - message = "Unknown severity value string, using DEFAULT.", - value = %s, - internal_log_rate_limit = true - ); - 0 - } - }, - } - } - value => { - warn!( - message = "Unknown severity value type, using DEFAULT.", - ?value, - internal_log_rate_limit = true - ); - 0 - } - }; - Value::Integer(n) -} - -async fn healthcheck(client: HttpClient, sink: StackdriverSink) -> crate::Result<()> { - let request = sink.build_request(vec![]).await?.map(Body::from); - - let response = client.send(request).await?; - healthcheck_response(response, HealthcheckError::NotFound.into()) -} - -impl StackdriverConfig { - fn log_name(&self, event: &Event) -> Result<String, TemplateRenderingError> { - use StackdriverLogName::*; - - let log_id = self.log_id.render_string(event)?; - - Ok(match &self.log_name { - BillingAccount(acct) => format!("billingAccounts/{}/logs/{}", acct, log_id), - Folder(folder) => format!("folders/{}/logs/{}", folder, log_id), - Organization(org) => format!("organizations/{}/logs/{}", org, log_id), - Project(project) => format!("projects/{}/logs/{}", project, log_id), - }) - } -} - -#[cfg(test)] -mod tests { - use chrono::{TimeZone, Utc}; - use futures::{future::ready, stream}; - use indoc::indoc; - use serde::Deserialize; - use serde_json::value::RawValue; - - use super::*; - use crate::{ - config::{GenerateConfig, SinkConfig, SinkContext}, - event::{LogEvent, Value}, - test_util::{ - components::{run_and_assert_sink_compliance, SINK_TAGS}, - http::{always_200_response, spawn_blackhole_http_server}, - }, - }; - - #[test] - fn generate_config() { - crate::test_util::test_generate_config::<StackdriverConfig>(); - } - - #[tokio::test] - async fn component_spec_compliance() { - let mock_endpoint = spawn_blackhole_http_server(always_200_response).await; - - let config = StackdriverConfig::generate_config().to_string(); - let mut
config = StackdriverConfig::deserialize(toml::de::ValueDeserializer::new(&config)) - .expect("config should be valid"); - - // If we don't override the credentials path/API key, it tries to directly call out to the Google Instance - // Metadata API, which we clearly don't have in unit tests. :) - config.auth.credentials_path = None; - config.auth.api_key = Some("fake".to_string().into()); - config.endpoint = mock_endpoint.to_string(); - - let context = SinkContext::default(); - let (sink, _healthcheck) = config.build(context).await.unwrap(); - - let event = Event::Log(LogEvent::from("simple message")); - run_and_assert_sink_compliance(sink, stream::once(ready(event)), &SINK_TAGS).await; - } - - #[test] - fn encode_valid() { - let config: StackdriverConfig = toml::from_str(indoc! {r#" - project_id = "project" - log_id = "{{ log_id }}" - resource.type = "generic_node" - resource.namespace = "office" - resource.node_id = "{{ node_id }}" - encoding.except_fields = ["anumber", "node_id", "log_id"] - "#}) - .unwrap(); - - let sink = StackdriverSink { - config, - auth: GcpAuthenticator::None, - severity_key: Some("anumber".into()), - uri: default_endpoint().parse().unwrap(), - }; - let mut encoder = sink.build_encoder(); - - let log = [ - ("message", "hello world"), - ("anumber", "100"), - ("node_id", "10.10.10.1"), - ("log_id", "testlogs"), - ] - .iter() - .copied() - .collect::(); - let json = encoder.encode_event(Event::from(log)).unwrap(); - assert_eq!( - json, - serde_json::json!({ - "logName":"projects/project/logs/testlogs", - "jsonPayload":{"message":"hello world"}, - "severity":100, - "resource":{ - "type":"generic_node", - "labels":{"namespace":"office","node_id":"10.10.10.1"} - } - }) - ); - } - - #[test] - fn encode_inserts_timestamp() { - let config: StackdriverConfig = toml::from_str(indoc! 
{r#" - project_id = "project" - log_id = "testlogs" - resource.type = "generic_node" - resource.namespace = "office" - "#}) - .unwrap(); - - let sink = StackdriverSink { - config, - auth: GcpAuthenticator::None, - severity_key: Some("anumber".into()), - uri: default_endpoint().parse().unwrap(), - }; - let mut encoder = sink.build_encoder(); - - let mut log = LogEvent::default(); - log.insert("message", Value::Bytes("hello world".into())); - log.insert("anumber", Value::Bytes("100".into())); - log.insert( - "timestamp", - Value::Timestamp( - Utc.with_ymd_and_hms(2020, 1, 1, 12, 30, 0) - .single() - .expect("invalid timestamp"), - ), - ); - - let json = encoder.encode_event(Event::from(log)).unwrap(); - assert_eq!( - json, - serde_json::json!({ - "logName":"projects/project/logs/testlogs", - "jsonPayload":{"message":"hello world","timestamp":"2020-01-01T12:30:00Z"}, - "severity":100, - "resource":{ - "type":"generic_node", - "labels":{"namespace":"office"}}, - "timestamp":"2020-01-01T12:30:00Z" - }) - ); - } - - #[test] - fn severity_remaps_strings() { - for &(s, n) in &[ - ("EMERGENCY", 800), // Handles full upper case - ("EMERG", 800), // Handles abbreviations - ("FATAL", 800), // Handles highest alternate - ("alert", 700), // Handles lower case - ("CrIt1c", 600), // Handles mixed case and suffixes - ("err404", 500), // Handles lower case and suffixes - ("warnings", 400), - ("notice", 300), - ("info", 200), - ("DEBUG2", 100), // Handles upper case and suffixes - ("trace", 100), // Handles lowest alternate - ("nothing", 0), // Maps unknown terms to DEFAULT - ("123", 100), // Handles numbers in strings - ("-100", 0), // Maps negatives to DEFAULT - ] { - assert_eq!( - remap_severity(s.into()), - Value::Integer(n), - "remap_severity({:?}) != {}", - s, - n - ); - } - } - - #[tokio::test] - async fn correct_request() { - let config: StackdriverConfig = toml::from_str(indoc! 
{r#" - project_id = "project" - log_id = "testlogs" - resource.type = "generic_node" - resource.namespace = "office" - "#}) - .unwrap(); - - let sink = StackdriverSink { - config, - auth: GcpAuthenticator::None, - severity_key: None, - uri: default_endpoint().parse().unwrap(), - }; - let mut encoder = sink.build_encoder(); - - let log1 = [("message", "hello")].iter().copied().collect::(); - let log2 = [("message", "world")].iter().copied().collect::(); - let event1 = encoder.encode_event(Event::from(log1)).unwrap(); - let event2 = encoder.encode_event(Event::from(log2)).unwrap(); - - let json1 = serde_json::to_string(&event1).unwrap(); - let json2 = serde_json::to_string(&event2).unwrap(); - let raw1 = RawValue::from_string(json1).unwrap(); - let raw2 = RawValue::from_string(json2).unwrap(); - - let events = vec![raw1, raw2]; - - let request = sink.build_request(events).await.unwrap(); - - let (parts, body) = request.into_parts(); - - let json: serde_json::Value = serde_json::from_slice(&body[..]).unwrap(); - - assert_eq!( - &parts.uri.to_string(), - "https://logging.googleapis.com/v2/entries:write" - ); - assert_eq!( - json, - serde_json::json!({ - "entries": [ - { - "logName": "projects/project/logs/testlogs", - "severity": 0, - "jsonPayload": { - "message": "hello" - }, - "resource": { - "type": "generic_node", - "labels": { - "namespace": "office" - } - } - }, - { - "logName": "projects/project/logs/testlogs", - "severity": 0, - "jsonPayload": { - "message": "world" - }, - "resource": { - "type": "generic_node", - "labels": { - "namespace": "office" - } - } - } - ] - }) - ); - } - - #[tokio::test] - async fn fails_missing_creds() { - let config: StackdriverConfig = toml::from_str(indoc! 
{r#" - project_id = "project" - log_id = "testlogs" - resource.type = "generic_node" - resource.namespace = "office" - "#}) - .unwrap(); - if config.build(SinkContext::default()).await.is_ok() { - panic!("config.build failed to error"); - } - } - - #[test] - fn fails_invalid_log_names() { - toml::from_str::(indoc! {r#" - log_id = "testlogs" - resource.type = "generic_node" - resource.namespace = "office" - "#}) - .expect_err("Config parsing failed to error with missing ids"); - - toml::from_str::(indoc! {r#" - project_id = "project" - folder_id = "folder" - log_id = "testlogs" - resource.type = "generic_node" - resource.namespace = "office" - "#}) - .expect_err("Config parsing failed to error with extraneous ids"); - } -} diff --git a/src/sinks/util/http.rs b/src/sinks/util/http.rs index 4dbfcab0ab4f8..93a56f5bf6097 100644 --- a/src/sinks/util/http.rs +++ b/src/sinks/util/http.rs @@ -20,7 +20,9 @@ use snafu::{ResultExt, Snafu}; use tower::{Service, ServiceBuilder}; use tower_http::decompression::DecompressionLayer; use vector_config::configurable_component; -use vector_core::{ByteSizeOf, EstimatedJsonEncodedSizeOf}; +use vector_core::{ + stream::batcher::limiter::ItemBatchSize, ByteSizeOf, EstimatedJsonEncodedSizeOf, +}; use super::{ retries::{RetryAction, RetryLogic}, @@ -678,6 +680,24 @@ impl DriverResponse for HttpResponse { } } +/// Creates a `RetryLogic` for use with `HttpResponse`. +pub fn http_response_retry_logic() -> HttpStatusRetryLogic< + impl Fn(&HttpResponse) -> StatusCode + Clone + Send + Sync + 'static, + HttpResponse, +> { + HttpStatusRetryLogic::new(|req: &HttpResponse| req.http_response.status()) +} + +/// Uses the estimated json encoded size to determine batch sizing. 
+#[derive(Default)] +pub struct HttpJsonBatchSizer; + +impl ItemBatchSize<Event> for HttpJsonBatchSizer { + fn size(&self, item: &Event) -> usize { + item.estimated_json_encoded_size_of().get() + } +} + /// HTTP request builder for HTTP stream sinks using the generic `HttpService` pub trait HttpServiceRequestBuilder { fn build(&self, body: Bytes) -> Request<Bytes>;