Skip to content

Commit

Permalink
Exploring shared cache based on multi reflectors
Browse files Browse the repository at this point in the history
Signed-off-by: Danil-Grigorev <[email protected]>
  • Loading branch information
Danil-Grigorev committed Feb 11, 2025
1 parent 550e50f commit 0ab6540
Show file tree
Hide file tree
Showing 15 changed files with 480 additions and 210 deletions.
1 change: 1 addition & 0 deletions examples/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ ws = ["kube/ws"]
latest = ["k8s-openapi/latest"]

[dev-dependencies]
parking_lot.workspace = true
tokio-util.workspace = true
assert-json-diff.workspace = true
garde = { version = "0.22.0", default-features = false, features = ["derive"] }
Expand Down
168 changes: 77 additions & 91 deletions examples/multi_reflector.rs
Original file line number Diff line number Diff line change
@@ -1,74 +1,84 @@
use futures::{future, StreamExt};
use futures::{future, stream, StreamExt};
use k8s_openapi::api::{
apps::v1::Deployment,
core::v1::{ConfigMap, Secret},
};
use kube::{
api::{DynamicObject, GroupVersionKind},
core::TypedResource,
runtime::{
reflector,
reflector::{ObjectRef, Store},
watcher, WatchStreamExt,
reflector::store::CacheWriter,
watcher::{self, dynamic_watcher},
WatchStreamExt,
},
Api, Client,
Api, Client, Resource,
};
use parking_lot::RwLock;
use serde::de::DeserializeOwned;
use std::sync::Arc;
use tracing::*;

// This does not work because Resource trait is not dyn safe.
/*
use std::any::TypeId;
use std::collections::HashMap;
use k8s_openapi::NamespaceResourceScope;
use kube::api::{Resource, ResourceExt};
struct MultiStore {
stores: HashMap<TypeId, Store<dyn Resource<DynamicType = (), Scope = NamespaceResourceScope>>>,
}
impl MultiStore {
fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<K>> {
let oref = ObjectRef::<K>::new(name).within(ns);
if let Some(store) = self.stores.get(&TypeId::of::<K>()) {
store.get(oref)
} else {
None
}
}
}*/

// explicit store can work
struct MultiStore {
deploys: Store<Deployment>,
cms: Store<ConfigMap>,
secs: Store<Secret>,
type Cache = Arc<RwLock<HashMap<LookupKey, Arc<DynamicObject>>>>;

#[derive(Default, Clone, Hash, PartialEq, Eq, Debug)]
struct LookupKey {
gvk: GroupVersionKind,
name: Option<String>,
namespace: Option<String>,
}
// but using generics to help out won't because the K needs to be concretised
/*
impl MultiStore {
fn get<K: Resource<DynamicType = ()>>(&self, name: &str, ns: &str) -> Option<Arc<Option<K>>> {
let oref = ObjectRef::<K>::new(name).within(ns);
let kind = K::kind(&()).to_owned();
match kind.as_ref() {
"Deployment" => self.deploys.get(&ObjectRef::new(name).within(ns)),
"ConfigMap" => self.cms.get(&ObjectRef::new(name).within(ns)),
"Secret" => self.secs.get(&ObjectRef::new(name).within(ns)),
_ => None,

impl LookupKey {
fn new<R: TypedResource>(resource: &R) -> LookupKey {
let meta = resource.meta();
LookupKey {
gvk: resource.gvk(),
name: meta.name.clone(),
namespace: meta.namespace.clone(),
}
None
}
}
*/
// so left with this

impl MultiStore {
fn get_deploy(&self, name: &str, ns: &str) -> Option<Arc<Deployment>> {
self.deploys.get(&ObjectRef::<Deployment>::new(name).within(ns))
}
#[derive(Default, Clone)]
struct MultiCache {
store: Cache,
}

fn get_secret(&self, name: &str, ns: &str) -> Option<Arc<Secret>> {
self.secs.get(&ObjectRef::<Secret>::new(name).within(ns))
impl MultiCache {
fn get<K: Resource<DynamicType = impl Default> + DeserializeOwned + Clone>(
&self,
name: &str,
ns: &str,
) -> Option<Arc<K>> {
let obj = self
.store
.read()
.get(&LookupKey {
gvk: K::gvk(&Default::default()),
name: Some(name.into()),
namespace: if !ns.is_empty() { Some(ns.into()) } else { None },
})?
.as_ref()
.clone();
obj.try_parse().ok().map(Arc::new)
}
}

fn get_cm(&self, name: &str, ns: &str) -> Option<Arc<ConfigMap>> {
self.cms.get(&ObjectRef::<ConfigMap>::new(name).within(ns))
impl CacheWriter<DynamicObject> for MultiCache {
/// Applies a single watcher event to the store
fn apply_watcher_event(&mut self, event: &watcher::Event<DynamicObject>) {
match event {
watcher::Event::Init | watcher::Event::InitDone(_) => {}
watcher::Event::Delete(obj) => {
self.store.write().remove(&LookupKey::new(obj));
}
watcher::Event::InitApply(obj) | watcher::Event::Apply(obj) => {
self.store
.write()
.insert(LookupKey::new(obj), Arc::new(obj.clone()));
}
}
}
}

Expand All @@ -77,60 +87,36 @@ async fn main() -> anyhow::Result<()> {
tracing_subscriber::fmt::init();
let client = Client::try_default().await?;

let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
let cms: Api<ConfigMap> = Api::default_namespaced(client.clone());
let secret: Api<Secret> = Api::default_namespaced(client.clone());

let (dep_reader, dep_writer) = reflector::store::<Deployment>();
let (cm_reader, cm_writer) = reflector::store::<ConfigMap>();
let (sec_reader, sec_writer) = reflector::store::<Secret>();
// multistore
let combo_stream = stream::select_all(vec![
dynamic_watcher(Api::<Deployment>::all(client.clone()), Default::default()).boxed(),
dynamic_watcher(Api::<ConfigMap>::all(client.clone()), Default::default()).boxed(),
dynamic_watcher(Api::<Secret>::all(client.clone()), Default::default()).boxed(),
]);

let cfg = watcher::Config::default();
let dep_watcher = watcher(deploys, cfg.clone())
.reflect(dep_writer)
.applied_objects()
.for_each(|_| future::ready(()));
let cm_watcher = watcher(cms, cfg.clone())
.reflect(cm_writer)
let multi_writer = MultiCache::default();
let watcher = combo_stream
.reflect(multi_writer.clone())
.applied_objects()
.for_each(|_| future::ready(()));
let sec_watcher = watcher(secret, cfg)
.reflect(sec_writer)
.applied_objects()
.for_each(|_| future::ready(()));
// poll these forever

// multistore
let stores = MultiStore {
deploys: dep_reader,
cms: cm_reader,
secs: sec_reader,
};

// simulate doing stuff with the stores from some other thread
tokio::spawn(async move {
// Show state every 5 seconds of watching
info!("waiting for them to be ready");
stores.deploys.wait_until_ready().await.unwrap();
stores.cms.wait_until_ready().await.unwrap();
stores.secs.wait_until_ready().await.unwrap();
info!("stores initialised");
// can use helper accessors
info!(
"common cm: {:?}",
stores.get_cm("kube-root-ca.crt", "kube-system").unwrap()
);
loop {
tokio::time::sleep(std::time::Duration::from_secs(5)).await;
info!("cache content: {:?}", multi_writer.store.read().keys());
info!(
"common cm: {:?}",
multi_writer.get::<ConfigMap>("kube-root-ca.crt", "kube-system")
);
// access individual sub stores
info!("Current deploys count: {}", stores.deploys.state().len());
info!("Current objects count: {}", multi_writer.store.read().len());
}
});
// info!("long watches starting");
info!("long watches starting");
tokio::select! {
r = dep_watcher => println!("dep watcher exit: {r:?}"),
r = cm_watcher => println!("cm watcher exit: {r:?}"),
r = sec_watcher => println!("sec watcher exit: {r:?}"),
r = watcher => println!("watcher exit: {r:?}"),
}

Ok(())
Expand Down
2 changes: 1 addition & 1 deletion kube-core/src/gvk.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use thiserror::Error;
pub struct ParseGroupVersionError(pub String);

/// Core information about an API Resource.
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash, Default)]
pub struct GroupVersionKind {
/// API group
pub group: String,
Expand Down
2 changes: 1 addition & 1 deletion kube-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ pub mod gvk;
pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource};

pub mod metadata;
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta};
pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta, TypedResource};

pub mod labels;

Expand Down
119 changes: 118 additions & 1 deletion kube-core/src/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ use std::{borrow::Cow, marker::PhantomData};
pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta};
use serde::{Deserialize, Serialize};

use crate::{DynamicObject, Resource};
use crate::{ApiResource, DynamicObject, GroupVersionKind, Resource};

/// Type information that is flattened into every kubernetes object
#[derive(Deserialize, Serialize, Clone, Default, Debug, Eq, PartialEq, Hash)]
Expand Down Expand Up @@ -175,6 +175,123 @@ impl<K: Resource> Resource for PartialObjectMeta<K> {
}
}

///

Check warning on line 178 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:178:1 | 178 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs = note: `#[warn(clippy::empty_docs)]` on by default
pub trait TypedResource: Resource + Sized {
///

Check warning on line 180 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:180:5 | 180 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn type_meta(&self) -> TypeMeta;
///

Check warning on line 182 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:182:5 | 182 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn gvk(&self) -> GroupVersionKind;
///

Check warning on line 184 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:184:5 | 184 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn kind(&self) -> Cow<'_, str>;
///

Check warning on line 186 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:186:5 | 186 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn group(&self) -> Cow<'_, str>;
///

Check warning on line 188 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:188:5 | 188 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn version(&self) -> Cow<'_, str>;
///

Check warning on line 190 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:190:5 | 190 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn plural(&self) -> Cow<'_, str>;
}

impl<K> TypedResource for K
where
K: Resource,
(K, K::DynamicType): TypedResourceImpl<Resource = K>,
{
fn type_meta(&self) -> TypeMeta {
<(K, K::DynamicType) as TypedResourceImpl>::type_meta(self)

Check warning on line 200 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L199-L200

Added lines #L199 - L200 were not covered by tests
}

fn gvk(&self) -> GroupVersionKind {
<(K, K::DynamicType) as TypedResourceImpl>::gvk(self)

Check warning on line 204 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L203-L204

Added lines #L203 - L204 were not covered by tests
}

fn kind(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::kind(self)

Check warning on line 208 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L207-L208

Added lines #L207 - L208 were not covered by tests
}
///

Check warning on line 210 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:210:5 | 210 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn group(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::group(self)

Check warning on line 212 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L211-L212

Added lines #L211 - L212 were not covered by tests
}
///

Check warning on line 214 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:214:5 | 214 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn version(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::version(self)

Check warning on line 216 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L215-L216

Added lines #L215 - L216 were not covered by tests
}
///

Check warning on line 218 in kube-core/src/metadata.rs

View workflow job for this annotation

GitHub Actions / clippy_nightly

empty doc comment

warning: empty doc comment --> kube-core/src/metadata.rs:218:5 | 218 | /// | ^^^ | = help: consider removing or filling it = help: for further information visit https://rust-lang.github.io/rust-clippy/master/index.html#empty_docs
fn plural(&self) -> Cow<'_, str> {
<(K, K::DynamicType) as TypedResourceImpl>::plural(self)

Check warning on line 220 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L219-L220

Added lines #L219 - L220 were not covered by tests
}
}

#[doc(hidden)]
// Workaround for https://github.com/rust-lang/rust/issues/20400
pub trait TypedResourceImpl {
type Resource: Resource;
fn type_meta(res: &Self::Resource) -> TypeMeta;
fn gvk(res: &Self::Resource) -> GroupVersionKind;
fn kind(res: &Self::Resource) -> Cow<'_, str>;
fn group(res: &Self::Resource) -> Cow<'_, str>;
fn version(res: &Self::Resource) -> Cow<'_, str>;
fn plural(res: &Self::Resource) -> Cow<'_, str>;
}

impl<K> TypedResourceImpl for (K, ())
where
K: Resource<DynamicType = ()>,
{
type Resource = K;

fn type_meta(_: &Self::Resource) -> TypeMeta {
TypeMeta::resource::<K>()

Check warning on line 243 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L242-L243

Added lines #L242 - L243 were not covered by tests
}

fn gvk(res: &Self::Resource) -> GroupVersionKind {
GroupVersionKind::gvk(&res.group(), &res.version(), &res.kind())

Check warning on line 247 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L246-L247

Added lines #L246 - L247 were not covered by tests
}

fn kind(_: &Self::Resource) -> Cow<'_, str> {
K::kind(&())

Check warning on line 251 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L250-L251

Added lines #L250 - L251 were not covered by tests
}

fn group(_: &Self::Resource) -> Cow<'_, str> {
K::group(&())

Check warning on line 255 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L254-L255

Added lines #L254 - L255 were not covered by tests
}

fn version(_: &Self::Resource) -> Cow<'_, str> {
K::version(&())

Check warning on line 259 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L258-L259

Added lines #L258 - L259 were not covered by tests
}

fn plural(_: &Self::Resource) -> Cow<'_, str> {
K::plural(&())

Check warning on line 263 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L262-L263

Added lines #L262 - L263 were not covered by tests
}
}

impl TypedResourceImpl for (DynamicObject, ApiResource) {
type Resource = DynamicObject;

fn type_meta(obj: &Self::Resource) -> TypeMeta {
obj.types.clone().unwrap_or_default()

Check warning on line 271 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L270-L271

Added lines #L270 - L271 were not covered by tests
}

fn gvk(res: &Self::Resource) -> GroupVersionKind {
res.type_meta().try_into().unwrap_or_default()

Check warning on line 275 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L274-L275

Added lines #L274 - L275 were not covered by tests
}

fn kind(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.type_meta().kind)

Check warning on line 279 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L278-L279

Added lines #L278 - L279 were not covered by tests
}

fn group(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.gvk().group)

Check warning on line 283 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L282-L283

Added lines #L282 - L283 were not covered by tests
}

fn version(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(res.gvk().version)

Check warning on line 287 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L286-L287

Added lines #L286 - L287 were not covered by tests
}

fn plural(res: &Self::Resource) -> Cow<'_, str> {
Cow::from(ApiResource::from_gvk(&res.gvk()).plural)

Check warning on line 291 in kube-core/src/metadata.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/metadata.rs#L290-L291

Added lines #L290 - L291 were not covered by tests
}
}

#[cfg(test)]
mod test {
use super::{ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
Expand Down
7 changes: 7 additions & 0 deletions kube-core/src/resource.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ use std::{borrow::Cow, collections::BTreeMap};

pub use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope, ResourceScope, SubResourceScope};

use crate::GroupVersionKind;

/// Indicates that a [`Resource`] is of an indeterminate dynamic scope.
pub struct DynamicResourceScope {}
impl ResourceScope for DynamicResourceScope {}
Expand Down Expand Up @@ -54,6 +56,11 @@ pub trait Resource {
/// This is known as the resource in apimachinery, we rename it for disambiguation.
fn plural(dt: &Self::DynamicType) -> Cow<'_, str>;

/// Generates an object reference for the resource
fn gvk(dt: &Self::DynamicType) -> GroupVersionKind {
GroupVersionKind::gvk(&Self::group(dt), &Self::version(dt), &Self::kind(dt))

Check warning on line 61 in kube-core/src/resource.rs

View check run for this annotation

Codecov / codecov/patch

kube-core/src/resource.rs#L60-L61

Added lines #L60 - L61 were not covered by tests
}

/// Creates a url path for http requests for this resource
fn url_path(dt: &Self::DynamicType, namespace: Option<&str>) -> String {
let n = if let Some(ns) = namespace {
Expand Down
Loading

0 comments on commit 0ab6540

Please sign in to comment.