Skip to content

Commit a06c711

Browse files
Daniel599 and pront authored
feat(codecs): add support for protobuf decoding (#18019)
* feat(codecs): add support for protobuf decoding, WIP still have some TODO to resolve
* feat(codecs): add support for protobuf. code-review fixes: handle unwraps and fix support for empty buffer as a message, allowed in protobuf.
* feat(codecs): add support for protobuf. code-review fixes: use `kind::any()` instead of `kind::json()`. use `unimplemented!()` instead of `todo!()`. in tests, add checks for List and Map. in `ProtobufDeserializer::new`, refactor out creation of MessageDescriptor. run `cargo fmt`.
* feat(codecs): add support for protobuf. code-review fixes: apply suggested refactor to `to_vrl`, it's slightly slower, might improve in following PR.
* clippy fixes and minor refactoring
* address Bruce's comments
* update test code to use new log schema interface
* generate docs
---------
Co-authored-by: Pavlos Rontidis <[email protected]>
1 parent 8a2f8f6 commit a06c711

32 files changed

+742
-1
lines changed

Cargo.lock

+4
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/codecs/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@ memchr = { version = "2", default-features = false }
1717
once_cell = { version = "1.18", default-features = false }
1818
ordered-float = { version = "3.7.0", default-features = false }
1919
prost = { version = "0.11.8", default-features = false, features = ["std"] }
20+
prost-reflect = { version = "0.11", default-features = false, features = ["serde"] }
2021
regex = { version = "1.9.1", default-features = false, features = ["std", "perf"] }
2122
serde = { version = "1", default-features = false, features = ["derive"] }
2223
serde_json = { version = "1", default-features = false }

lib/codecs/src/decoding/format/mod.rs

+2
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ mod gelf;
88
mod json;
99
mod native;
1010
mod native_json;
11+
mod protobuf;
1112
#[cfg(feature = "syslog")]
1213
mod syslog;
1314

@@ -19,6 +20,7 @@ pub use native::{NativeDeserializer, NativeDeserializerConfig};
1920
pub use native_json::{
2021
NativeJsonDeserializer, NativeJsonDeserializerConfig, NativeJsonDeserializerOptions,
2122
};
23+
pub use protobuf::{ProtobufDeserializer, ProtobufDeserializerConfig};
2224
use smallvec::SmallVec;
2325
#[cfg(feature = "syslog")]
2426
pub use syslog::{SyslogDeserializer, SyslogDeserializerConfig, SyslogDeserializerOptions};
+353
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,353 @@
1+
use std::collections::BTreeMap;
2+
use std::fs;
3+
use std::path::PathBuf;
4+
5+
use bytes::Bytes;
6+
use chrono::Utc;
7+
use ordered_float::NotNan;
8+
use prost_reflect::{DescriptorPool, DynamicMessage, MessageDescriptor, ReflectMessage};
9+
use smallvec::{smallvec, SmallVec};
10+
use vector_config::configurable_component;
11+
use vector_core::event::LogEvent;
12+
use vector_core::{
13+
config::{log_schema, DataType, LogNamespace},
14+
event::Event,
15+
schema,
16+
};
17+
use vrl::value::Kind;
18+
19+
use super::Deserializer;
20+
21+
/// Config used to build a `ProtobufDeserializer`.
22+
#[configurable_component]
23+
#[derive(Debug, Clone, Default)]
24+
pub struct ProtobufDeserializerConfig {
25+
/// Path to desc file
26+
desc_file: PathBuf,
27+
28+
/// message type. e.g package.message
29+
message_type: String,
30+
}
31+
32+
impl ProtobufDeserializerConfig {
33+
/// Build the `ProtobufDeserializer` from this configuration.
34+
pub fn build(&self) -> ProtobufDeserializer {
35+
// TODO return a Result instead.
36+
ProtobufDeserializer::try_from(self).unwrap()
37+
}
38+
39+
/// Return the type of event build by this deserializer.
40+
pub fn output_type(&self) -> DataType {
41+
DataType::Log
42+
}
43+
44+
/// The schema produced by the deserializer.
45+
pub fn schema_definition(&self, log_namespace: LogNamespace) -> schema::Definition {
46+
match log_namespace {
47+
LogNamespace::Legacy => {
48+
let mut definition =
49+
schema::Definition::empty_legacy_namespace().unknown_fields(Kind::any());
50+
51+
if let Some(timestamp_key) = log_schema().timestamp_key() {
52+
definition = definition.try_with_field(
53+
timestamp_key,
54+
// The protobuf decoder will try to insert a new `timestamp`-type value into the
55+
// "timestamp_key" field, but only if that field doesn't already exist.
56+
Kind::any().or_timestamp(),
57+
Some("timestamp"),
58+
);
59+
}
60+
definition
61+
}
62+
LogNamespace::Vector => {
63+
schema::Definition::new_with_default_metadata(Kind::any(), [log_namespace])
64+
}
65+
}
66+
}
67+
}
68+
69+
/// Deserializer that builds `Event`s from a byte frame containing protobuf.
70+
#[derive(Debug, Clone)]
71+
pub struct ProtobufDeserializer {
72+
message_descriptor: MessageDescriptor,
73+
}
74+
75+
impl ProtobufDeserializer {
76+
/// Creates a new `ProtobufDeserializer`.
77+
pub fn new(message_descriptor: MessageDescriptor) -> Self {
78+
Self { message_descriptor }
79+
}
80+
81+
fn get_message_descriptor(
82+
desc_file: &PathBuf,
83+
message_type: String,
84+
) -> vector_common::Result<MessageDescriptor> {
85+
let b = fs::read(desc_file)
86+
.map_err(|e| format!("Failed to open protobuf desc file '{desc_file:?}': {e}",))?;
87+
let pool = DescriptorPool::decode(b.as_slice())
88+
.map_err(|e| format!("Failed to parse protobuf desc file '{desc_file:?}': {e}"))?;
89+
Ok(pool.get_message_by_name(&message_type).unwrap_or_else(|| {
90+
panic!("The message type '{message_type}' could not be found in '{desc_file:?}'")
91+
}))
92+
}
93+
}
94+
95+
impl Deserializer for ProtobufDeserializer {
96+
fn parse(
97+
&self,
98+
bytes: Bytes,
99+
log_namespace: LogNamespace,
100+
) -> vector_common::Result<SmallVec<[Event; 1]>> {
101+
let dynamic_message = DynamicMessage::decode(self.message_descriptor.clone(), bytes)
102+
.map_err(|error| format!("Error parsing protobuf: {:?}", error))?;
103+
104+
let proto_vrl = to_vrl(&prost_reflect::Value::Message(dynamic_message), None)?;
105+
let mut event = Event::Log(LogEvent::from(proto_vrl));
106+
let event = match log_namespace {
107+
LogNamespace::Vector => event,
108+
LogNamespace::Legacy => {
109+
let timestamp = Utc::now();
110+
if let Some(timestamp_key) = log_schema().timestamp_key_target_path() {
111+
let log = event.as_mut_log();
112+
if !log.contains(timestamp_key) {
113+
log.insert(timestamp_key, timestamp);
114+
}
115+
}
116+
event
117+
}
118+
};
119+
120+
Ok(smallvec![event])
121+
}
122+
}
123+
124+
impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
125+
type Error = vector_common::Error;
126+
fn try_from(config: &ProtobufDeserializerConfig) -> vector_common::Result<Self> {
127+
let message_descriptor = ProtobufDeserializer::get_message_descriptor(
128+
&config.desc_file,
129+
config.message_type.clone(),
130+
)?;
131+
Ok(Self::new(message_descriptor))
132+
}
133+
}
134+
135+
fn to_vrl(
136+
prost_reflect_value: &prost_reflect::Value,
137+
field_descriptor: Option<&prost_reflect::FieldDescriptor>,
138+
) -> vector_common::Result<vrl::value::Value> {
139+
let vrl_value = match prost_reflect_value {
140+
prost_reflect::Value::Bool(v) => vrl::value::Value::from(*v),
141+
prost_reflect::Value::I32(v) => vrl::value::Value::from(*v),
142+
prost_reflect::Value::I64(v) => vrl::value::Value::from(*v),
143+
prost_reflect::Value::U32(v) => vrl::value::Value::from(*v),
144+
prost_reflect::Value::U64(v) => vrl::value::Value::from(*v),
145+
prost_reflect::Value::F32(v) => vrl::value::Value::Float(
146+
NotNan::new(f64::from(*v)).map_err(|_e| "Float number cannot be Nan")?,
147+
),
148+
prost_reflect::Value::F64(v) => {
149+
vrl::value::Value::Float(NotNan::new(*v).map_err(|_e| "F64 number cannot be Nan")?)
150+
}
151+
prost_reflect::Value::String(v) => vrl::value::Value::from(v.as_str()),
152+
prost_reflect::Value::Bytes(v) => vrl::value::Value::from(v.clone()),
153+
prost_reflect::Value::EnumNumber(v) => {
154+
if let Some(field_descriptor) = field_descriptor {
155+
let kind = field_descriptor.kind();
156+
let enum_desc = kind.as_enum().ok_or_else(|| {
157+
format!(
158+
"Internal error while parsing protobuf enum. Field descriptor: {:?}",
159+
field_descriptor
160+
)
161+
})?;
162+
vrl::value::Value::from(
163+
enum_desc
164+
.get_value(*v)
165+
.ok_or_else(|| {
166+
format!("The number {} cannot be in '{}'", v, enum_desc.name())
167+
})?
168+
.name(),
169+
)
170+
} else {
171+
Err("Expected valid field descriptor")?
172+
}
173+
}
174+
prost_reflect::Value::Message(v) => {
175+
let mut obj_map = BTreeMap::new();
176+
for field_desc in v.descriptor().fields() {
177+
let field_value = v.get_field(&field_desc);
178+
let out = to_vrl(field_value.as_ref(), Some(&field_desc))?;
179+
obj_map.insert(field_desc.name().to_string(), out);
180+
}
181+
vrl::value::Value::from(obj_map)
182+
}
183+
prost_reflect::Value::List(v) => {
184+
let vec = v
185+
.iter()
186+
.map(|o| to_vrl(o, field_descriptor))
187+
.collect::<Result<Vec<_>, vector_common::Error>>()?;
188+
vrl::value::Value::from(vec)
189+
}
190+
prost_reflect::Value::Map(v) => {
191+
if let Some(field_descriptor) = field_descriptor {
192+
let kind = field_descriptor.kind();
193+
let message_desc = kind.as_message().ok_or_else(|| {
194+
format!(
195+
"Internal error while parsing protobuf field descriptor: {:?}",
196+
field_descriptor
197+
)
198+
})?;
199+
vrl::value::Value::from(
200+
v.iter()
201+
.map(|kv| {
202+
Ok((
203+
kv.0.as_str()
204+
.ok_or_else(|| {
205+
format!(
206+
"Internal error while parsing protobuf map. Field descriptor: {:?}",
207+
field_descriptor
208+
)
209+
})?
210+
.to_string(),
211+
to_vrl(kv.1, Some(&message_desc.map_entry_value_field()))?,
212+
))
213+
})
214+
.collect::<vector_common::Result<BTreeMap<String, _>>>()?,
215+
)
216+
} else {
217+
Err("Expected valid field descriptor")?
218+
}
219+
}
220+
};
221+
Ok(vrl_value)
222+
}
223+
224+
#[cfg(test)]
mod tests {
    // TODO: add test for invalid message_type

    use std::path::PathBuf;
    use std::{env, fs};
    use vector_core::config::log_schema;

    use super::*;

    /// Directory holding the protobuf fixtures used by these tests.
    fn test_data_dir() -> PathBuf {
        PathBuf::from(env::var_os("CARGO_MANIFEST_DIR").unwrap())
            .join("tests/data/decoding/protobuf")
    }

    /// Decodes `protobuf_bin_message` as `message_type` in both log namespaces,
    /// runs `validate_log` on the single resulting event and checks that a
    /// timestamp is present only in the `Legacy` namespace.
    ///
    /// The message is passed as raw bytes because encoded protobuf is binary
    /// data and is not guaranteed to be valid UTF-8.
    fn parse_and_validate(
        protobuf_bin_message: Vec<u8>,
        protobuf_desc_path: PathBuf,
        message_type: &str,
        validate_log: fn(&LogEvent),
    ) {
        let input = Bytes::from(protobuf_bin_message);
        let message_descriptor = ProtobufDeserializer::get_message_descriptor(
            &protobuf_desc_path,
            message_type.to_string(),
        )
        .unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            let events = deserializer.parse(input.clone(), namespace).unwrap();
            let mut events = events.into_iter();

            {
                let event = events.next().unwrap();
                let log = event.as_log();
                validate_log(log);
                assert_eq!(
                    log.get(log_schema().timestamp_key_target_path().unwrap())
                        .is_some(),
                    namespace == LogNamespace::Legacy
                );
            }

            assert_eq!(events.next(), None);
        }
    }

    #[test]
    fn deserialize_protobuf() {
        let protobuf_bin_message_path = test_data_dir().join("person_someone.pb");
        let protobuf_desc_path = test_data_dir().join("test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "123456"
            );
        };

        parse_and_validate(
            fs::read(protobuf_bin_message_path).unwrap(),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_protobuf3() {
        let protobuf_bin_message_path = test_data_dir().join("person_someone3.pb");
        let protobuf_desc_path = test_data_dir().join("test_protobuf3.desc");
        let message_type = "test_protobuf3.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "someone".into());
            assert_eq!(
                log["phones"].as_array().unwrap()[0].as_object().unwrap()["number"]
                    .as_str()
                    .unwrap(),
                "1234"
            );
            assert_eq!(
                log["data"].as_object().unwrap()["data_phone"],
                "HOME".into()
            );
        };

        parse_and_validate(
            fs::read(protobuf_bin_message_path).unwrap(),
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_empty_buffer() {
        // An empty buffer is a valid protobuf message with every field at its default.
        let protobuf_bin_message = Vec::new();
        let protobuf_desc_path = test_data_dir().join("test_protobuf.desc");
        let message_type = "test_protobuf.Person";
        let validate_log = |log: &LogEvent| {
            assert_eq!(log["name"], "".into());
        };

        parse_and_validate(
            protobuf_bin_message,
            protobuf_desc_path,
            message_type,
            validate_log,
        );
    }

    #[test]
    fn deserialize_error_invalid_protobuf() {
        let input = Bytes::from("{ foo");
        let message_descriptor = ProtobufDeserializer::get_message_descriptor(
            &test_data_dir().join("test_protobuf.desc"),
            "test_protobuf.Person".to_string(),
        )
        .unwrap();
        let deserializer = ProtobufDeserializer::new(message_descriptor);

        for namespace in [LogNamespace::Legacy, LogNamespace::Vector] {
            assert!(deserializer.parse(input.clone(), namespace).is_err());
        }
    }

    #[test]
    fn message_descriptor_error_missing_desc_file() {
        // A descriptor file that cannot be read must surface as an error.
        let result = ProtobufDeserializer::get_message_descriptor(
            &test_data_dir().join("does_not_exist.desc"),
            "test_protobuf.Person".to_string(),
        );

        assert!(result.is_err());
    }
}

0 commit comments

Comments
 (0)