Skip to content

Commit ee480cd

Browse files
chore: Dropped error field from StreamClosed Error (#17693)
Field has been unused, so I've removed it and cleaned up the usages and conditionals where it was included previously. `send_batch()` appears to only return `ClosedError` making some additional matching pointless. Signed-off-by: Spencer Gilbert <[email protected]>
1 parent 59e2cbf commit ee480cd

File tree

37 files changed

+74
-84
lines changed

37 files changed

+74
-84
lines changed

src/internal_events/common.rs

-1
Original file line numberDiff line numberDiff line change
@@ -83,7 +83,6 @@ const STREAM_CLOSED: &str = "stream_closed";
8383

8484
#[derive(Debug)]
8585
pub struct StreamClosedError {
86-
pub error: crate::source_sender::ClosedError,
8786
pub count: usize,
8887
}
8988

src/sources/amqp.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -392,17 +392,17 @@ async fn finalize_event_stream(
392392
let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
393393

394394
match out.send_event_stream(&mut stream).await {
395-
Err(error) => {
396-
emit!(StreamClosedError { error, count: 1 });
395+
Err(_) => {
396+
emit!(StreamClosedError { count: 1 });
397397
}
398398
Ok(_) => {
399399
finalizer.add(msg.into(), receiver);
400400
}
401401
}
402402
}
403403
None => match out.send_event_stream(&mut stream).await {
404-
Err(error) => {
405-
emit!(StreamClosedError { error, count: 1 });
404+
Err(_) => {
405+
emit!(StreamClosedError { count: 1 });
406406
}
407407
Ok(_) => {
408408
let ack_options = lapin::options::BasicAckOptions::default();

src/sources/apache_metrics/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -273,9 +273,9 @@ fn apache_metrics(
273273
debug!("Finished sending.");
274274
Ok(())
275275
}
276-
Err(error) => {
276+
Err(_) => {
277277
let (count, _) = stream.size_hint();
278-
emit!(StreamClosedError { error, count });
278+
emit!(StreamClosedError { count });
279279
Err(())
280280
}
281281
}

src/sources/aws_ecs_metrics/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -207,8 +207,8 @@ async fn aws_ecs_metrics(
207207
endpoint: uri.path(),
208208
});
209209

210-
if let Err(error) = out.send_batch(metrics).await {
211-
emit!(StreamClosedError { error, count });
210+
if (out.send_batch(metrics).await).is_err() {
211+
emit!(StreamClosedError { count });
212212
return Err(());
213213
}
214214
}

src/sources/aws_kinesis_firehose/handlers.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -145,10 +145,7 @@ pub(super) async fn firehose(
145145

146146
let count = events.len();
147147
if let Err(error) = context.out.send_batch(events).await {
148-
emit!(StreamClosedError {
149-
error: error.clone(),
150-
count,
151-
});
148+
emit!(StreamClosedError { count });
152149
let error = RequestError::ShuttingDown {
153150
request_id: request_id.clone(),
154151
source: error,

src/sources/aws_s3/sqs.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -574,9 +574,9 @@ impl IngestorProcess {
574574

575575
let send_error = match self.out.send_event_stream(&mut stream).await {
576576
Ok(_) => None,
577-
Err(error) => {
577+
Err(_) => {
578578
let (count, _) = stream.size_hint();
579-
emit!(StreamClosedError { error, count });
579+
emit!(StreamClosedError { count });
580580
Some(crate::source_sender::ClosedError)
581581
}
582582
};

src/sources/aws_sqs/source.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -180,7 +180,7 @@ impl SqsSource {
180180
}
181181
}
182182
}
183-
Err(error) => emit!(StreamClosedError { error, count }),
183+
Err(_) => emit!(StreamClosedError { count }),
184184
}
185185
}
186186
}

src/sources/datadog_agent/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -443,8 +443,8 @@ pub(crate) async fn handle_request(
443443
} else {
444444
out.send_batch(events).await
445445
}
446-
.map_err(move |error: crate::source_sender::ClosedError| {
447-
emit!(StreamClosedError { error, count });
446+
.map_err(|_| {
447+
emit!(StreamClosedError { count });
448448
warp::reject::custom(ApiError::ServerShutdown)
449449
})?;
450450
match receiver {

src/sources/demo_logs.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -264,8 +264,8 @@ async fn demo_logs_source(
264264

265265
event
266266
});
267-
out.send_batch(events).await.map_err(|error| {
268-
emit!(StreamClosedError { error, count });
267+
out.send_batch(events).await.map_err(|_| {
268+
emit!(StreamClosedError { count });
269269
})?;
270270
}
271271
Err(error) => {

src/sources/docker_logs/mod.rs

+4-7
Original file line numberDiff line numberDiff line change
@@ -814,13 +814,10 @@ impl EventStreamBuilder {
814814
let result = {
815815
let mut stream = events_stream
816816
.map(move |event| add_hostname(event, &host_key, &hostname, self.log_namespace));
817-
self.out
818-
.send_event_stream(&mut stream)
819-
.await
820-
.map_err(|error| {
821-
let (count, _) = stream.size_hint();
822-
emit!(StreamClosedError { error, count });
823-
})
817+
self.out.send_event_stream(&mut stream).await.map_err(|_| {
818+
let (count, _) = stream.size_hint();
819+
emit!(StreamClosedError { count });
820+
})
824821
};
825822

826823
// End of stream

src/sources/eventstoredb_metrics/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -137,8 +137,8 @@ fn eventstoredb(
137137

138138
events_received.emit(CountByteSize(count, byte_size));
139139

140-
if let Err(error) = cx.out.send_batch(metrics).await {
141-
emit!(StreamClosedError { count, error });
140+
if (cx.out.send_batch(metrics).await).is_err() {
141+
emit!(StreamClosedError { count });
142142
break;
143143
}
144144
}

src/sources/exec/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -498,8 +498,8 @@ async fn run_command(
498498
for event in &mut events {
499499
handle_event(&config, &hostname, &Some(stream.to_string()), pid, event, log_namespace);
500500
}
501-
if let Err(error) = out.send_batch(events).await {
502-
emit!(StreamClosedError { count, error });
501+
if (out.send_batch(events).await).is_err() {
502+
emit!(StreamClosedError { count });
503503
break;
504504
}
505505
},

src/sources/file.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -659,9 +659,9 @@ pub fn file_source(
659659
Ok(()) => {
660660
debug!("Finished sending.");
661661
}
662-
Err(error) => {
662+
Err(_) => {
663663
let (count, _) = messages.size_hint();
664-
emit!(StreamClosedError { error, count });
664+
emit!(StreamClosedError { count });
665665
}
666666
}
667667
});

src/sources/file_descriptors/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -195,9 +195,9 @@ async fn process_stream(
195195
debug!("Finished sending.");
196196
Ok(())
197197
}
198-
Err(error) => {
198+
Err(_) => {
199199
let (count, _) = stream.size_hint();
200-
emit!(StreamClosedError { error, count });
200+
emit!(StreamClosedError { count });
201201
Err(())
202202
}
203203
}

src/sources/gcp_pubsub.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -618,7 +618,7 @@ impl PubsubSource {
618618

619619
let count = events.len();
620620
match self.out.send_batch(events).await {
621-
Err(error) => emit!(StreamClosedError { error, count }),
621+
Err(_) => emit!(StreamClosedError { count }),
622622
Ok(()) => match notifier {
623623
None => ack_ids
624624
.send(ids)

src/sources/host_metrics/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -296,8 +296,8 @@ impl HostMetricsConfig {
296296
bytes_received.emit(ByteSize(0));
297297
let metrics = generator.capture_metrics().await;
298298
let count = metrics.len();
299-
if let Err(error) = out.send_batch(metrics).await {
300-
emit!(StreamClosedError { count, error });
299+
if (out.send_batch(metrics).await).is_err() {
300+
emit!(StreamClosedError { count });
301301
return Err(());
302302
}
303303
}

src/sources/internal_logs.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -191,9 +191,9 @@ async fn run(
191191
Utc::now(),
192192
);
193193

194-
if let Err(error) = out.send_event(Event::from(log)).await {
194+
if (out.send_event(Event::from(log)).await).is_err() {
195195
// this wont trigger any infinite loop considering it stops the component
196-
emit!(StreamClosedError { error, count: 1 });
196+
emit!(StreamClosedError { count: 1 });
197197
return Err(());
198198
}
199199
}

src/sources/internal_metrics.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -190,8 +190,8 @@ impl<'a> InternalMetrics<'a> {
190190
metric
191191
});
192192

193-
if let Err(error) = self.out.send_batch(batch).await {
194-
emit!(StreamClosedError { error, count });
193+
if (self.out.send_batch(batch).await).is_err() {
194+
emit!(StreamClosedError { count });
195195
return Err(());
196196
}
197197
}

src/sources/journald.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -606,8 +606,8 @@ impl<'a> Batch<'a> {
606606
finalizer.finalize(cursor, self.receiver).await;
607607
}
608608
}
609-
Err(error) => {
610-
emit!(StreamClosedError { error, count });
609+
Err(_) => {
610+
emit!(StreamClosedError { count });
611611
// `out` channel is closed, don't restart journalctl.
612612
self.exiting = Some(false);
613613
}

src/sources/kafka.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -449,8 +449,8 @@ async fn parse_message(
449449
let (batch, receiver) = BatchNotifier::new_with_receiver();
450450
let mut stream = stream.map(|event| event.with_batch_notifier(&batch));
451451
match out.send_event_stream(&mut stream).await {
452-
Err(error) => {
453-
emit!(StreamClosedError { error, count });
452+
Err(_) => {
453+
emit!(StreamClosedError { count });
454454
}
455455
Ok(_) => {
456456
// Drop stream to avoid borrowing `msg`: "[...] borrow might be used
@@ -461,8 +461,8 @@ async fn parse_message(
461461
}
462462
}
463463
None => match out.send_event_stream(&mut stream).await {
464-
Err(error) => {
465-
emit!(StreamClosedError { error, count });
464+
Err(_) => {
465+
emit!(StreamClosedError { count });
466466
}
467467
Ok(_) => {
468468
if let Err(error) =

src/sources/kubernetes_logs/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -871,8 +871,7 @@ impl Source {
871871
.map(|result| {
872872
match result {
873873
Ok(Ok(())) => info!(message = "Event processing loop completed gracefully."),
874-
Ok(Err(error)) => emit!(StreamClosedError {
875-
error,
874+
Ok(Err(_)) => emit!(StreamClosedError {
876875
count: events_count
877876
}),
878877
Err(error) => emit!(KubernetesLifecycleError {

src/sources/mongodb_metrics/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -147,8 +147,8 @@ impl SourceConfig for MongoDbMetricsConfig {
147147

148148
let metrics = metrics.into_iter().flatten();
149149

150-
if let Err(error) = cx.out.send_batch(metrics).await {
151-
emit!(StreamClosedError { error, count });
150+
if (cx.out.send_batch(metrics).await).is_err() {
151+
emit!(StreamClosedError { count });
152152
return Err(());
153153
}
154154
}

src/sources/nats.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -238,8 +238,8 @@ async fn nats_source(
238238
event
239239
});
240240

241-
out.send_batch(events).await.map_err(|error| {
242-
emit!(StreamClosedError { error, count });
241+
out.send_batch(events).await.map_err(|_| {
242+
emit!(StreamClosedError { count });
243243
})?;
244244
}
245245
Err(error) => {

src/sources/nginx_metrics/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -135,8 +135,8 @@ impl SourceConfig for NginxMetricsConfig {
135135

136136
let metrics = metrics.into_iter().flatten();
137137

138-
if let Err(error) = cx.out.send_batch(metrics).await {
139-
emit!(StreamClosedError { error, count });
138+
if (cx.out.send_batch(metrics).await).is_err() {
139+
emit!(StreamClosedError { count });
140140
return Err(());
141141
}
142142
}

src/sources/opentelemetry/grpc.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -48,7 +48,7 @@ impl LogsService for Service {
4848
.send_batch_named(LOGS, events)
4949
.map_err(|error| {
5050
let message = error.to_string();
51-
emit!(StreamClosedError { error, count });
51+
emit!(StreamClosedError { count });
5252
Status::unavailable(message)
5353
})
5454
.and_then(|_| handle_batch_status(receiver))

src/sources/opentelemetry/http.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -122,12 +122,10 @@ async fn handle_request(
122122
let receiver = BatchNotifier::maybe_apply_to(acknowledgements, &mut events);
123123
let count = events.len();
124124

125-
out.send_batch_named(output, events)
126-
.await
127-
.map_err(move |error| {
128-
emit!(StreamClosedError { error, count });
129-
warp::reject::custom(ApiError::ServerShutdown)
130-
})?;
125+
out.send_batch_named(output, events).await.map_err(|_| {
126+
emit!(StreamClosedError { count });
127+
warp::reject::custom(ApiError::ServerShutdown)
128+
})?;
131129

132130
match receiver {
133131
None => Ok(protobuf(ExportLogsServiceResponse {}).into_response()),

src/sources/postgresql_metrics.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -227,8 +227,8 @@ impl SourceConfig for PostgresqlMetricsConfig {
227227
});
228228

229229
let metrics = metrics.into_iter().flatten();
230-
if let Err(error) = cx.out.send_batch(metrics).await {
231-
emit!(StreamClosedError { error, count });
230+
if (cx.out.send_batch(metrics).await).is_err() {
231+
emit!(StreamClosedError { count });
232232
return Err(());
233233
}
234234
}

src/sources/redis/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -279,8 +279,8 @@ impl InputHandler {
279279
event
280280
});
281281

282-
if let Err(error) = self.cx.out.send_batch(events).await {
283-
emit!(StreamClosedError { error, count });
282+
if (self.cx.out.send_batch(events).await).is_err() {
283+
emit!(StreamClosedError { count });
284284
return Err(());
285285
}
286286
}

src/sources/socket/udp.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -269,8 +269,8 @@ pub(super) fn udp(
269269

270270
tokio::select!{
271271
result = out.send_batch(events) => {
272-
if let Err(error) = result {
273-
emit!(StreamClosedError { error, count });
272+
if result.is_err() {
273+
emit!(StreamClosedError { count });
274274
return Ok(())
275275
}
276276
}

src/sources/statsd/mod.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -295,8 +295,8 @@ async fn statsd_udp(
295295
match frame {
296296
Ok(((events, _byte_size), _sock)) => {
297297
let count = events.len();
298-
if let Err(error) = out.send_batch(events).await {
299-
emit!(StreamClosedError { error, count });
298+
if (out.send_batch(events).await).is_err() {
299+
emit!(StreamClosedError { count });
300300
}
301301
}
302302
Err(error) => {

src/sources/syslog.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -364,9 +364,9 @@ pub fn udp(
364364
debug!("Finished sending.");
365365
Ok(())
366366
}
367-
Err(error) => {
367+
Err(_) => {
368368
let (count, _) = stream.size_hint();
369-
emit!(StreamClosedError { error, count });
369+
emit!(StreamClosedError { count });
370370
Err(())
371371
}
372372
}

src/sources/util/http/prelude.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -213,10 +213,10 @@ async fn handle_request(
213213

214214
let count = events.len();
215215
out.send_batch(events)
216-
.map_err(move |error: crate::source_sender::ClosedError| {
216+
.map_err(|_| {
217217
// can only fail if receiving end disconnected, so we are shutting down,
218218
// probably not gracefully.
219-
emit!(StreamClosedError { error, count });
219+
emit!(StreamClosedError { count });
220220
warp::reject::custom(RejectShuttingDown)
221221
})
222222
.and_then(|_| handle_batch_status(receiver))

src/sources/util/http_client.rs

+2-2
Original file line numberDiff line numberDiff line change
@@ -233,9 +233,9 @@ pub(crate) async fn call<
233233
debug!("Finished sending.");
234234
Ok(())
235235
}
236-
Err(error) => {
236+
Err(_) => {
237237
let (count, _) = stream.size_hint();
238-
emit!(StreamClosedError { error, count });
238+
emit!(StreamClosedError { count });
239239
Err(())
240240
}
241241
}

0 commit comments

Comments
 (0)