diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index 1c7740f1c..e51081e3e 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -22,6 +22,7 @@ ws = ["kube/ws"]
 latest = ["k8s-openapi/latest"]
 
 [dev-dependencies]
+parking_lot.workspace = true
 tokio-util.workspace = true
 assert-json-diff.workspace = true
 garde = { version = "0.22.0", default-features = false, features = ["derive"] }
diff --git a/examples/multi_reflector.rs b/examples/multi_reflector.rs
index 9f3c4537d..1e4f1038d 100644
--- a/examples/multi_reflector.rs
+++ b/examples/multi_reflector.rs
@@ -1,74 +1,84 @@
-use futures::{future, StreamExt};
+use futures::{future, stream, StreamExt};
 use k8s_openapi::api::{
     apps::v1::Deployment,
     core::v1::{ConfigMap, Secret},
 };
 use kube::{
+    api::{DynamicObject, GroupVersionKind},
+    core::TypedResource,
     runtime::{
-        reflector,
-        reflector::{ObjectRef, Store},
-        watcher, WatchStreamExt,
+        reflector::store::CacheWriter,
+        watcher::{self, dynamic_watcher},
+        WatchStreamExt,
     },
-    Api, Client,
+    Api, Client, Resource,
 };
+use parking_lot::RwLock;
+use serde::de::DeserializeOwned;
 use std::sync::Arc;
 use tracing::*;
 
-// This does not work because Resource trait is not dyn safe.
-/*
-use std::any::TypeId;
 use std::collections::HashMap;
-use k8s_openapi::NamespaceResourceScope;
-use kube::api::{Resource, ResourceExt};
-struct MultiStore {
-    stores: HashMap>>,
-}
-impl MultiStore {
-    fn get>(&self, name: &str, ns: &str) -> Option> {
-        let oref = ObjectRef::::new(name).within(ns);
-        if let Some(store) = self.stores.get(&TypeId::of::()) {
-            store.get(oref)
-        } else {
-            None
-        }
-    }
-}*/
-// explicit store can work
-struct MultiStore {
-    deploys: Store<Deployment>,
-    cms: Store<ConfigMap>,
-    secs: Store<Secret>,
+type Cache = Arc<RwLock<HashMap<LookupKey, Arc<DynamicObject>>>>;
+
+#[derive(Default, Clone, Hash, PartialEq, Eq, Debug)]
+struct LookupKey {
+    gvk: GroupVersionKind,
+    name: Option<String>,
+    namespace: Option<String>,
 }
-// but using generics to help out won't because the K needs to be concretised
-/*
-impl MultiStore {
-    fn get>(&self, name: &str, ns: &str) -> Option>> {
-        let oref = ObjectRef::::new(name).within(ns);
-        let kind = K::kind(&()).to_owned();
-        match kind.as_ref() {
-            "Deployment" => self.deploys.get(&ObjectRef::new(name).within(ns)),
-            "ConfigMap" => self.cms.get(&ObjectRef::new(name).within(ns)),
-            "Secret" => self.secs.get(&ObjectRef::new(name).within(ns)),
-            _ => None,
+
+impl LookupKey {
+    fn new<R: TypedResource>(resource: &R) -> LookupKey {
+        let meta = resource.meta();
+        LookupKey {
+            gvk: resource.gvk(),
+            name: meta.name.clone(),
+            namespace: meta.namespace.clone(),
         }
-        None
     }
 }
-*/
-// so left with this
-impl MultiStore {
-    fn get_deploy(&self, name: &str, ns: &str) -> Option<Arc<Deployment>> {
-        self.deploys.get(&ObjectRef::<Deployment>::new(name).within(ns))
-    }
+#[derive(Default, Clone)]
+struct MultiCache {
+    store: Cache,
+}
 
-    fn get_secret(&self, name: &str, ns: &str) -> Option<Arc<Secret>> {
-        self.secs.get(&ObjectRef::<Secret>::new(name).within(ns))
+impl MultiCache {
+    fn get<K: Resource<DynamicType = ()> + DeserializeOwned + Clone>(
+        &self,
+        name: &str,
+        ns: &str,
+    ) -> Option<Arc<K>> {
+        let obj = self
+            .store
+            .read()
+            .get(&LookupKey {
+                gvk: K::gvk(&Default::default()),
+                name: Some(name.into()),
+                namespace: if !ns.is_empty() { Some(ns.into()) } else { None },
+            })?
+            .as_ref()
+            .clone();
+        obj.try_parse().ok().map(Arc::new)
     }
+}
 
-    fn get_cm(&self, name: &str, ns: &str) -> Option<Arc<ConfigMap>> {
-        self.cms.get(&ObjectRef::<ConfigMap>::new(name).within(ns))
+impl CacheWriter<DynamicObject> for MultiCache {
+    /// Applies a single watcher event to the store
+    fn apply_watcher_event(&mut self, event: &watcher::Event<DynamicObject>) {
+        match event {
+            watcher::Event::Init | watcher::Event::InitDone(_) => {}
+            watcher::Event::Delete(obj) => {
+                self.store.write().remove(&LookupKey::new(obj));
+            }
+            watcher::Event::InitApply(obj) | watcher::Event::Apply(obj) => {
+                self.store
+                    .write()
+                    .insert(LookupKey::new(obj), Arc::new(obj.clone()));
+            }
+        }
     }
 }
@@ -77,60 +87,36 @@ async fn main() -> anyhow::Result<()> {
     tracing_subscriber::fmt::init();
     let client = Client::try_default().await?;
 
-    let deploys: Api<Deployment> = Api::default_namespaced(client.clone());
-    let cms: Api<ConfigMap> = Api::default_namespaced(client.clone());
-    let secret: Api<Secret> = Api::default_namespaced(client.clone());
-
-    let (dep_reader, dep_writer) = reflector::store::<Deployment>();
-    let (cm_reader, cm_writer) = reflector::store::<ConfigMap>();
-    let (sec_reader, sec_writer) = reflector::store::<Secret>();
+    // multistore
+    let combo_stream = stream::select_all(vec![
+        dynamic_watcher(Api::<Deployment>::all(client.clone()), Default::default()).boxed(),
+        dynamic_watcher(Api::<ConfigMap>::all(client.clone()), Default::default()).boxed(),
+        dynamic_watcher(Api::<Secret>::all(client.clone()), Default::default()).boxed(),
+    ]);
 
-    let cfg = watcher::Config::default();
-    let dep_watcher = watcher(deploys, cfg.clone())
-        .reflect(dep_writer)
-        .applied_objects()
-        .for_each(|_| future::ready(()));
-    let cm_watcher = watcher(cms, cfg.clone())
-        .reflect(cm_writer)
+    let multi_writer = MultiCache::default();
+    let watcher = combo_stream
+        .reflect(multi_writer.clone())
         .applied_objects()
         .for_each(|_| future::ready(()));
-    let sec_watcher = watcher(secret, cfg)
-        .reflect(sec_writer)
-        .applied_objects()
-        .for_each(|_| future::ready(()));
-    // poll these forever
-
-    // multistore
-    let stores = MultiStore {
-        deploys: dep_reader,
-        cms: cm_reader,
-        secs: sec_reader,
-    };
 
     // simulate doing stuff with the stores from some other thread
     tokio::spawn(async move {
-        // Show state every 5 seconds of watching
-        info!("waiting for them to be ready");
-        stores.deploys.wait_until_ready().await.unwrap();
-        stores.cms.wait_until_ready().await.unwrap();
-        stores.secs.wait_until_ready().await.unwrap();
-        info!("stores initialised");
         // can use helper accessors
-        info!(
-            "common cm: {:?}",
-            stores.get_cm("kube-root-ca.crt", "kube-system").unwrap()
-        );
         loop {
             tokio::time::sleep(std::time::Duration::from_secs(5)).await;
+            info!("cache content: {:?}", multi_writer.store.read().keys());
+            info!(
+                "common cm: {:?}",
+                multi_writer.get::<ConfigMap>("kube-root-ca.crt", "kube-system")
+            );
             // access individual sub stores
-            info!("Current deploys count: {}", stores.deploys.state().len());
+            info!("Current objects count: {}", multi_writer.store.read().len());
         }
     });
 
-    // info!("long watches starting");
+    info!("long watches starting");
     tokio::select! {
-        r = dep_watcher => println!("dep watcher exit: {r:?}"),
-        r = cm_watcher => println!("cm watcher exit: {r:?}"),
-        r = sec_watcher => println!("sec watcher exit: {r:?}"),
+        r = watcher => println!("watcher exit: {r:?}"),
     }
 
     Ok(())
diff --git a/kube-core/src/gvk.rs b/kube-core/src/gvk.rs
index 91b986601..77d140eda 100644
--- a/kube-core/src/gvk.rs
+++ b/kube-core/src/gvk.rs
@@ -12,7 +12,7 @@ use thiserror::Error;
 pub struct ParseGroupVersionError(pub String);
 
 /// Core information about an API Resource.
-#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash)]
+#[derive(Deserialize, Serialize, Debug, Clone, PartialEq, Eq, Hash, Default)]
 pub struct GroupVersionKind {
     /// API group
     pub group: String,
diff --git a/kube-core/src/lib.rs b/kube-core/src/lib.rs
index 6ba9f81b6..58d1e4be6 100644
--- a/kube-core/src/lib.rs
+++ b/kube-core/src/lib.rs
@@ -35,7 +35,7 @@ pub mod gvk;
 pub use gvk::{GroupVersion, GroupVersionKind, GroupVersionResource};
 
 pub mod metadata;
-pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta};
+pub use metadata::{ListMeta, ObjectMeta, PartialObjectMeta, PartialObjectMetaExt, TypeMeta, TypedResource};
 
 pub mod labels;
diff --git a/kube-core/src/metadata.rs b/kube-core/src/metadata.rs
index 67edf6e16..18bef1f46 100644
--- a/kube-core/src/metadata.rs
+++ b/kube-core/src/metadata.rs
@@ -4,7 +4,7 @@ use std::{borrow::Cow, marker::PhantomData};
 pub use k8s_openapi::apimachinery::pkg::apis::meta::v1::{ListMeta, ObjectMeta};
 use serde::{Deserialize, Serialize};
 
-use crate::{DynamicObject, Resource};
+use crate::{ApiResource, DynamicObject, GroupVersionKind, Resource};
 
 /// Type information that is flattened into every kubernetes object
 #[derive(Deserialize, Serialize, Clone, Default, Debug, Eq, PartialEq, Hash)]
@@ -175,6 +175,123 @@ impl<K: Resource> Resource for PartialObjectMeta<K> {
     }
 }
 
+/// A [`Resource`] whose type information can be read from the value itself rather than supplied via a `DynamicType`
+pub trait TypedResource: Resource + Sized {
+    /// Returns the [`TypeMeta`] of the resource
+    fn type_meta(&self) -> TypeMeta;
+    /// Returns the [`GroupVersionKind`] of the resource
+    fn gvk(&self) -> GroupVersionKind;
+    /// Returns the kind of the resource
+    fn kind(&self) -> Cow<'_, str>;
+    /// Returns the API group of the resource
+    fn group(&self) -> Cow<'_, str>;
+    /// Returns the API version of the resource
+    fn version(&self) -> Cow<'_, str>;
+    /// Returns the plural name of the resource
+    fn plural(&self) -> Cow<'_, str>;
+}
+
+impl<K> TypedResource for K
+where
+    K: Resource,
+    (K, K::DynamicType): TypedResourceImpl<Resource = K>,
+{
+    fn type_meta(&self) -> TypeMeta {
+        <(K, K::DynamicType) as TypedResourceImpl>::type_meta(self)
+    }
+
+    fn gvk(&self) -> GroupVersionKind {
+        <(K, K::DynamicType) as TypedResourceImpl>::gvk(self)
+    }
+
+    fn kind(&self) -> Cow<'_, str> {
+        <(K, K::DynamicType) as TypedResourceImpl>::kind(self)
+    }
+
+    fn group(&self) -> Cow<'_, str> {
+        <(K, K::DynamicType) as TypedResourceImpl>::group(self)
+    }
+
+    fn version(&self) -> Cow<'_, str> {
+        <(K, K::DynamicType) as TypedResourceImpl>::version(self)
+    }
+
+    fn plural(&self) -> Cow<'_, str> {
+        <(K, K::DynamicType) as TypedResourceImpl>::plural(self)
+    }
+}
+
+#[doc(hidden)]
+// Workaround for https://github.com/rust-lang/rust/issues/20400
+pub trait TypedResourceImpl {
+    type Resource: Resource;
+    fn type_meta(res: &Self::Resource) -> TypeMeta;
+    fn gvk(res: &Self::Resource) -> GroupVersionKind;
+    fn kind(res: &Self::Resource) -> Cow<'_, str>;
+    fn group(res: &Self::Resource) -> Cow<'_, str>;
+    fn version(res: &Self::Resource) -> Cow<'_, str>;
+    fn plural(res: &Self::Resource) -> Cow<'_, str>;
+}
+
+impl<K> TypedResourceImpl for (K, ())
+where
+    K: Resource<DynamicType = ()>,
+{
+    type Resource = K;
+
+    fn type_meta(_: &Self::Resource) -> TypeMeta {
+        TypeMeta::resource::<K>()
+    }
+
+    fn gvk(res: &Self::Resource) -> GroupVersionKind {
+        GroupVersionKind::gvk(&res.group(), &res.version(), &res.kind())
+    }
+
+    fn kind(_: &Self::Resource) -> Cow<'_, str> {
+        K::kind(&())
+    }
+
+    fn group(_: &Self::Resource) -> Cow<'_, str> {
+        K::group(&())
+    }
+
+    fn version(_: &Self::Resource) -> Cow<'_, str> {
+        K::version(&())
+    }
+
+    fn plural(_: &Self::Resource) -> Cow<'_, str> {
+        K::plural(&())
+    }
+}
+
+impl TypedResourceImpl for (DynamicObject, ApiResource) {
+    type Resource = DynamicObject;
+
+    fn type_meta(obj: &Self::Resource) -> TypeMeta {
+        obj.types.clone().unwrap_or_default()
+    }
+
+    fn gvk(res: &Self::Resource) -> GroupVersionKind {
+        res.type_meta().try_into().unwrap_or_default()
+    }
+
+    fn kind(res: &Self::Resource) -> Cow<'_, str> {
+        Cow::from(res.type_meta().kind)
+    }
+
+    fn group(res: &Self::Resource) -> Cow<'_, str> {
+        Cow::from(res.gvk().group)
+    }
+
+    fn version(res: &Self::Resource) -> Cow<'_, str> {
+        Cow::from(res.gvk().version)
+    }
+
+    fn plural(res: &Self::Resource) -> Cow<'_, str> {
+        Cow::from(ApiResource::from_gvk(&res.gvk()).plural)
+    }
+}
+
 #[cfg(test)]
 mod test {
     use super::{ObjectMeta, PartialObjectMeta, PartialObjectMetaExt};
diff --git a/kube-core/src/resource.rs b/kube-core/src/resource.rs
index 3ab1d88df..6dffdeeb1 100644
--- a/kube-core/src/resource.rs
+++ b/kube-core/src/resource.rs
@@ -8,6 +8,8 @@ use std::{borrow::Cow, collections::BTreeMap};
 
 pub use k8s_openapi::{ClusterResourceScope, NamespaceResourceScope, ResourceScope, SubResourceScope};
 
+use crate::GroupVersionKind;
+
 /// Indicates that a [`Resource`] is of an indeterminate dynamic scope.
 pub struct DynamicResourceScope {}
 impl ResourceScope for DynamicResourceScope {}
@@ -54,6 +56,11 @@ pub trait Resource {
     /// This is known as the resource in apimachinery, we rename it for disambiguation.
     fn plural(dt: &Self::DynamicType) -> Cow<'_, str>;
 
+    /// Generates the [`GroupVersionKind`] for the resource
+    fn gvk(dt: &Self::DynamicType) -> GroupVersionKind {
+        GroupVersionKind::gvk(&Self::group(dt), &Self::version(dt), &Self::kind(dt))
+    }
+
     /// Creates a url path for http requests for this resource
     fn url_path(dt: &Self::DynamicType, namespace: Option<&str>) -> String {
         let n = if let Some(ns) = namespace {
diff --git a/kube-core/src/watch.rs b/kube-core/src/watch.rs
index f2423af8d..6c28b58e7 100644
--- a/kube-core/src/watch.rs
+++ b/kube-core/src/watch.rs
@@ -43,7 +43,7 @@ impl<K> Debug for WatchEvent<K> {
 ///
 /// Can only be relied upon to have metadata with resource version.
 /// Bookmarks contain apiVersion + kind + basically empty metadata.
-#[derive(Serialize, Deserialize, Clone)]
+#[derive(Serialize, Deserialize, Clone, Debug, Default)]
 pub struct Bookmark {
     /// apiVersion + kind
     #[serde(flatten)]
@@ -54,7 +54,7 @@ pub struct Bookmark {
 }
 
 /// Slimed down Metadata for WatchEvent::Bookmark
-#[derive(Serialize, Deserialize, Clone)]
+#[derive(Serialize, Deserialize, Clone, Debug, Default)]
 #[serde(rename_all = "camelCase")]
 pub struct BookmarkMeta {
     /// The only field we need from a Bookmark event.
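The kube-core additions above are easiest to see from the caller's side. The following is an illustrative sketch (not part of the diff) of how the new `TypedResource` blanket impls and the `Resource::gvk` default method could be used once this lands; the asserted values simply reflect ConfigMap's core-group coordinates.

use k8s_openapi::api::core::v1::ConfigMap;
use kube::core::{ApiResource, DynamicObject, GroupVersionKind, Resource, TypedResource};

fn main() {
    // Statically typed object: the (K, ()) impl delegates to the associated
    // functions on Resource, so no DynamicType handle is needed.
    let cm = ConfigMap::default();
    assert_eq!(cm.gvk(), GroupVersionKind::gvk("", "v1", "ConfigMap"));

    // The same GVK via the new Resource::gvk default method on the type itself.
    assert_eq!(<ConfigMap as Resource>::gvk(&()), cm.gvk());

    // Dynamically typed object: the (DynamicObject, ApiResource) impl reads the
    // TypeMeta stored on the value instead.
    let ar = ApiResource::erase::<ConfigMap>(&());
    let dynamic = DynamicObject::new("example", &ar);
    assert_eq!(dynamic.gvk(), cm.gvk());
}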
diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index 8a8f7fc3a..f9a0d2ef1 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -1683,7 +1683,7 @@ mod tests { use super::{Action, APPLIER_REQUEUE_BUF_SIZE}; use crate::{ applier, - reflector::{self, ObjectRef}, + reflector::{self, store::CacheWriter as _, ObjectRef}, watcher::{self, metadata_watcher, watcher, Event}, Config, Controller, }; @@ -1766,7 +1766,7 @@ mod tests { queue_rx.map(Result::<_, Infallible>::Ok), Config::default(), )); - store_tx.apply_watcher_event(&watcher::Event::InitDone); + store_tx.apply_watcher_event(&watcher::Event::InitDone(None)); for i in 0..items { let obj = ConfigMap { metadata: ObjectMeta { diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs index 1060dab2b..3f35d4ae3 100644 --- a/kube-runtime/src/reflector/dispatcher.rs +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -170,7 +170,7 @@ pub(crate) mod test { Ok(Event::Init), Ok(Event::InitApply(foo)), Ok(Event::InitApply(bar)), - Ok(Event::InitDone), + Ok(Event::InitDone(None)), ]); let (reader, writer) = reflector::store_shared(10); @@ -204,7 +204,7 @@ pub(crate) mod test { assert_eq!(reader.len(), 1); let restarted = poll!(reflect.next()); - assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitDone))))); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::InitDone(None)))))); assert_eq!(reader.len(), 2); assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); @@ -226,7 +226,7 @@ pub(crate) mod test { Ok(Event::Init), Ok(Event::InitApply(foo.clone())), Ok(Event::InitApply(bar.clone())), - Ok(Event::InitDone), + Ok(Event::InitDone(None)), ]); let foo = Arc::new(foo); @@ -274,7 +274,7 @@ pub(crate) mod test { assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::InitDone))) + Poll::Ready(Some(Ok(Event::InitDone(None)))) )); // these don't come back in order atm: @@ -297,7 +297,7 @@ pub(crate) mod test { Ok(Event::Init), Ok(Event::InitApply(foo.clone())), Ok(Event::InitApply(bar.clone())), - Ok(Event::InitDone), + Ok(Event::InitDone(None)), ]); let foo = Arc::new(foo); @@ -339,7 +339,7 @@ pub(crate) mod test { assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::InitDone))) + Poll::Ready(Some(Ok(Event::InitDone(None)))) )); drop(reflect); diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index 88f4f2910..23e2b97c1 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -11,6 +11,7 @@ pub use self::{ use crate::watcher; use async_stream::stream; use futures::{Stream, StreamExt}; +use store::CacheWriter as _; use std::hash::Hash; #[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared; pub use store::{store, Store}; @@ -244,7 +245,7 @@ mod tests { Ok(watcher::Event::Apply(cm_a.clone())), Ok(watcher::Event::Init), Ok(watcher::Event::InitApply(cm_b.clone())), - Ok(watcher::Event::InitDone), + Ok(watcher::Event::InitDone(None)), ]), ) .map(|_| ()) diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index d6d264dea..70800bf6d 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -13,6 +13,12 @@ use thiserror::Error; type Cache = Arc, Arc>>>; +/// A writable `CacheWriter` trait +pub trait CacheWriter { + /// Applies a single watcher event to the store + fn apply_watcher_event(&mut self, event: &watcher::Event); +} + /// A writable Store handle 
/// /// This is exclusive since it's not safe to share a single `Store` between multiple reflectors. @@ -98,8 +104,40 @@ where .map(|dispatcher| dispatcher.subscribe(self.as_reader())) } + /// Broadcast an event to any downstream listeners subscribed on the store + pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event) { + if let Some(ref mut dispatcher) = self.dispatcher { + match event { + watcher::Event::Apply(obj) => { + let obj_ref = obj.to_object_ref(self.dyntype.clone()); + // TODO (matei): should this take a timeout to log when backpressure has + // been applied for too long, e.g. 10s + dispatcher.broadcast(obj_ref).await; + } + + watcher::Event::InitDone(_) => { + let obj_refs: Vec<_> = { + let store = self.store.read(); + store.keys().cloned().collect() + }; + + for obj_ref in obj_refs { + dispatcher.broadcast(obj_ref).await; + } + } + + _ => {} + } + } + } +} + +impl CacheWriter for Writer +where + K::DynamicType: Eq + Hash + Clone, + { /// Applies a single watcher event to the store - pub fn apply_watcher_event(&mut self, event: &watcher::Event) { + fn apply_watcher_event(&mut self, event: &watcher::Event) { match event { watcher::Event::Apply(obj) => { let key = obj.to_object_ref(self.dyntype.clone()); @@ -118,7 +156,7 @@ where let obj = Arc::new(obj.clone()); self.buffer.insert(key, obj); } - watcher::Event::InitDone => { + watcher::Event::InitDone(_) => { let mut store = self.store.write(); // Swap the buffer into the store @@ -136,33 +174,6 @@ where } } } - - /// Broadcast an event to any downstream listeners subscribed on the store - pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event) { - if let Some(ref mut dispatcher) = self.dispatcher { - match event { - watcher::Event::Apply(obj) => { - let obj_ref = obj.to_object_ref(self.dyntype.clone()); - // TODO (matei): should this take a timeout to log when backpressure has - // been applied for too long, e.g. 
10s - dispatcher.broadcast(obj_ref).await; - } - - watcher::Event::InitDone => { - let obj_refs: Vec<_> = { - let store = self.store.read(); - store.keys().cloned().collect() - }; - - for obj_ref in obj_refs { - dispatcher.broadcast(obj_ref).await; - } - } - - _ => {} - } - } - } } impl Default for Writer @@ -309,7 +320,7 @@ where #[cfg(test)] mod tests { use super::{store, Writer}; - use crate::{reflector::ObjectRef, watcher}; + use crate::{reflector::{store::CacheWriter as _, ObjectRef}, watcher}; use k8s_openapi::api::core::v1::ConfigMap; use kube_client::api::ObjectMeta; diff --git a/kube-runtime/src/utils/event_decode.rs b/kube-runtime/src/utils/event_decode.rs index 2a0085120..472d776d8 100644 --- a/kube-runtime/src/utils/event_decode.rs +++ b/kube-runtime/src/utils/event_decode.rs @@ -28,7 +28,7 @@ where fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { let mut me = self.project(); Poll::Ready(loop { - let var_name = match ready!(me.stream.as_mut().poll_next(cx)) { + let var_name: Option> = match ready!(me.stream.as_mut().poll_next(cx)) { Some(Ok(Event::Apply(obj) | Event::InitApply(obj))) => Some(Ok(obj)), Some(Ok(Event::Delete(obj))) => { if *me.emit_deleted { @@ -37,7 +37,7 @@ where continue; } } - Some(Ok(Event::Init | Event::InitDone)) => continue, + Some(Ok(Event::Init | Event::InitDone(_))) => continue, Some(Err(err)) => Some(Err(err)), None => return Poll::Ready(None), }; diff --git a/kube-runtime/src/utils/reflect.rs b/kube-runtime/src/utils/reflect.rs index e93354202..70324e3db 100644 --- a/kube-runtime/src/utils/reflect.rs +++ b/kube-runtime/src/utils/reflect.rs @@ -2,44 +2,53 @@ use core::{ pin::Pin, task::{Context, Poll}, }; +use std::marker::PhantomData; use futures::{Stream, TryStream}; use pin_project::pin_project; use crate::{ - reflector::store::Writer, + reflector::store::CacheWriter, watcher::{Error, Event}, }; use kube_client::Resource; /// Stream returned by the [`reflect`](super::WatchStreamExt::reflect) method #[pin_project] -pub struct Reflect +pub struct Reflect where K: Resource + Clone + 'static, K::DynamicType: Eq + std::hash::Hash + Clone, + W: CacheWriter, { #[pin] stream: St, - writer: Writer, + writer: W, + _phantom: PhantomData, } -impl Reflect +impl Reflect where St: TryStream>, K: Resource + Clone, K::DynamicType: Eq + std::hash::Hash + Clone, + W: CacheWriter, { - pub(super) fn new(stream: St, writer: Writer) -> Reflect { - Self { stream, writer } + pub(super) fn new(stream: St, writer: W) -> Reflect { + Self { + stream, + writer, + _phantom: Default::default(), + } } } -impl Stream for Reflect +impl Stream for Reflect where K: Resource + Clone, K::DynamicType: Eq + std::hash::Hash + Clone, St: Stream, Error>>, + W: CacheWriter, { type Item = Result, Error>; @@ -77,7 +86,7 @@ pub(crate) mod test { Ok(Event::Init), Ok(Event::InitApply(foo)), Ok(Event::InitApply(bar)), - Ok(Event::InitDone), + Ok(Event::InitDone(None)), ]); let (reader, writer) = reflector::store(); @@ -111,7 +120,7 @@ pub(crate) mod test { assert!(matches!( poll!(reflect.next()), - Poll::Ready(Some(Ok(Event::InitDone))) + Poll::Ready(Some(Ok(Event::InitDone(None)))) )); assert_eq!(reader.len(), 2); diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index 241871837..8e67e4eac 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -1,4 +1,5 @@ use crate::{ + reflector::store::CacheWriter, utils::{ event_decode::EventDecode, event_modify::EventModify, @@ -9,10 +10,10 @@ use crate::{ }; use 
kube_client::Resource; -use crate::{ - reflector::store::Writer, - utils::{Backoff, Reflect}, -}; +#[cfg(feature = "unstable-runtime-subscribe")] +use crate::reflector::store::Writer; + +use crate::utils::{Backoff, Reflect}; use crate::watcher::DefaultBackoff; use futures::{Stream, TryStream}; @@ -174,7 +175,7 @@ pub trait WatchStreamExt: Stream { /// ``` /// /// [`Store`]: crate::reflector::Store - fn reflect(self, writer: Writer) -> Reflect + fn reflect(self, writer: impl CacheWriter) -> Reflect> where Self: Stream>> + Sized, K: Resource + Clone + 'static, diff --git a/kube-runtime/src/watcher.rs b/kube-runtime/src/watcher.rs index 755320b38..fb4003170 100644 --- a/kube-runtime/src/watcher.rs +++ b/kube-runtime/src/watcher.rs @@ -9,12 +9,12 @@ use backon::BackoffBuilder; use educe::Educe; use futures::{stream::BoxStream, Stream, StreamExt}; use kube_client::{ - api::{ListParams, Resource, ResourceExt, VersionMatch, WatchEvent, WatchParams}, - core::{metadata::PartialObjectMeta, ObjectList, Selector}, + api::{DynamicObject, ListParams, ResourceExt, TypeMeta, VersionMatch, WatchEvent, WatchParams}, + core::{metadata::PartialObjectMeta, ObjectList, Resource, Selector}, error::ErrorResponse, Api, Error as ClientErr, }; -use serde::de::DeserializeOwned; +use serde::{de::DeserializeOwned, Serialize}; use std::{clone::Clone, collections::VecDeque, fmt::Debug, future, time::Duration}; use thiserror::Error; use tracing::{debug, error, warn}; @@ -64,7 +64,7 @@ pub enum Event { /// /// Any objects that were previously [`Applied`](Event::Applied) but are not listed in any of /// the `InitApply` events should be assumed to have been [`Deleted`](Event::Deleted). - InitDone, + InitDone(Option), } impl Event { @@ -79,7 +79,7 @@ impl Event { pub fn into_iter_applied(self) -> impl Iterator { match self { Self::Apply(obj) | Self::InitApply(obj) => Some(obj), - Self::Delete(_) | Self::Init | Self::InitDone => None, + Self::Delete(_) | Self::Init | Self::InitDone(_) => None, } .into_iter() } @@ -96,7 +96,7 @@ impl Event { pub fn into_iter_touched(self) -> impl Iterator { match self { Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => Some(obj), - Self::Init | Self::InitDone => None, + Self::Init | Self::InitDone(_) => None, } .into_iter() } @@ -122,7 +122,7 @@ impl Event { pub fn modify(mut self, mut f: impl FnMut(&mut K)) -> Self { match &mut self { Self::Apply(obj) | Self::Delete(obj) | Self::InitApply(obj) => (f)(obj), - Self::Init | Self::InitDone => {} // markers, nothing to modify + Self::Init | Self::InitDone(_) => {} // markers, nothing to modify } self } @@ -140,12 +140,14 @@ enum State { continue_token: Option, objects: VecDeque, last_bookmark: Option, + bookmark: Option, }, /// Kubernetes 1.27 Streaming Lists /// The initial watch is in progress InitialWatch { #[educe(Debug(ignore))] stream: BoxStream<'static, kube_client::Result>>, + bookmark: Option, }, /// The initial LIST was successful, so we should move on to starting the actual watch. 
InitListed { resource_version: String }, @@ -456,6 +458,68 @@ where } } +/// A wrapper around the `Api` of a `Resource` type that when used by the +/// watcher will return the entire (full) object +struct TypedObject<'a, K> { + api: &'a Api, +} + +#[async_trait] +impl ApiMode for TypedObject<'_, K> +where + K: Resource + Clone + Debug + DeserializeOwned + Serialize + Send + 'static, +{ + type Value = DynamicObject; + + async fn list(&self, lp: &ListParams) -> kube_client::Result> { + let ObjectList { + types, + metadata, + items: original, + } = self.api.list(lp).await?; + + let mut items = vec![]; + for obj in original { + let value = serde_json::to_value(obj).map_err(kube_client::Error::SerdeError)?; + let mut item: DynamicObject = + serde_json::from_value(value).map_err(kube_client::Error::SerdeError)?; + item.types = Some(TypeMeta::resource::()); + items.push(item); + } + + Ok(ObjectList { + types, + metadata, + items, + }) + } + + async fn watch( + &self, + wp: &WatchParams, + version: &str, + ) -> kube_client::Result>>> { + let stream = self.api.watch(wp, version).await?; + let stream = stream.map(|r| { + let convert = |obj| { + let value = serde_json::to_value(obj).map_err(kube_client::Error::SerdeError)?; + let mut item: DynamicObject = + serde_json::from_value(value).map_err(kube_client::Error::SerdeError)?; + item.types = Some(TypeMeta::resource::()); + Ok(item) + }; + Ok(match r? { + WatchEvent::Added(obj) => WatchEvent::Added(convert(obj)?), + WatchEvent::Modified(obj) => WatchEvent::Modified(convert(obj)?), + WatchEvent::Deleted(obj) => WatchEvent::Deleted(convert(obj)?), + WatchEvent::Bookmark(bookmark) => WatchEvent::Bookmark(bookmark), + WatchEvent::Error(error_response) => WatchEvent::Error(error_response), + }) + }); + Ok(StreamExt::boxed(stream)) + } +} + /// A wrapper around the `Api` of a `Resource` type that when used by the /// watcher will return only the metadata associated with an object struct MetaOnly<'a, K> { @@ -498,13 +562,23 @@ where { match state { State::Empty => match wc.initial_list_strategy { - InitialListStrategy::ListWatch => (Some(Ok(Event::Init)), State::InitPage { - continue_token: None, - objects: VecDeque::default(), - last_bookmark: None, - }), + InitialListStrategy::ListWatch => ( + Some(Ok(Event::Init)), + State::InitPage { + continue_token: None, + objects: VecDeque::default(), + last_bookmark: None, + bookmark: Default::default(), + }, + ), InitialListStrategy::StreamingList => match api.watch(&wc.to_watch_params(), "0").await { - Ok(stream) => (None, State::InitialWatch { stream }), + Ok(stream) => ( + None, + State::InitialWatch { + stream, + bookmark: None, + }, + ), Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. 
})) { warn!("watch initlist error with 403: {err:?}"); @@ -519,19 +593,30 @@ where continue_token, mut objects, last_bookmark, + mut bookmark, } => { if let Some(next) = objects.pop_front() { - return (Some(Ok(Event::InitApply(next))), State::InitPage { - continue_token, - objects, - last_bookmark, - }); + if bookmark.is_none() { + bookmark = Some(next.clone()) + } + return ( + Some(Ok(Event::InitApply(next))), + State::InitPage { + continue_token, + objects, + last_bookmark, + bookmark, + }, + ); } // check if we need to perform more pages if continue_token.is_none() { if let Some(resource_version) = last_bookmark { // we have drained the last page - move on to next stage - return (Some(Ok(Event::InitDone)), State::InitListed { resource_version }); + return ( + Some(Ok(Event::InitDone(bookmark))), + State::InitListed { resource_version }, + ); } } let mut lp = wc.to_list_params(); @@ -545,11 +630,15 @@ where } // Buffer page here, causing us to return to this enum branch (State::InitPage) // until the objects buffer has drained - (None, State::InitPage { - continue_token, - objects: list.items.into_iter().collect(), - last_bookmark, - }) + ( + None, + State::InitPage { + continue_token, + objects: list.items.into_iter().collect(), + last_bookmark, + bookmark, + }, + ) } Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { @@ -561,26 +650,38 @@ where } } } - State::InitialWatch { mut stream } => { + State::InitialWatch { + mut stream, + mut bookmark, + } => { match stream.next().await { Some(Ok(WatchEvent::Added(obj) | WatchEvent::Modified(obj))) => { - (Some(Ok(Event::InitApply(obj))), State::InitialWatch { stream }) + if bookmark.is_none() { + bookmark = Some(obj.clone()) + } + ( + Some(Ok(Event::InitApply(obj))), + State::InitialWatch { stream, bookmark }, + ) } Some(Ok(WatchEvent::Deleted(_obj))) => { // Kubernetes claims these events are impossible // https://kubernetes.io/docs/reference/using-api/api-concepts/#streaming-lists error!("got deleted event during initial watch. 
this is a bug"); - (None, State::InitialWatch { stream }) + (None, State::InitialWatch { stream, bookmark }) } Some(Ok(WatchEvent::Bookmark(bm))) => { let marks_initial_end = bm.metadata.annotations.contains_key("k8s.io/initial-events-end"); if marks_initial_end { - (Some(Ok(Event::InitDone)), State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }) + ( + Some(Ok(Event::InitDone(bookmark))), + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ) } else { - (None, State::InitialWatch { stream }) + (None, State::InitialWatch { stream, bookmark }) } } Some(Ok(WatchEvent::Error(err))) => { @@ -588,7 +689,7 @@ where let new_state = if err.code == 410 { State::default() } else { - State::InitialWatch { stream } + State::InitialWatch { stream, bookmark } }; if err.code == 403 { warn!("watcher watchevent error 403: {err:?}"); @@ -603,26 +704,33 @@ where } else { debug!("watcher error: {err:?}"); } - (Some(Err(Error::WatchFailed(err))), State::InitialWatch { stream }) + ( + Some(Err(Error::WatchFailed(err))), + State::InitialWatch { stream, bookmark }, + ) } None => (None, State::default()), } } State::InitListed { resource_version } => { match api.watch(&wc.to_watch_params(), &resource_version).await { - Ok(stream) => (None, State::Watching { - resource_version, - stream, - }), + Ok(stream) => ( + None, + State::Watching { + resource_version, + stream, + }, + ), Err(err) => { if std::matches!(err, ClientErr::Api(ErrorResponse { code: 403, .. })) { warn!("watch initlist error with 403: {err:?}"); } else { debug!("watch initlist error: {err:?}"); } - (Some(Err(Error::WatchStartFailed(err))), State::InitListed { - resource_version, - }) + ( + Some(Err(Error::WatchStartFailed(err))), + State::InitListed { resource_version }, + ) } } } @@ -635,10 +743,13 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Apply(obj))), State::Watching { - resource_version, - stream, - }) + ( + Some(Ok(Event::Apply(obj))), + State::Watching { + resource_version, + stream, + }, + ) } } Some(Ok(WatchEvent::Deleted(obj))) => { @@ -646,16 +757,22 @@ where if resource_version.is_empty() { (Some(Err(Error::NoResourceVersion)), State::default()) } else { - (Some(Ok(Event::Delete(obj))), State::Watching { - resource_version, - stream, - }) + ( + Some(Ok(Event::Delete(obj))), + State::Watching { + resource_version, + stream, + }, + ) } } - Some(Ok(WatchEvent::Bookmark(bm))) => (None, State::Watching { - resource_version: bm.metadata.resource_version, - stream, - }), + Some(Ok(WatchEvent::Bookmark(bm))) => ( + None, + State::Watching { + resource_version: bm.metadata.resource_version, + stream, + }, + ), Some(Ok(WatchEvent::Error(err))) => { // HTTP GONE, means we have desynced and need to start over and re-list :( let new_state = if err.code == 410 { @@ -679,10 +796,13 @@ where } else { debug!("watcher error: {err:?}"); } - (Some(Err(Error::WatchFailed(err))), State::Watching { - resource_version, - stream, - }) + ( + Some(Err(Error::WatchFailed(err))), + State::Watching { + resource_version, + stream, + }, + ) } None => (None, State::InitListed { resource_version }), }, @@ -769,6 +889,23 @@ pub fn watcher( ) } +#[allow(clippy::module_name_repetitions)] +pub fn dynamic_watcher( + api: Api, + watcher_config: Config, +) -> impl Stream>> + Send +where + K: Resource + Clone + DeserializeOwned + Serialize + Debug + Send + 'static, +{ + futures::stream::unfold( + (api, watcher_config, State::default()), 
+ |(api, watcher_config, state)| async { + let (event, state) = step(&TypedObject { api: &api }, &watcher_config, state).await; + Some((event, (api, watcher_config, state))) + }, + ) +} + /// Watches a Kubernetes Resource for changes continuously and receives only the /// metadata /// @@ -874,9 +1011,9 @@ pub fn watch_object Some(Ok(None)), // Pass up `None` if the object wasn't seen in the initial list - Ok(Event::InitDone) if !obj_seen => Some(Ok(None)), + Ok(Event::InitDone(_)) if !obj_seen => Some(Ok(None)), // Ignore marker events - Ok(Event::Init | Event::InitDone) => None, + Ok(Event::Init | Event::InitDone(_)) => None, // Bubble up errors Err(err) => Some(Err(err)), }
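Taken together, the reflector and watcher changes mean `WatchStreamExt::reflect` now accepts any `CacheWriter` implementation rather than only the built-in `Writer`. Below is a minimal sketch (not part of the diff; `NameIndex` is a hypothetical writer made up for illustration) of plugging a custom cache into an ordinary typed watcher stream under the signatures shown in this patch.

use std::collections::HashSet;

use futures::StreamExt;
use k8s_openapi::api::core::v1::ConfigMap;
use kube::{
    runtime::{reflector::store::CacheWriter, watcher, WatchStreamExt},
    Api, Client, ResourceExt,
};

// Hypothetical writer: indexes object names instead of storing whole objects.
#[derive(Clone, Default)]
struct NameIndex {
    names: HashSet<String>,
}

impl CacheWriter<ConfigMap> for NameIndex {
    fn apply_watcher_event(&mut self, event: &watcher::Event<ConfigMap>) {
        match event {
            watcher::Event::Apply(cm) | watcher::Event::InitApply(cm) => {
                self.names.insert(cm.name_any());
            }
            watcher::Event::Delete(cm) => {
                self.names.remove(&cm.name_any());
            }
            // Init marks the start of a (re)list; InitDone now carries an optional
            // object from the initial listing, which this sketch ignores.
            watcher::Event::Init | watcher::Event::InitDone(_) => {}
        }
    }
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let client = Client::try_default().await?;
    let cms: Api<ConfigMap> = Api::default_namespaced(client);
    // reflect() only needs `impl CacheWriter<ConfigMap>` after this change.
    watcher(cms, watcher::Config::default())
        .reflect(NameIndex::default())
        .applied_objects()
        .for_each(|_| std::future::ready(()))
        .await;
    Ok(())
}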