1
1
use crate :: encoding:: BuildError ;
2
2
use bytes:: BytesMut ;
3
3
use chrono:: SecondsFormat ;
4
+ use csv_core:: { WriteResult , Writer , WriterBuilder } ;
4
5
use lookup:: lookup_v2:: ConfigTargetPath ;
5
6
use tokio_util:: codec:: Encoder ;
6
7
use vector_core:: {
@@ -180,7 +181,7 @@ impl CsvSerializerOptions {
180
181
pub struct CsvSerializer {
181
182
// Box because of clippy error: 'large size difference between variants'
182
183
// in SerializerConfig enum
183
- writer : Box < csv_core :: Writer > ,
184
+ writer : Box < Writer > ,
184
185
fields : Vec < ConfigTargetPath > ,
185
186
internal_buffer : Vec < u8 > ,
186
187
}
@@ -190,7 +191,7 @@ impl CsvSerializer {
190
191
pub fn new ( config : CsvSerializerConfig ) -> Self {
191
192
// 'flexible' is not needed since every event is a single context free csv line
192
193
let writer = Box :: new (
193
- csv_core :: WriterBuilder :: new ( )
194
+ WriterBuilder :: new ( )
194
195
. delimiter ( config. csv . delimiter )
195
196
. double_quote ( config. csv . double_quote )
196
197
. escape ( config. csv . escape )
@@ -213,89 +214,113 @@ impl CsvSerializer {
213
214
}
214
215
}
215
216
217
+ fn write_field (
218
+ writer : & mut Writer ,
219
+ mut field_value : & [ u8 ] ,
220
+ internal_buffer : & mut Vec < u8 > ,
221
+ buffer : & mut BytesMut ,
222
+ mut used_buffer_bytes : usize ,
223
+ ) -> usize {
224
+ loop {
225
+ let ( res, bytes_read, bytes_written) =
226
+ writer. field ( field_value, & mut internal_buffer[ used_buffer_bytes..] ) ;
227
+
228
+ field_value = & field_value[ bytes_read..] ;
229
+ used_buffer_bytes += bytes_written;
230
+
231
+ match res {
232
+ WriteResult :: InputEmpty => return used_buffer_bytes,
233
+ WriteResult :: OutputFull => {
234
+ buffer. extend_from_slice ( & internal_buffer[ ..used_buffer_bytes] ) ;
235
+ used_buffer_bytes = 0 ;
236
+ }
237
+ }
238
+ }
239
+ }
240
+
241
+ fn finish_field (
242
+ writer : & mut Writer ,
243
+ finish_func : fn ( & mut Writer , & mut [ u8 ] ) -> ( WriteResult , usize ) ,
244
+ internal_buffer : & mut Vec < u8 > ,
245
+ buffer : & mut BytesMut ,
246
+ mut used_buffer_bytes : usize ,
247
+ ) -> usize {
248
+ loop {
249
+ let ( res, bytes_written) = finish_func ( writer, & mut internal_buffer[ used_buffer_bytes..] ) ;
250
+ used_buffer_bytes += bytes_written;
251
+ match res {
252
+ WriteResult :: InputEmpty => return used_buffer_bytes,
253
+ WriteResult :: OutputFull => {
254
+ buffer. extend_from_slice ( & internal_buffer[ ..used_buffer_bytes] ) ;
255
+ used_buffer_bytes = 0 ;
256
+ }
257
+ }
258
+ }
259
+ }
260
+
261
+ fn write_delimiter ( writer : & mut Writer , field : & mut [ u8 ] ) -> ( WriteResult , usize ) {
262
+ writer. delimiter ( field)
263
+ }
264
+
265
+ fn finish_event ( writer : & mut Writer , field : & mut [ u8 ] ) -> ( WriteResult , usize ) {
266
+ writer. finish ( field)
267
+ }
268
+
269
+ fn field_to_csv_string ( field_value : Option < & Value > ) -> String {
270
+ match field_value {
271
+ Some ( Value :: Bytes ( bytes) ) => String :: from_utf8_lossy ( bytes) . into_owned ( ) ,
272
+ Some ( Value :: Integer ( int) ) => int. to_string ( ) ,
273
+ Some ( Value :: Float ( float) ) => float. to_string ( ) ,
274
+ Some ( Value :: Boolean ( bool) ) => bool. to_string ( ) ,
275
+ Some ( Value :: Timestamp ( timestamp) ) => timestamp. to_rfc3339_opts ( SecondsFormat :: AutoSi , true ) ,
276
+ Some ( Value :: Null ) => String :: new ( ) ,
277
+ // Other value types: Array, Regex, Object are not supported by the CSV format.
278
+ Some ( _) => String :: new ( ) ,
279
+ None => String :: new ( ) ,
280
+ }
281
+ }
282
+
216
283
impl Encoder < Event > for CsvSerializer {
217
284
type Error = vector_common:: Error ;
218
285
219
- fn encode ( & mut self , event : Event , buffer : & mut BytesMut ) -> Result < ( ) , Self :: Error > {
286
+ fn encode ( & mut self , event : Event , upstream_buffer : & mut BytesMut ) -> Result < ( ) , Self :: Error > {
220
287
let log = event. into_log ( ) ;
221
288
222
289
let mut used_buffer_bytes = 0 ;
223
290
for ( fields_written, field) in self . fields . iter ( ) . enumerate ( ) {
224
291
let field_value = log. get ( field) ;
225
292
226
- // write field delimiter
227
293
if fields_written > 0 {
228
- loop {
229
- let ( res, bytes_written) = self
230
- . writer
231
- . delimiter ( & mut self . internal_buffer [ used_buffer_bytes..] ) ;
232
- used_buffer_bytes += bytes_written;
233
- match res {
234
- csv_core:: WriteResult :: InputEmpty => {
235
- break ;
236
- }
237
- csv_core:: WriteResult :: OutputFull => {
238
- buffer. extend_from_slice ( & self . internal_buffer [ ..used_buffer_bytes] ) ;
239
- used_buffer_bytes = 0 ;
240
- }
241
- }
242
- }
294
+ used_buffer_bytes = finish_field (
295
+ & mut self . writer ,
296
+ write_delimiter,
297
+ & mut self . internal_buffer ,
298
+ upstream_buffer,
299
+ used_buffer_bytes,
300
+ ) ;
243
301
}
244
302
245
- // get string value of current field
246
- let field_value = match field_value {
247
- Some ( Value :: Bytes ( bytes) ) => String :: from_utf8_lossy ( bytes) . into_owned ( ) ,
248
- Some ( Value :: Integer ( int) ) => int. to_string ( ) ,
249
- Some ( Value :: Float ( float) ) => float. to_string ( ) ,
250
- Some ( Value :: Boolean ( bool) ) => bool. to_string ( ) ,
251
- Some ( Value :: Timestamp ( timestamp) ) => {
252
- timestamp. to_rfc3339_opts ( SecondsFormat :: AutoSi , true )
253
- }
254
- Some ( Value :: Null ) => String :: new ( ) ,
255
- // Other value types: Array, Regex, Object are not supported by the CSV format.
256
- Some ( _) => String :: new ( ) ,
257
- None => String :: new ( ) ,
258
- } ;
259
-
260
- // mutable byte_slice so it can be written in chunks if internal_buffer fills up
261
- let mut field_value = field_value. as_bytes ( ) ;
262
- // write field_value to internal buffer
263
- loop {
264
- let ( res, bytes_read, bytes_written) = self
265
- . writer
266
- . field ( field_value, & mut self . internal_buffer [ used_buffer_bytes..] ) ;
267
-
268
- field_value = & field_value[ bytes_read..] ;
269
- used_buffer_bytes += bytes_written;
270
-
271
- match res {
272
- csv_core:: WriteResult :: InputEmpty => break ,
273
- csv_core:: WriteResult :: OutputFull => {
274
- buffer. extend_from_slice ( & self . internal_buffer [ ..used_buffer_bytes] ) ;
275
- used_buffer_bytes = 0 ;
276
- }
277
- }
278
- }
303
+ let field_data = field_to_csv_string ( field_value) ;
304
+ used_buffer_bytes = write_field (
305
+ & mut self . writer ,
306
+ & field_data. as_bytes ( ) ,
307
+ & mut self . internal_buffer ,
308
+ upstream_buffer,
309
+ used_buffer_bytes,
310
+ ) ;
279
311
}
280
312
281
- // finish current event (potentially add closing quotes)
282
- loop {
283
- let ( res, bytes_written) = self
284
- . writer
285
- . finish ( & mut self . internal_buffer [ used_buffer_bytes..] ) ;
286
- used_buffer_bytes += bytes_written;
287
- match res {
288
- csv_core:: WriteResult :: InputEmpty => break ,
289
- csv_core:: WriteResult :: OutputFull => {
290
- buffer. extend_from_slice ( & self . internal_buffer [ ..used_buffer_bytes] ) ;
291
- used_buffer_bytes = 0 ;
292
- }
293
- }
294
- }
313
+ used_buffer_bytes = finish_field (
314
+ & mut self . writer ,
315
+ finish_event,
316
+ & mut self . internal_buffer ,
317
+ upstream_buffer,
318
+ used_buffer_bytes,
319
+ ) ;
295
320
296
321
// final flush of internal_buffer
297
322
if used_buffer_bytes > 0 {
298
- buffer . extend_from_slice ( & self . internal_buffer [ ..used_buffer_bytes] ) ;
323
+ upstream_buffer . extend_from_slice ( & self . internal_buffer [ ..used_buffer_bytes] ) ;
299
324
}
300
325
301
326
Ok ( ( ) )
@@ -558,7 +583,7 @@ mod tests {
558
583
559
584
#[ test]
560
585
fn multiple_events ( ) {
561
- let ( fields, event1) = make_event_with_fields ( vec ! [ ( "field1" , "foo\" " ) ] ) ;
586
+ let ( fields, event1) = make_event_with_fields ( vec ! [ ( "field1" , "foo, " ) ] ) ;
562
587
let ( _, event2) = make_event_with_fields ( vec ! [ ( "field1" , "\" bar" ) ] ) ;
563
588
let opts = CsvSerializerOptions {
564
589
fields,
@@ -571,6 +596,6 @@ mod tests {
571
596
serializer. encode ( event1, & mut bytes) . unwrap ( ) ;
572
597
serializer. encode ( event2, & mut bytes) . unwrap ( ) ;
573
598
574
- assert_eq ! ( bytes. freeze( ) , b"\" foo\" \" \" \" \" bar \" " . as_slice( ) ) ;
599
+ assert_eq ! ( bytes. freeze( ) , b"\" foo, \" \" \n bar \" " . as_slice( ) ) ;
575
600
}
576
601
}
0 commit comments