Skip to content

Commit e46fae7

Browse files
feat(kubernetes_logs): use kube-apiserver cache for list requests (#17095)
Signed-off-by: m.nabokikh <[email protected]> Co-authored-by: Spencer Gilbert <[email protected]>
1 parent d245927 commit e46fae7

File tree

4 files changed

+80
-65
lines changed

4 files changed

+80
-65
lines changed

Cargo.lock

+53-59
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

+2-2
Original file line numberDiff line numberDiff line change
@@ -271,8 +271,8 @@ indexmap = { version = "~1.9.3", default-features = false, features = ["serde"]
271271
infer = { version = "0.13.0", default-features = false, optional = true}
272272
indoc = { version = "2.0.1", default-features = false }
273273
inventory = { version = "0.3.5", default-features = false }
274-
k8s-openapi = { version = "0.16.0", default-features = false, features = ["api", "v1_19"], optional = true }
275-
kube = { version = "0.75.0", default-features = false, features = ["client", "native-tls", "runtime"], optional = true }
274+
k8s-openapi = { version = "0.18.0", default-features = false, features = ["api", "v1_26"], optional = true }
275+
kube = { version = "0.82.0", default-features = false, features = ["client", "openssl-tls", "runtime"], optional = true }
276276
listenfd = { version = "1.0.1", default-features = false, optional = true }
277277
logfmt = { version = "0.0.2", default-features = false, optional = true }
278278
lru = { version = "0.10.0", default-features = false, optional = true }

src/sources/kubernetes_logs/mod.rs

+20-4
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,7 @@ use futures_util::Stream;
1919
use k8s_openapi::api::core::v1::{Namespace, Node, Pod};
2020
use k8s_paths_provider::K8sPathsProvider;
2121
use kube::{
22-
api::{Api, ListParams},
22+
api::Api,
2323
config::{self, KubeConfigOptions},
2424
runtime::{
2525
reflector::{self},
@@ -219,6 +219,9 @@ pub struct Config {
219219
#[configurable(metadata(docs::examples = "/path/to/.kube/config"))]
220220
kube_config_file: Option<PathBuf>,
221221

222+
/// Determines if requests to the kube-apiserver can be served by a cache.
223+
use_apiserver_cache: bool,
224+
222225
/// How long to delay removing metadata entries from the cache when a pod deletion event
223226
/// event is received from the watch stream.
224227
///
@@ -271,6 +274,7 @@ impl Default for Config {
271274
ingestion_timestamp_field: None,
272275
timezone: None,
273276
kube_config_file: None,
277+
use_apiserver_cache: false,
274278
delay_deletion_ms: default_delay_deletion_ms(),
275279
log_namespace: None,
276280
}
@@ -519,6 +523,7 @@ struct Source {
519523
max_line_bytes: usize,
520524
fingerprint_lines: usize,
521525
glob_minimum_cooldown: Duration,
526+
use_apiserver_cache: bool,
522527
ingestion_timestamp_field: Option<OwnedTargetPath>,
523528
delay_deletion: Duration,
524529
}
@@ -595,6 +600,7 @@ impl Source {
595600
max_line_bytes: config.max_line_bytes,
596601
fingerprint_lines: config.fingerprint_lines,
597602
glob_minimum_cooldown,
603+
use_apiserver_cache: config.use_apiserver_cache,
598604
ingestion_timestamp_field,
599605
delay_deletion,
600606
})
@@ -625,6 +631,7 @@ impl Source {
625631
max_line_bytes,
626632
fingerprint_lines,
627633
glob_minimum_cooldown,
634+
use_apiserver_cache,
628635
ingestion_timestamp_field,
629636
delay_deletion,
630637
} = self;
@@ -633,11 +640,18 @@ impl Source {
633640

634641
let pods = Api::<Pod>::all(client.clone());
635642

643+
let list_semantic = if use_apiserver_cache {
644+
watcher::ListSemantic::Any
645+
} else {
646+
watcher::ListSemantic::MostRecent
647+
};
648+
636649
let pod_watcher = watcher(
637650
pods,
638-
ListParams {
651+
watcher::Config {
639652
field_selector: Some(field_selector),
640653
label_selector: Some(label_selector),
654+
list_semantic: list_semantic.clone(),
641655
..Default::default()
642656
},
643657
)
@@ -658,8 +672,9 @@ impl Source {
658672
let namespaces = Api::<Namespace>::all(client.clone());
659673
let ns_watcher = watcher(
660674
namespaces,
661-
ListParams {
675+
watcher::Config {
662676
label_selector: Some(namespace_label_selector),
677+
list_semantic: list_semantic.clone(),
663678
..Default::default()
664679
},
665680
)
@@ -680,8 +695,9 @@ impl Source {
680695
let nodes = Api::<Node>::all(client);
681696
let node_watcher = watcher(
682697
nodes,
683-
ListParams {
698+
watcher::Config {
684699
field_selector: Some(node_selector),
700+
list_semantic,
685701
..Default::default()
686702
},
687703
)

0 commit comments

Comments
 (0)