Skip to content

Commit 0d8ab26

Browse files
authored
chore(gcp_stackdriver_logs sink): refactor to new style (#18335)
* First iteration, unit tests pass. * Extract common Service functionality for HTTP based stream sinks. * docs touchup * Touch ups * spell checker * fix rust doc * hopefully fix encoding regression * Try item sized batching * Refactor a bit * cleanup * cleanup * doc clean up * extract common service code for re use in other HTTP sinks * feeback sw * duplicate * feedback ds * feedback ds * clippy * unit tests pass * First iteration, unit tests pass. * Extract common Service functionality for HTTP based stream sinks. * docs touchup * Touch ups * spell checker * fix rust doc * hopefully fix encoding regression * Try item sized batching * Refactor a bit * cleanup * cleanup * doc clean up * extract common service code for re use in other HTTP sinks * feeback sw * duplicate * feedback ds * feedback ds * clippy * unit tests pass * cleanup * use the common batcher * merge conflicts * clippy * feedback ds
1 parent e104dc4 commit 0d8ab26

File tree

10 files changed

+928
-688
lines changed

10 files changed

+928
-688
lines changed

src/sinks/gcp/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ use vector_config::configurable_component;
66
pub mod chronicle_unstructured;
77
pub mod cloud_storage;
88
pub mod pubsub;
9-
pub mod stackdriver_logs;
9+
pub mod stackdriver;
1010
pub mod stackdriver_metrics;
1111

1212
/// A monitored resource.
+278
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,278 @@
1+
//! Configuration for the `gcp_stackdriver_logs` sink.
2+
3+
use crate::{
4+
gcp::{GcpAuthConfig, GcpAuthenticator, Scope},
5+
http::HttpClient,
6+
schema,
7+
sinks::{
8+
gcs_common::config::healthcheck_response,
9+
prelude::*,
10+
util::{
11+
http::{http_response_retry_logic, HttpService},
12+
BoxedRawValue, RealtimeSizeBasedDefaultBatchSettings,
13+
},
14+
},
15+
};
16+
use http::{Request, Uri};
17+
use hyper::Body;
18+
use lookup::lookup_v2::ConfigValuePath;
19+
use snafu::Snafu;
20+
use std::collections::HashMap;
21+
use vrl::value::Kind;
22+
23+
use super::{
24+
encoder::StackdriverLogsEncoder, request_builder::StackdriverLogsRequestBuilder,
25+
service::StackdriverLogsServiceRequestBuilder, sink::StackdriverLogsSink,
26+
};
27+
28+
#[derive(Debug, Snafu)]
29+
enum HealthcheckError {
30+
#[snafu(display("Resource not found"))]
31+
NotFound,
32+
}
33+
34+
/// Configuration for the `gcp_stackdriver_logs` sink.
35+
#[configurable_component(sink(
36+
"gcp_stackdriver_logs",
37+
"Deliver logs to GCP's Cloud Operations suite."
38+
))]
39+
#[derive(Clone, Debug, Default)]
40+
#[serde(deny_unknown_fields)]
41+
pub(super) struct StackdriverConfig {
42+
#[serde(skip, default = "default_endpoint")]
43+
pub(super) endpoint: String,
44+
45+
#[serde(flatten)]
46+
pub(super) log_name: StackdriverLogName,
47+
48+
/// The log ID to which to publish logs.
49+
///
50+
/// This is a name you create to identify this log stream.
51+
pub(super) log_id: Template,
52+
53+
/// The monitored resource to associate the logs with.
54+
pub(super) resource: StackdriverResource,
55+
56+
/// The field of the log event from which to take the outgoing log’s `severity` field.
57+
///
58+
/// The named field is removed from the log event if present, and must be either an integer
59+
/// between 0 and 800 or a string containing one of the [severity level names][sev_names] (case
60+
/// is ignored) or a common prefix such as `err`.
61+
///
62+
/// If no severity key is specified, the severity of outgoing records is set to 0 (`DEFAULT`).
63+
///
64+
/// See the [GCP Stackdriver Logging LogSeverity description][logsev_docs] for more details on
65+
/// the value of the `severity` field.
66+
///
67+
/// [sev_names]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
68+
/// [logsev_docs]: https://cloud.google.com/logging/docs/reference/v2/rest/v2/LogEntry#logseverity
69+
#[configurable(metadata(docs::examples = "severity"))]
70+
pub(super) severity_key: Option<ConfigValuePath>,
71+
72+
#[serde(flatten)]
73+
pub(super) auth: GcpAuthConfig,
74+
75+
#[configurable(derived)]
76+
#[serde(
77+
default,
78+
skip_serializing_if = "crate::serde::skip_serializing_if_default"
79+
)]
80+
pub(super) encoding: Transformer,
81+
82+
#[configurable(derived)]
83+
#[serde(default)]
84+
pub(super) batch: BatchConfig<RealtimeSizeBasedDefaultBatchSettings>,
85+
86+
#[configurable(derived)]
87+
#[serde(default)]
88+
pub(super) request: TowerRequestConfig,
89+
90+
#[configurable(derived)]
91+
pub(super) tls: Option<TlsConfig>,
92+
93+
#[configurable(derived)]
94+
#[serde(
95+
default,
96+
deserialize_with = "crate::serde::bool_or_struct",
97+
skip_serializing_if = "crate::serde::skip_serializing_if_default"
98+
)]
99+
acknowledgements: AcknowledgementsConfig,
100+
}
101+
102+
pub(super) fn default_endpoint() -> String {
103+
"https://logging.googleapis.com/v2/entries:write".to_string()
104+
}
105+
106+
// 10MB limit for entries.write: https://cloud.google.com/logging/quotas#api-limits
107+
const MAX_BATCH_PAYLOAD_SIZE: usize = 10_000_000;
108+
109+
/// Logging locations.
110+
#[configurable_component]
111+
#[derive(Clone, Debug, Derivative)]
112+
#[derivative(Default)]
113+
pub(super) enum StackdriverLogName {
114+
/// The billing account ID to which to publish logs.
115+
///
116+
/// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
117+
#[serde(rename = "billing_account_id")]
118+
#[configurable(metadata(docs::examples = "012345-6789AB-CDEF01"))]
119+
BillingAccount(String),
120+
121+
/// The folder ID to which to publish logs.
122+
///
123+
/// See the [Google Cloud Platform folder documentation][folder_docs] for more details.
124+
///
125+
/// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
126+
///
127+
/// [folder_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-folders
128+
#[serde(rename = "folder_id")]
129+
#[configurable(metadata(docs::examples = "My Folder"))]
130+
Folder(String),
131+
132+
/// The organization ID to which to publish logs.
133+
///
134+
/// This would be the identifier assigned to your organization on Google Cloud Platform.
135+
///
136+
/// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
137+
#[serde(rename = "organization_id")]
138+
#[configurable(metadata(docs::examples = "622418129737"))]
139+
Organization(String),
140+
141+
/// The project ID to which to publish logs.
142+
///
143+
/// See the [Google Cloud Platform project management documentation][project_docs] for more details.
144+
///
145+
/// Exactly one of `billing_account_id`, `folder_id`, `organization_id`, or `project_id` must be set.
146+
///
147+
/// [project_docs]: https://cloud.google.com/resource-manager/docs/creating-managing-projects
148+
#[derivative(Default)]
149+
#[serde(rename = "project_id")]
150+
#[configurable(metadata(docs::examples = "vector-123456"))]
151+
Project(String),
152+
}
153+
154+
/// A monitored resource.
155+
///
156+
/// Monitored resources in GCP allow associating logs and metrics specifically with native resources
157+
/// within Google Cloud Platform. This takes the form of a "type" field which identifies the
158+
/// resource, and a set of type-specific labels to uniquely identify a resource of that type.
159+
///
160+
/// See [Monitored resource types][mon_docs] for more information.
161+
///
162+
/// [mon_docs]: https://cloud.google.com/monitoring/api/resources
163+
// TODO: this type is specific to the stackdrivers log sink because it allows for template-able
164+
// label values, but we should consider replacing `sinks::gcp::GcpTypedResource` with this so both
165+
// the stackdriver metrics _and_ logs sink can have template-able label values, and less duplication
166+
#[configurable_component]
167+
#[derive(Clone, Debug, Default)]
168+
pub(super) struct StackdriverResource {
169+
/// The monitored resource type.
170+
///
171+
/// For example, the type of a Compute Engine VM instance is `gce_instance`.
172+
/// See the [Google Cloud Platform monitored resource documentation][gcp_resources] for
173+
/// more details.
174+
///
175+
/// [gcp_resources]: https://cloud.google.com/monitoring/api/resources
176+
#[serde(rename = "type")]
177+
pub(super) type_: String,
178+
179+
/// Type-specific labels.
180+
#[serde(flatten)]
181+
#[configurable(metadata(docs::additional_props_description = "A type-specific label."))]
182+
#[configurable(metadata(docs::examples = "label_examples()"))]
183+
pub(super) labels: HashMap<String, Template>,
184+
}
185+
186+
fn label_examples() -> HashMap<String, String> {
187+
let mut example = HashMap::new();
188+
example.insert("instanceId".to_string(), "Twilight".to_string());
189+
example.insert("zone".to_string(), "{{ zone }}".to_string());
190+
example
191+
}
192+
193+
impl_generate_config_from_default!(StackdriverConfig);
194+
195+
#[async_trait::async_trait]
196+
#[typetag::serde(name = "gcp_stackdriver_logs")]
197+
impl SinkConfig for StackdriverConfig {
198+
async fn build(&self, cx: SinkContext) -> crate::Result<(VectorSink, Healthcheck)> {
199+
let auth = self.auth.build(Scope::LoggingWrite).await?;
200+
201+
let request_builder = StackdriverLogsRequestBuilder {
202+
encoder: StackdriverLogsEncoder::new(
203+
self.encoding.clone(),
204+
self.log_id.clone(),
205+
self.log_name.clone(),
206+
self.resource.clone(),
207+
self.severity_key.clone(),
208+
),
209+
};
210+
211+
let batch_settings = self
212+
.batch
213+
.validate()?
214+
.limit_max_bytes(MAX_BATCH_PAYLOAD_SIZE)?
215+
.into_batcher_settings()?;
216+
217+
let request_limits = self.request.unwrap_with(
218+
&TowerRequestConfig::default()
219+
.rate_limit_duration_secs(1)
220+
.rate_limit_num(1000),
221+
);
222+
223+
let tls_settings = TlsSettings::from_options(&self.tls)?;
224+
let client = HttpClient::new(tls_settings, cx.proxy())?;
225+
226+
let uri: Uri = self.endpoint.parse()?;
227+
228+
let stackdriver_logs_service_request_builder = StackdriverLogsServiceRequestBuilder {
229+
uri: uri.clone(),
230+
auth: auth.clone(),
231+
};
232+
233+
let service = HttpService::new(client.clone(), stackdriver_logs_service_request_builder);
234+
235+
let service = ServiceBuilder::new()
236+
.settings(request_limits, http_response_retry_logic())
237+
.service(service);
238+
239+
let sink = StackdriverLogsSink::new(service, batch_settings, request_builder);
240+
241+
let healthcheck = healthcheck(client, auth.clone(), uri).boxed();
242+
243+
auth.spawn_regenerate_token();
244+
245+
Ok((VectorSink::from_event_streamsink(sink), healthcheck))
246+
}
247+
248+
fn input(&self) -> Input {
249+
let requirement =
250+
schema::Requirement::empty().required_meaning("timestamp", Kind::timestamp());
251+
252+
Input::log().with_schema_requirement(requirement)
253+
}
254+
255+
fn acknowledgements(&self) -> &AcknowledgementsConfig {
256+
&self.acknowledgements
257+
}
258+
}
259+
260+
async fn healthcheck(client: HttpClient, auth: GcpAuthenticator, uri: Uri) -> crate::Result<()> {
261+
let entries: Vec<BoxedRawValue> = Vec::new();
262+
let events = serde_json::json!({ "entries": entries });
263+
264+
let body = crate::serde::json::to_bytes(&events).unwrap().freeze();
265+
266+
let mut request = Request::post(uri)
267+
.header("Content-Type", "application/json")
268+
.body(body)
269+
.unwrap();
270+
271+
auth.apply(&mut request);
272+
273+
let request = request.map(Body::from);
274+
275+
let response = client.send(request).await?;
276+
277+
healthcheck_response(response, HealthcheckError::NotFound.into())
278+
}

0 commit comments

Comments
 (0)