1
1
use std:: collections:: BTreeMap ;
2
2
use std:: fs;
3
+ use std:: path:: PathBuf ;
3
4
4
5
use bytes:: Bytes ;
5
6
use chrono:: Utc ;
6
- use lookup:: PathPrefix ;
7
7
use ordered_float:: NotNan ;
8
8
use prost_reflect:: { DescriptorPool , DynamicMessage , MessageDescriptor , ReflectMessage } ;
9
9
use smallvec:: { smallvec, SmallVec } ;
@@ -23,7 +23,7 @@ use super::Deserializer;
23
23
#[ derive( Debug , Clone , Default ) ]
24
24
pub struct ProtobufDeserializerConfig {
25
25
/// Path to desc file
26
- desc_file : String ,
26
+ desc_file : PathBuf ,
27
27
28
28
/// message type. e.g package.message
29
29
message_type : String ,
@@ -32,6 +32,7 @@ pub struct ProtobufDeserializerConfig {
32
32
impl ProtobufDeserializerConfig {
33
33
/// Build the `ProtobufDeserializer` from this configuration.
34
34
pub fn build ( & self ) -> ProtobufDeserializer {
35
+ // TODO return a Result instead.
35
36
ProtobufDeserializer :: try_from ( self ) . unwrap ( )
36
37
}
37
38
@@ -65,13 +66,6 @@ impl ProtobufDeserializerConfig {
65
66
}
66
67
}
67
68
68
- impl ProtobufDeserializerConfig {
69
- /// Creates a new `ProtobufDeserializerConfig`.
70
- pub fn new ( ) -> Self {
71
- Default :: default ( )
72
- }
73
- }
74
-
75
69
/// Deserializer that builds `Event`s from a byte frame containing protobuf.
76
70
#[ derive( Debug , Clone ) ]
77
71
pub struct ProtobufDeserializer {
@@ -85,15 +79,15 @@ impl ProtobufDeserializer {
85
79
}
86
80
87
81
fn get_message_descriptor (
88
- desc_file : String ,
82
+ desc_file : & PathBuf ,
89
83
message_type : String ,
90
84
) -> vector_common:: Result < MessageDescriptor > {
91
- let b = fs:: read ( desc_file. clone ( ) )
92
- . map_err ( |e| format ! ( "Failed to open protobuf desc file '{desc_file}': {e}" ) ) ?;
85
+ let b = fs:: read ( desc_file)
86
+ . map_err ( |e| format ! ( "Failed to open protobuf desc file '{desc_file:? }': {e}" , ) ) ?;
93
87
let pool = DescriptorPool :: decode ( b. as_slice ( ) )
94
- . map_err ( |e| format ! ( "Failed to parse protobuf desc file '{desc_file}': {e}" ) ) ?;
88
+ . map_err ( |e| format ! ( "Failed to parse protobuf desc file '{desc_file:? }': {e}" ) ) ?;
95
89
Ok ( pool. get_message_by_name ( & message_type) . unwrap_or_else ( || {
96
- panic ! ( "The message type '{message_type}' could not be found in '{desc_file}'" )
90
+ panic ! ( "The message type '{message_type}' could not be found in '{desc_file:? }'" )
97
91
} ) )
98
92
}
99
93
}
@@ -113,14 +107,12 @@ impl Deserializer for ProtobufDeserializer {
113
107
LogNamespace :: Vector => event,
114
108
LogNamespace :: Legacy => {
115
109
let timestamp = Utc :: now ( ) ;
116
-
117
- if let Some ( timestamp_key) = log_schema ( ) . timestamp_key ( ) {
110
+ if let Some ( timestamp_key) = log_schema ( ) . timestamp_key_target_path ( ) {
118
111
let log = event. as_mut_log ( ) ;
119
- if !log. contains ( ( PathPrefix :: Event , timestamp_key) ) {
120
- log. insert ( ( PathPrefix :: Event , timestamp_key) , timestamp) ;
112
+ if !log. contains ( timestamp_key) {
113
+ log. insert ( timestamp_key, timestamp) ;
121
114
}
122
115
}
123
-
124
116
event
125
117
}
126
118
} ;
@@ -133,7 +125,7 @@ impl TryFrom<&ProtobufDeserializerConfig> for ProtobufDeserializer {
133
125
type Error = vector_common:: Error ;
134
126
fn try_from ( config : & ProtobufDeserializerConfig ) -> vector_common:: Result < Self > {
135
127
let message_descriptor = ProtobufDeserializer :: get_message_descriptor (
136
- config. desc_file . clone ( ) ,
128
+ & config. desc_file ,
137
129
config. message_type . clone ( ) ,
138
130
) ?;
139
131
Ok ( Self :: new ( message_descriptor) )
@@ -252,7 +244,7 @@ mod tests {
252
244
) {
253
245
let input = Bytes :: from ( protobuf_bin_message) ;
254
246
let message_descriptor = ProtobufDeserializer :: get_message_descriptor (
255
- protobuf_desc_path. to_str ( ) . unwrap ( ) . to_string ( ) ,
247
+ & protobuf_desc_path,
256
248
message_type. to_string ( ) ,
257
249
)
258
250
. unwrap ( ) ;
@@ -348,11 +340,7 @@ mod tests {
348
340
fn deserialize_error_invalid_protobuf ( ) {
349
341
let input = Bytes :: from ( "{ foo" ) ;
350
342
let message_descriptor = ProtobufDeserializer :: get_message_descriptor (
351
- test_data_dir ( )
352
- . join ( "test_protobuf.desc" )
353
- . to_str ( )
354
- . unwrap ( )
355
- . to_string ( ) ,
343
+ & test_data_dir ( ) . join ( "test_protobuf.desc" ) ,
356
344
"test_protobuf.Person" . to_string ( ) ,
357
345
)
358
346
. unwrap ( ) ;
0 commit comments