Skip to content

Commit 8918c66

Browse files
authored
fix(kubernetes_logs source): Fix events being empty when log namespacing is enabled (vectordotdev#18244)
* save * save * save * added tests * cleanup * cleanup * clippy cleanup * fix formatting inside macro
1 parent a4d73ca commit 8918c66

File tree

8 files changed

+490
-113
lines changed

8 files changed

+490
-113
lines changed

Cargo.lock

+1
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

lib/vector-core/Cargo.toml

+1
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ publish = false
77

88
[dependencies]
99
async-graphql = { version = "5.0.10", default-features = false, features = ["playground" ], optional = true }
10+
async-stream = { version = "0.3.5", default-features = false }
1011
async-trait = { version = "0.1", default-features = false }
1112
bitmask-enum = { version = "2.2.2", default-features = false }
1213
bytes = { version = "1.4.0", default-features = false, features = ["serde"] }
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
use async_stream::stream;
2+
use futures::{Stream, StreamExt};
3+
use std::time::Duration;
4+
5+
/// Output buffer handed to the callbacks of [`map_with_expiration`].
///
/// Callbacks push zero or more values with [`Emitter::emit`]; the values are kept in the
/// order in which they were emitted.
pub struct Emitter<T> {
    values: Vec<T>,
}

// Implemented by hand rather than derived: `#[derive(Default)]` would add a `T: Default`
// bound, even though an empty `Vec<T>` can be created for any `T`.
impl<T> Default for Emitter<T> {
    fn default() -> Self {
        Self { values: Vec::new() }
    }
}

impl<T> Emitter<T> {
    /// Creates an emitter that holds no values.
    pub fn new() -> Self {
        Self::default()
    }

    /// Appends `value` after the values emitted so far.
    pub fn emit(&mut self, value: T) {
        self.values.push(value);
    }
}
18+
19+
/// Similar to `stream.filter_map(..).flatten(..)` but also allows checking for expired events
20+
/// and flushing when the input stream ends.
21+
pub fn map_with_expiration<S, T, M, E, F>(
22+
initial_state: S,
23+
input: impl Stream<Item = T> + 'static,
24+
expiration_interval: Duration,
25+
// called for each event
26+
mut map_fn: M,
27+
// called periodically to allow expiring internal state
28+
mut expiration_fn: E,
29+
// called once at the end of the input stream
30+
mut flush_fn: F,
31+
) -> impl Stream<Item = T>
32+
where
33+
M: FnMut(&mut S, T, &mut Emitter<T>),
34+
E: FnMut(&mut S, &mut Emitter<T>),
35+
F: FnMut(&mut S, &mut Emitter<T>),
36+
{
37+
let mut state = initial_state;
38+
let mut flush_stream = tokio::time::interval(expiration_interval);
39+
40+
Box::pin(stream! {
41+
futures_util::pin_mut!(input);
42+
loop {
43+
let mut emitter = Emitter::<T>::new();
44+
let done = tokio::select! {
45+
_ = flush_stream.tick() => {
46+
expiration_fn(&mut state, &mut emitter);
47+
false
48+
}
49+
maybe_event = input.next() => {
50+
match maybe_event {
51+
None => {
52+
flush_fn(&mut state, &mut emitter);
53+
true
54+
}
55+
Some(event) => {
56+
map_fn(&mut state, event, &mut emitter);
57+
false
58+
}
59+
}
60+
}
61+
};
62+
yield futures::stream::iter(emitter.values.into_iter());
63+
if done { break }
64+
}
65+
66+
})
67+
.flatten()
68+
}
69+
70+
#[cfg(test)]
mod test {
    use super::*;

    /// Adds `event` to the running total and emits the updated total.
    fn accumulate(total: &mut i32, event: i32, emitter: &mut Emitter<i32>) {
        *total += event;
        emitter.emit(*total);
    }

    /// Emits the current running total without changing it.
    fn emit_total(total: &mut i32, emitter: &mut Emitter<i32>) {
        emitter.emit(*total);
    }

    /// Callback that neither touches the state nor emits anything.
    fn do_nothing(_total: &mut i32, _emitter: &mut Emitter<i32>) {}

    /// A finite input: the running totals are emitted, then the flush emits the final total.
    #[tokio::test]
    async fn test_simple() {
        let input = futures::stream::iter([1, 2, 3]);

        let output = map_with_expiration(
            0_i32,
            input,
            Duration::from_secs(100),
            accumulate,
            do_nothing,
            emit_total,
        )
        .take(4)
        .collect::<Vec<i32>>()
        .await;

        assert_eq!(vec![1, 3, 6, 6], output);
    }

    /// A never-ending input: the fourth value has to come from the expiration callback.
    #[tokio::test]
    async fn test_expiration() {
        // an input that never ends (to test expiration)
        let input = futures::stream::iter([1, 2, 3]).chain(futures::stream::pending());

        let output = map_with_expiration(
            0_i32,
            input,
            Duration::from_secs(1),
            accumulate,
            emit_total,
            do_nothing,
        )
        .take(4)
        .collect::<Vec<i32>>()
        .await;

        assert_eq!(vec![1, 3, 6, 6], output);
    }
}

lib/vector-core/src/stream/mod.rs

+1
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
pub mod batcher;
22
mod concurrent_map;
33
mod driver;
4+
pub mod expiration_map;
45
mod futures_unordered_count;
56
mod partitioned_batcher;
67

src/sources/kubernetes_logs/mod.rs

+12-14
Original file line numberDiff line numberDiff line change
@@ -4,7 +4,6 @@
44
//! running inside the cluster as a DaemonSet.
55
66
#![deny(missing_docs)]
7-
87
use std::{path::PathBuf, time::Duration};
98

109
use bytes::Bytes;
@@ -32,11 +31,10 @@ use vector_common::{
3231
TimeZone,
3332
};
3433
use vector_config::configurable_component;
35-
use vector_core::{
36-
config::LegacyKey, config::LogNamespace, transform::TaskTransform, EstimatedJsonEncodedSizeOf,
37-
};
34+
use vector_core::{config::LegacyKey, config::LogNamespace, EstimatedJsonEncodedSizeOf};
3835
use vrl::value::{kind::Collection, Kind};
3936

37+
use crate::sources::kubernetes_logs::partial_events_merger::merge_partial_events;
4038
use crate::{
4139
config::{
4240
log_schema, ComponentKey, DataType, GenerateConfig, GlobalOptions, SourceConfig,
@@ -72,9 +70,6 @@ use self::node_metadata_annotator::NodeMetadataAnnotator;
7270
use self::parser::Parser;
7371
use self::pod_metadata_annotator::PodMetadataAnnotator;
7472

75-
/// The key we use for `file` field.
76-
const FILE_KEY: &str = "file";
77-
7873
/// The `self_node_name` value env var key.
7974
const SELF_NODE_NAME_ENV_KEY: &str = "VECTOR_SELF_NODE_NAME";
8075

@@ -781,12 +776,6 @@ impl Source {
781776

782777
let (file_source_tx, file_source_rx) = futures::channel::mpsc::channel::<Vec<Line>>(2);
783778

784-
let mut parser = Parser::new(log_namespace);
785-
let partial_events_merger = Box::new(partial_events_merger::build(
786-
auto_partial_merge,
787-
log_namespace,
788-
));
789-
790779
let checkpoints = checkpointer.view();
791780
let events = file_source_rx.flat_map(futures::stream::iter);
792781
let bytes_received = register!(BytesReceived::from(Protocol::HTTP));
@@ -800,6 +789,7 @@ impl Source {
800789
ingestion_timestamp_field.as_ref(),
801790
log_namespace,
802791
);
792+
803793
let file_info = annotator.annotate(&mut event, &line.filename);
804794

805795
emit!(KubernetesLogsEventsReceived {
@@ -834,14 +824,22 @@ impl Source {
834824
checkpoints.update(line.file_id, line.end_offset);
835825
event
836826
});
827+
828+
let mut parser = Parser::new(log_namespace);
837829
let events = events.flat_map(move |event| {
838830
let mut buf = OutputBuffer::with_capacity(1);
839831
parser.transform(&mut buf, event);
840832
futures::stream::iter(buf.into_events())
841833
});
834+
842835
let (events_count, _) = events.size_hint();
843836

844-
let mut stream = partial_events_merger.transform(Box::pin(events));
837+
let mut stream = if auto_partial_merge {
838+
merge_partial_events(events, log_namespace).left_stream()
839+
} else {
840+
events.right_stream()
841+
};
842+
845843
let event_processing_loop = out.send_event_stream(&mut stream);
846844

847845
let mut lifecycle = Lifecycle::new();

0 commit comments

Comments
 (0)