@@ -16,6 +16,7 @@ use crate::{
16
16
http :: HttpClient ,
17
17
internal_events :: SinkRequestBuildError ,
18
18
};
19
+ use vector_core :: config :: telemetry;
19
20
use bytes :: Bytes ;
20
21
```
21
22
@@ -81,12 +82,12 @@ struct BasicEncoder;
81
82
The Encoder must implement the [ ` Encoder ` ] [ encoder ] trait:
82
83
83
84
``` rust
84
- impl Encoder <Event > for BasicEncoder {
85
+ impl encoding :: Encoder <Event > for BasicEncoder {
85
86
fn encode_input (
86
87
& self ,
87
88
input : Event ,
88
89
writer : & mut dyn std :: io :: Write ,
89
- ) -> std :: io :: Result <usize > {
90
+ ) -> std :: io :: Result <( usize , GroupedCountByteSize ) > {
90
91
}
91
92
}
92
93
```
@@ -98,16 +99,25 @@ sending batches of events, or they may send a completely different type if each
98
99
event is processed in some way prior to encoding.
99
100
100
101
[ ` encode_input ` ] [ encoder_encode_input ] serializes the event to a String and
101
- writes these bytes:
102
+ writes these bytes. The function also creates a [ ` GroupedCountByteSize ` ]
103
+ [ grouped_count_byte_size] object. This object tracks the size of the event
104
+ that is sent by the sink, optionally grouped by the source and service that
105
+ originated the event if Vector has been configured to do so. It is necessary to
106
+ calculate the sizes in this function since the encode function sometimes drops
107
+ fields from the event prior to encoding. We need the size to be calculated after
108
+ these fields have been dropped.
102
109
103
110
``` rust
104
111
fn encode_input (
105
112
& self ,
106
113
input : Event ,
107
114
writer : & mut dyn std :: io :: Write ,
108
- ) -> std :: io :: Result <usize > {
115
+ ) -> std :: io :: Result <(usize , GroupedCountByteSize )> {
116
+ let mut byte_size = telemetry (). create_request_count_byte_size ();
117
+ byte_size . add_event (& input , input . estimated_json_encoded_size_of ());
118
+
109
119
let event = serde_json :: to_string (& input ). unwrap ();
110
- write_all (writer , 1 , event . as_bytes ()). map (| ()| event . len ())
120
+ write_all (writer , 1 , event . as_bytes ()). map (| ()| ( event . len (), byte_size ))
111
121
}
112
122
```
113
123
@@ -152,8 +162,12 @@ We need to implement a number of traits for the request to access these fields:
152
162
153
163
``` rust
154
164
impl MetaDescriptive for BasicRequest {
155
- fn get_metadata (& self ) -> RequestMetadata {
156
- self . metadata
165
+ fn get_metadata (& self ) -> & RequestMetadata {
166
+ & self . metadata
167
+ }
168
+
169
+ fn metadata_mut (& mut self ) -> & mut RequestMetadata {
170
+ & mut self . metadata
157
171
}
158
172
}
159
173
@@ -249,7 +263,7 @@ when sending the event to an `amqp` server.
249
263
mut input : Event ,
250
264
) -> (Self :: Metadata , RequestMetadataBuilder , Self :: Events ) {
251
265
let finalizers = input . take_finalizers ();
252
- let metadata_builder = RequestMetadataBuilder :: from_events (& input );
266
+ let metadata_builder = RequestMetadataBuilder :: from_event (& input );
253
267
(finalizers , metadata_builder , input )
254
268
}
255
269
```
@@ -338,7 +352,12 @@ that will be invoked to send the actual data.
338
352
match client . call (req ). await {
339
353
Ok (response ) => {
340
354
if response . status (). is_success () {
341
- Ok (BasicResponse { byte_size })
355
+ Ok (BasicResponse {
356
+ byte_size ,
357
+ json_size : request
358
+ . metadata
359
+ . into_events_estimated_json_encoded_byte_size (),
360
+ })
342
361
} else {
343
362
Err (" received error response" )
344
363
}
@@ -359,18 +378,21 @@ The return from our service must be an object that implements the
359
378
``` rust
360
379
struct BasicResponse {
361
380
byte_size : usize ,
381
+ json_size : GroupedCountByteSize ,
362
382
}
363
383
364
384
impl DriverResponse for BasicResponse {
365
385
fn event_status (& self ) -> EventStatus {
366
386
EventStatus :: Delivered
367
387
}
368
388
369
- fn events_sent (& self ) -> RequestCountByteSize {
370
- // (events count, byte size)
371
- CountByteSize (1 , self . byte_size). into ()
389
+ fn events_sent (& self ) -> & GroupedCountByteSize {
390
+ & self . json_size
372
391
}
373
- }
392
+
393
+ fn bytes_sent (& self ) -> Option <usize > {
394
+ Some (self . byte_size)
395
+ }}
374
396
```
375
397
376
398
Vector calls the methods in this trait to determine if the event was delivered successfully.
@@ -492,3 +514,4 @@ BODY:
492
514
[ sinkbuilder_ext_into_driver ] : https://rust-doc.vector.dev/vector/sinks/util/builder/trait.sinkbuilderext#method.into_driver
493
515
[ stream_filter_map ] : https://docs.rs/futures/latest/futures/stream/trait.StreamExt.html#method.filter_map
494
516
[ driver ] : https://rust-doc.vector.dev/vector_core/stream/struct.driver
517
+ [ grouped_count_byte_size ] : https://rust-doc.vector.dev/vector_common/request_metadata/enum.groupedcountbytesize
0 commit comments