diff --git a/Cargo.toml b/Cargo.toml
index 01de4b142..8f7ef0dab 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -34,6 +34,8 @@ missing_docs = "deny"
 ahash = "0.8"
 anyhow = "1.0.71"
 assert-json-diff = "2.0.2"
+async-broadcast = "0.7.0"
+async-stream = "0.3.5"
 async-trait = "0.1.64"
 backoff = "0.4.0"
 base64 = "0.22.0"
diff --git a/examples/Cargo.toml b/examples/Cargo.toml
index b3b71250f..5add58582 100644
--- a/examples/Cargo.toml
+++ b/examples/Cargo.toml
@@ -61,6 +61,10 @@ crossterm = "0.27.0"
 name = "configmapgen_controller"
 path = "configmapgen_controller.rs"
 
+[[example]]
+name = "shared_stream_controllers"
+path = "shared_stream_controllers.rs"
+
 [[example]]
 name = "crd_api"
 path = "crd_api.rs"
diff --git a/examples/shared_stream_controllers.rs b/examples/shared_stream_controllers.rs
new file mode 100644
index 000000000..e14e73148
--- /dev/null
+++ b/examples/shared_stream_controllers.rs
@@ -0,0 +1,187 @@
+use std::{sync::Arc, time::Duration};
+
+use futures::StreamExt;
+use k8s_openapi::api::core::v1::{Pod, PodCondition};
+use kube::{
+    api::{Patch, PatchParams},
+    runtime::{
+        controller::Action,
+        reflector::{self},
+        watcher, Config, Controller, WatchStreamExt,
+    },
+    Api, Client, ResourceExt,
+};
+use tracing::{debug, error, info, warn};
+
+use thiserror::Error;
+
+// Helper module that namespaces two constants describing a Kubernetes status condition
+pub mod condition {
+    pub static UNDOCUMENTED_TYPE: &str = "UndocumentedPort";
+    pub static STATUS_TRUE: &str = "True";
+}
+
+const SUBSCRIBE_BUFFER_SIZE: usize = 256;
+
+#[derive(Debug, Error)]
+enum Error {
+    #[error("Failed to patch pod: {0}")]
+    WriteFailed(#[source] kube::Error),
+
+    #[error("Missing pod field: {0}")]
+    MissingField(&'static str),
+}
+
+#[derive(Clone)]
+struct Data {
+    client: Client,
+}
+
+/// A simple reconciliation function that will copy a pod's labels into the annotations.
+async fn reconcile_metadata(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
+    let namespace = &pod.namespace().unwrap_or_default();
+    if namespace == "kube-system" {
+        return Ok(Action::await_change());
+    }
+
+    let mut pod = (*pod).clone();
+    pod.metadata.managed_fields = None;
+    // combine labels and annotations into a new map
+    let labels = pod.labels().clone().into_iter();
+    pod.annotations_mut().extend(labels);
+
+    let pod_api = Api::<Pod>::namespaced(
+        ctx.client.clone(),
+        pod.metadata
+            .namespace
+            .as_ref()
+            .ok_or_else(|| Error::MissingField(".metadata.namespace"))?,
+    );
+
+    pod_api
+        .patch(
+            &pod.name_any(),
+            &PatchParams::apply("controller-1"),
+            &Patch::Apply(&pod),
+        )
+        .await
+        .map_err(Error::WriteFailed)?;
+
+    Ok(Action::requeue(Duration::from_secs(300)))
+}
+
+/// Another reconciliation function that will add an 'UndocumentedPort' condition to pods that
+/// do not have any ports declared across all containers.
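For orientation, a fetched `Pod` that this second reconciler has already processed can be checked for the condition roughly as follows (a sketch only; the helper name is hypothetical, and `condition` refers to the module defined at the top of this example):

```rust
use k8s_openapi::api::core::v1::Pod;

// Sketch: returns true once reconcile_status has marked the pod.
fn has_undocumented_port_condition(pod: &Pod) -> bool {
    pod.status
        .as_ref()
        .and_then(|status| status.conditions.as_ref())
        .map_or(false, |conditions| {
            conditions.iter().any(|c| {
                c.type_ == condition::UNDOCUMENTED_TYPE && c.status == condition::STATUS_TRUE
            })
        })
}
```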
+async fn reconcile_status(pod: Arc<Pod>, ctx: Arc<Data>) -> Result<Action, Error> {
+    for container in pod.spec.clone().unwrap_or_default().containers.iter() {
+        if !container.ports.clone().unwrap_or_default().is_empty() {
+            debug!(name = %pod.name_any(), "Skipped updating pod with documented ports");
+            return Ok(Action::await_change());
+        }
+    }
+
+    let pod_api = Api::<Pod>::namespaced(
+        ctx.client.clone(),
+        pod.metadata
+            .namespace
+            .as_ref()
+            .ok_or_else(|| Error::MissingField(".metadata.namespace"))?,
+    );
+
+    let undocumented_condition = PodCondition {
+        type_: condition::UNDOCUMENTED_TYPE.into(),
+        status: condition::STATUS_TRUE.into(),
+        ..Default::default()
+    };
+    let value = serde_json::json!({
+        "status": {
+            "name": pod.name_any(),
+            "kind": "Pod",
+            "conditions": vec![undocumented_condition]
+        }
+    });
+    pod_api
+        .patch_status(
+            &pod.name_any(),
+            &PatchParams::apply("controller-2"),
+            &Patch::Strategic(value),
+        )
+        .await
+        .map_err(Error::WriteFailed)?;
+
+    Ok(Action::requeue(Duration::from_secs(300)))
+}
+
+fn error_policy(obj: Arc<Pod>, error: &Error, _ctx: Arc<Data>) -> Action {
+    error!(%error, name = %obj.name_any(), "Failed reconciliation");
+    Action::requeue(Duration::from_secs(10))
+}
+
+#[tokio::main]
+async fn main() -> anyhow::Result<()> {
+    tracing_subscriber::fmt::init();
+
+    let client = Client::try_default().await?;
+    let pods = Api::<Pod>::namespaced(client.clone(), "default");
+    let config = Config::default().concurrency(2);
+    let ctx = Arc::new(Data { client });
+
+    // Create a shared store with a predefined buffer that will be shared between subscribers.
+    let (reader, writer) = reflector::store_shared(SUBSCRIBE_BUFFER_SIZE);
+    // Before threading an object watch through the store, create a subscriber.
+    // Any number of subscribers can be created from one writer.
+    let subscriber = writer
+        .subscribe()
+        .expect("subscribers can only be created from shared stores");
+
+    // Reflect a stream of pod watch events into the store and apply a backoff. For subscribers to
+    // be able to consume updates, the reflector must be shared.
+    let pod_watch = watcher(pods.clone(), Default::default())
+        .default_backoff()
+        .reflect_shared(writer)
+        .for_each(|res| async move {
+            match res {
+                Ok(event) => debug!("Received event on root stream {event:?}"),
+                Err(error) => error!(%error, "Unexpected error when watching resource"),
+            }
+        });
+
+    // Create the first controller using the reconcile_metadata function. Controllers accept
+    // subscribers through a dedicated interface.
+    let metadata_controller = Controller::for_shared_stream(subscriber.clone(), reader)
+        .with_config(config.clone())
+        .shutdown_on_signal()
+        .run(reconcile_metadata, error_policy, ctx.clone())
+        .for_each(|res| async move {
+            match res {
+                Ok(v) => info!("Reconciled metadata {v:?}"),
+                Err(error) => warn!(%error, "Failed to reconcile metadata"),
+            }
+        });
+
+    // Subscribers can be used to get a read handle on the store if the initial handle has been
+    // moved or dropped.
+    let reader = subscriber.reader();
+    // Create the second controller using the reconcile_status function.
+    let status_controller = Controller::for_shared_stream(subscriber, reader)
+        .with_config(config)
+        .shutdown_on_signal()
+        .run(reconcile_status, error_policy, ctx)
+        .for_each(|res| async move {
+            match res {
+                Ok(v) => info!("Reconciled status {v:?}"),
+                Err(error) => warn!(%error, "Failed to reconcile status"),
+            }
+        });
+
+    // Drive streams to readiness.
The initial watch (that is reflected) needs to be driven to + // consume events from the API Server and forward them to subscribers. + // + // Both controllers will operate on shared objects. + tokio::select! { + _ = futures::future::join(metadata_controller, status_controller) => {}, + _ = pod_watch => {} + } + + Ok(()) +} diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml index 51e235557..233eeae52 100644 --- a/kube-runtime/Cargo.toml +++ b/kube-runtime/Cargo.toml @@ -48,6 +48,8 @@ backoff.workspace = true async-trait.workspace = true hashbrown.workspace = true k8s-openapi.workspace = true +async-broadcast.workspace = true +async-stream.workspace = true [dev-dependencies] kube = { path = "../kube", features = ["derive", "client", "runtime"], version = "<1.0.0, >=0.60.0" } diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs index b449d5ecc..9dedd1f22 100644 --- a/kube-runtime/src/controller/mod.rs +++ b/kube-runtime/src/controller/mod.rs @@ -123,6 +123,28 @@ where }) } +/// Enqueues the object itself for reconciliation when the object is behind a +/// shared pointer +#[cfg(feature = "unstable-runtime-subscribe")] +fn trigger_self_shared( + stream: S, + dyntype: K::DynamicType, +) -> impl Stream, S::Error>> +where + // Input stream has item as some Arc'd Resource (via + // Controller::for_shared_stream) + S: TryStream>, + K: Resource, + K::DynamicType: Clone, +{ + trigger_with(stream, move |obj| { + Some(ReconcileRequest { + obj_ref: ObjectRef::from_obj_with(obj.as_ref(), dyntype.clone()), + reason: ReconcileReason::ObjectUpdated, + }) + }) +} + /// Enqueues any mapper returned `K` types for reconciliation fn trigger_others( stream: S, @@ -703,6 +725,117 @@ where } } + /// This is the same as [`Controller::for_stream`]. Instead of taking an + /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared + /// streams can be created out-of-band by subscribing on a store `Writer`. + /// Through this interface, multiple controllers can use the same root + /// (shared) input stream of resources to keep memory overheads smaller. + /// + /// **N.B**: This constructor requires an + /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) + /// feature. + /// + /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not + /// need to share the stream. + /// + /// ## Warning: + /// + /// You **must** ensure the root stream (i.e. stream created through a `reflector()`) + /// is driven to readiness independently of this controller to ensure the + /// watcher never deadlocks. 
+ /// + /// # Example: + /// + /// ```no_run + /// # use futures::StreamExt; + /// # use k8s_openapi::api::apps::v1::Deployment; + /// # use kube::runtime::controller::{Action, Controller}; + /// # use kube::runtime::{predicates, watcher, reflector, WatchStreamExt}; + /// # use kube::{Api, Client, Error, ResourceExt}; + /// # use std::sync::Arc; + /// # async fn reconcile(_: Arc, _: Arc<()>) -> Result { Ok(Action::await_change()) } + /// # fn error_policy(_: Arc, _: &kube::Error, _: Arc<()>) -> Action { Action::await_change() } + /// # async fn doc(client: kube::Client) { + /// let api: Api = Api::default_namespaced(client); + /// let (reader, writer) = reflector::store_shared(128); + /// let subscriber = writer + /// .subscribe() + /// .expect("subscribers can only be created from shared stores"); + /// let deploys = watcher(api, watcher::Config::default()) + /// .default_backoff() + /// .reflect(writer) + /// .applied_objects() + /// .for_each(|ev| async move { + /// match ev { + /// Ok(obj) => tracing::info!("got obj {obj:?}"), + /// Err(error) => tracing::error!(%error, "received error") + /// } + /// }); + /// + /// let controller = Controller::for_shared_stream(subscriber, reader) + /// .run(reconcile, error_policy, Arc::new(())) + /// .for_each(|ev| async move { + /// tracing::info!("reconciled {ev:?}") + /// }); + /// + /// // Drive streams using a select statement + /// tokio::select! { + /// _ = deploys => {}, + /// _ = controller => {}, + /// } + /// # } + #[cfg(feature = "unstable-runtime-subscribe")] + pub fn for_shared_stream(trigger: impl Stream> + Send + 'static, reader: Store) -> Self + where + K::DynamicType: Default, + { + Self::for_shared_stream_with(trigger, reader, Default::default()) + } + + /// This is the same as [`Controller::for_stream`]. Instead of taking an + /// `Api` (e.g. [`Controller::new`]), a stream of resources is used. Shared + /// streams can be created out-of-band by subscribing on a store `Writer`. + /// Through this interface, multiple controllers can use the same root + /// (shared) input stream of resources to keep memory overheads smaller. + /// + /// **N.B**: This constructor requires an + /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) + /// feature. + /// + /// Prefer [`Controller::new`] or [`Controller::for_stream`] if you do not + /// need to share the stream. + /// + /// This variant constructor is used for [`dynamic`] types found through + /// discovery. Prefer [`Controller::for_shared_stream`] for static types (i.e. + /// known at compile time). + /// + /// [`dynamic`]: kube_client::core::dynamic + #[cfg(feature = "unstable-runtime-subscribe")] + pub fn for_shared_stream_with( + trigger: impl Stream> + Send + 'static, + reader: Store, + dyntype: K::DynamicType, + ) -> Self { + let mut trigger_selector = stream::SelectAll::new(); + let self_watcher = trigger_self_shared(trigger.map(Ok), dyntype.clone()).boxed(); + trigger_selector.push(self_watcher); + Self { + trigger_selector, + trigger_backoff: Box::::default(), + graceful_shutdown_selector: vec![ + // Fallback future, ensuring that we never terminate if no additional futures are added to the selector + future::pending().boxed(), + ], + forceful_shutdown_selector: vec![ + // Fallback future, ensuring that we never terminate if no additional futures are added to the selector + future::pending().boxed(), + ], + dyntype, + reader, + config: Default::default(), + } + } + /// Specify the configuration for the controller's behavior. 
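For a sense of what such a configuration can carry, here is a minimal sketch of building a `Config` to pass to `with_config`; the `concurrency` value is arbitrary, and the `debounce` builder is assumed from the controller `Config` API rather than shown in this diff:

```rust
use std::time::Duration;
use kube::runtime::Config;

// Sketch: illustrative values only.
fn tuned_config() -> Config {
    Config::default()
        .concurrency(4)                    // reconcile up to 4 objects in parallel
        .debounce(Duration::from_secs(1))  // coalesce bursts of events for the same object
}
```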
#[must_use] pub fn with_config(mut self, config: Config) -> Self { diff --git a/kube-runtime/src/reflector/dispatcher.rs b/kube-runtime/src/reflector/dispatcher.rs new file mode 100644 index 000000000..7fb1a2cf0 --- /dev/null +++ b/kube-runtime/src/reflector/dispatcher.rs @@ -0,0 +1,371 @@ +use core::{ + pin::Pin, + task::{Context, Poll}, +}; +use std::{fmt::Debug, sync::Arc}; + +use derivative::Derivative; +use futures::Stream; +use pin_project::pin_project; +use std::task::ready; + +use crate::reflector::{ObjectRef, Store}; +use async_broadcast::{InactiveReceiver, Receiver, Sender}; + +use super::Lookup; + +#[derive(Derivative)] +#[derivative(Debug(bound = "K: Debug, K::DynamicType: Debug"), Clone)] +// A helper type that holds a broadcast transmitter and a broadcast receiver, +// used to fan-out events from a root stream to multiple listeners. +pub(crate) struct Dispatcher +where + K: Lookup + Clone + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone, +{ + dispatch_tx: Sender>, + // An inactive reader that prevents the channel from closing until the + // writer is dropped. + _dispatch_rx: InactiveReceiver>, +} + +impl Dispatcher +where + K: Lookup + Clone + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone, +{ + /// Creates and returns a new self that wraps a broadcast sender and an + /// inactive broadcast receiver + /// + /// A buffer size is required to create the underlying broadcast channel. + /// Messages will be buffered until all active readers have received a copy + /// of the message. When the channel is full, senders will apply + /// backpressure by waiting for space to free up. + // + // N.B messages are eagerly broadcasted, meaning no active receivers are + // required for a message to be broadcasted. + pub(crate) fn new(buf_size: usize) -> Dispatcher { + // Create a broadcast (tx, rx) pair + let (mut dispatch_tx, dispatch_rx) = async_broadcast::broadcast(buf_size); + // The tx half will not wait for any receivers to be active before + // broadcasting events. If no receivers are active, events will be + // buffered. + dispatch_tx.set_await_active(false); + Self { + dispatch_tx, + _dispatch_rx: dispatch_rx.deactivate(), + } + } + + // Calls broadcast on the channel. Will return when the channel has enough + // space to send an event. + pub(crate) async fn broadcast(&mut self, obj_ref: ObjectRef) { + let _ = self.dispatch_tx.broadcast_direct(obj_ref).await; + } + + // Creates a `ReflectHandle` by creating a receiver from the tx half. + // N.B: the new receiver will be fast-forwarded to the _latest_ event. + // The receiver won't have access to any events that are currently waiting + // to be acked by listeners. + pub(crate) fn subscribe(&self, reader: Store) -> ReflectHandle { + ReflectHandle::new(reader, self.dispatch_tx.new_receiver()) + } +} + +/// A handle to a shared stream reader +/// +/// [`ReflectHandle`]s are created by calling [`subscribe()`] on a [`Writer`], +/// or by calling `clone()` on an already existing [`ReflectHandle`]. Each +/// shared stream reader should be polled independently and driven to readiness +/// to avoid deadlocks. When the [`Writer`]'s buffer is filled, backpressure +/// will be applied on the root stream side. +/// +/// When the root stream is dropped, or it ends, all [`ReflectHandle`]s +/// subscribed to the stream will also terminate after all events yielded by +/// the root stream have been observed. This means [`ReflectHandle`] streams +/// can still be polled after the root stream has been dropped. 
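A rough sketch of that pattern, assuming the `unstable-runtime-subscribe` feature and a `Pod` store (the function name and buffer size are made up): each handle is polled on its own task, so none of them can hold the others back.

```rust
use futures::StreamExt;
use k8s_openapi::api::core::v1::Pod;
use kube::runtime::reflector::{self, store::Writer};

// Sketch: the returned Writer still has to be threaded through `reflect_shared`
// and polled by the caller, otherwise neither handle will ever yield an object.
fn fan_out() -> Writer<Pod> {
    let (_reader, writer) = reflector::store_shared::<Pod>(64);
    let handle_a = writer.subscribe().expect("writer was created with store_shared");
    let handle_b = handle_a.clone(); // clones attach to the same broadcast channel

    tokio::spawn(handle_a.for_each(|pod| async move {
        println!("task A saw {:?}", pod.metadata.name);
    }));
    tokio::spawn(handle_b.for_each(|pod| async move {
        println!("task B saw {:?}", pod.metadata.name);
    }));

    writer
}
```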
+/// +/// [`Writer`]: crate::reflector::Writer +#[pin_project] +pub struct ReflectHandle +where + K: Lookup + Clone + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone, +{ + #[pin] + rx: Receiver>, + reader: Store, +} + +impl Clone for ReflectHandle +where + K: Lookup + Clone + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone, +{ + fn clone(&self) -> Self { + ReflectHandle::new(self.reader.clone(), self.rx.clone()) + } +} + +impl ReflectHandle +where + K: Lookup + Clone, + K::DynamicType: Eq + std::hash::Hash + Clone, +{ + pub(super) fn new(reader: Store, rx: Receiver>) -> ReflectHandle { + Self { rx, reader } + } + + #[must_use] + pub fn reader(&self) -> Store { + self.reader.clone() + } +} + +impl Stream for ReflectHandle +where + K: Lookup + Clone, + K::DynamicType: Eq + std::hash::Hash + Clone + Default, +{ + type Item = Arc; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut this = self.project(); + match ready!(this.rx.as_mut().poll_next(cx)) { + Some(obj_ref) => this + .reader + .get(&obj_ref) + .map_or(Poll::Pending, |obj| Poll::Ready(Some(obj))), + None => Poll::Ready(None), + } + } +} + +#[cfg(feature = "unstable-runtime-subscribe")] +#[cfg(test)] +pub(crate) mod test { + use crate::{ + watcher::{Error, Event}, + WatchStreamExt, + }; + use std::{sync::Arc, task::Poll, vec}; + + use crate::reflector; + use futures::{pin_mut, poll, stream, StreamExt}; + use k8s_openapi::api::core::v1::Pod; + + fn testpod(name: &str) -> Pod { + let mut pod = Pod::default(); + pod.metadata.name = Some(name.to_string()); + pod + } + + #[tokio::test] + async fn events_are_passed_through() { + let foo = testpod("foo"); + let bar = testpod("bar"); + let st = stream::iter([ + Ok(Event::Applied(foo.clone())), + Err(Error::TooManyObjects), + Ok(Event::Restarted(vec![foo, bar])), + ]); + + let (reader, writer) = reflector::store_shared(10); + let reflect = st.reflect_shared(writer); + pin_mut!(reflect); + + // Prior to any polls, we should have an empty store. + assert_eq!(reader.len(), 0); + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Applied(_)))) + )); + + // Make progress and assert all events are seen + assert_eq!(reader.len(), 1); + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Err(Error::TooManyObjects))) + )); + assert_eq!(reader.len(), 1); + + let restarted = poll!(reflect.next()); + assert!(matches!(restarted, Poll::Ready(Some(Ok(Event::Restarted(_)))))); + assert_eq!(reader.len(), 2); + + assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); + assert_eq!(reader.len(), 2); + } + + #[tokio::test] + async fn readers_yield_touched_objects() { + // Readers should yield touched objects they receive from Stream events. + // + // NOTE: a Delete(_) event will be ignored if the item does not exist in + // the cache. Same with a Restarted(vec![delete_item]) + let foo = testpod("foo"); + let bar = testpod("bar"); + let st = stream::iter([ + Ok(Event::Deleted(foo.clone())), + Ok(Event::Applied(foo.clone())), + Err(Error::TooManyObjects), + Ok(Event::Restarted(vec![foo.clone(), bar.clone()])), + ]); + + let foo = Arc::new(foo); + let bar = Arc::new(bar); + + let (_, writer) = reflector::store_shared(10); + let subscriber = writer.subscribe().unwrap(); + let reflect = st.reflect_shared(writer); + pin_mut!(reflect); + pin_mut!(subscriber); + + // Deleted events should be skipped by subscriber. 
+ assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Deleted(_)))) + )); + assert_eq!(poll!(subscriber.next()), Poll::Pending); + + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Applied(_)))) + )); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); + + // Errors are not propagated to subscribers. + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Err(Error::TooManyObjects))) + )); + assert!(matches!(poll!(subscriber.next()), Poll::Pending)); + + // Restart event will yield all objects in the list + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Restarted(_)))) + )); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone()))); + + // When main channel is closed, it is propagated to subscribers + assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); + assert_eq!(poll!(subscriber.next()), Poll::Ready(None)); + } + + #[tokio::test] + async fn readers_yield_when_tx_drops() { + // Once the main stream is dropped, readers should continue to make + // progress and read values that have been sent on the channel. + let foo = testpod("foo"); + let bar = testpod("bar"); + let st = stream::iter([ + Ok(Event::Applied(foo.clone())), + Ok(Event::Restarted(vec![foo.clone(), bar.clone()])), + ]); + + let foo = Arc::new(foo); + let bar = Arc::new(bar); + + let (_, writer) = reflector::store_shared(10); + let subscriber = writer.subscribe().unwrap(); + let mut reflect = Box::pin(st.reflect_shared(writer)); + pin_mut!(subscriber); + + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Applied(_)))) + )); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); + + // Restart event will yield all objects in the list. Broadcast values + // without polling and then drop. + // + // First, subscribers should be pending. + assert_eq!(poll!(subscriber.next()), Poll::Pending); + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Restarted(_)))) + )); + drop(reflect); + + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone()))); + assert_eq!(poll!(subscriber.next()), Poll::Ready(None)); + } + + #[tokio::test] + async fn reflect_applies_backpressure() { + // When the channel is full, we should observe backpressure applied. + // + // This will be manifested by receiving Poll::Pending on the reflector + // stream while the reader stream is not polled. Once we unblock the + // buffer, the reflector will make progress. + let foo = testpod("foo"); + let bar = testpod("bar"); + let st = stream::iter([ + Ok(Event::Applied(foo.clone())), + Ok(Event::Restarted(vec![foo.clone(), bar.clone()])), + ]); + + let foo = Arc::new(foo); + let bar = Arc::new(bar); + + let (_, writer) = reflector::store_shared(1); + let subscriber = writer.subscribe().unwrap(); + let subscriber_slow = writer.subscribe().unwrap(); + let reflect = st.reflect_shared(writer); + pin_mut!(reflect); + pin_mut!(subscriber); + pin_mut!(subscriber_slow); + + assert_eq!(poll!(subscriber.next()), Poll::Pending); + assert_eq!(poll!(subscriber_slow.next()), Poll::Pending); + + // Poll first subscriber, but not the second. + // + // The buffer can hold one value, so even if we have a slow subscriber, + // we will still get an event from the root. 
+ assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Applied(_)))) + )); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); + // One subscriber is not reading, so we need to apply backpressure until + // channel has capacity. + // + // At this point, the buffer is full. Polling again will trigger the + // backpressure logic. + assert!(matches!(poll!(reflect.next()), Poll::Pending)); + + // Our "fast" subscriber will also have nothing else to poll until the + // slower subscriber advances its pointer in the buffer. + assert_eq!(poll!(subscriber.next()), Poll::Pending); + + // Advance slow reader + assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone()))); + + // We now have room for only one more item. In total, the previous event + // had two. We repeat the same pattern. + assert!(matches!(poll!(reflect.next()), Poll::Pending)); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(foo.clone()))); + assert!(matches!(poll!(reflect.next()), Poll::Pending)); + assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(foo.clone()))); + assert!(matches!( + poll!(reflect.next()), + Poll::Ready(Some(Ok(Event::Restarted(_)))) + )); + // Poll again to drain the queue. + assert!(matches!(poll!(reflect.next()), Poll::Ready(None))); + assert_eq!(poll!(subscriber.next()), Poll::Ready(Some(bar.clone()))); + assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(Some(bar.clone()))); + + assert_eq!(poll!(subscriber.next()), Poll::Ready(None)); + assert_eq!(poll!(subscriber_slow.next()), Poll::Ready(None)); + } + + // TODO (matei): tests around cloning subscribers once a watch stream has already + // been established. This will depend on the interfaces & impl so are left + // out for now. +} diff --git a/kube-runtime/src/reflector/mod.rs b/kube-runtime/src/reflector/mod.rs index c25b76721..2c6048ba9 100644 --- a/kube-runtime/src/reflector/mod.rs +++ b/kube-runtime/src/reflector/mod.rs @@ -1,12 +1,18 @@ //! Caches objects in memory +mod dispatcher; mod object_ref; pub mod store; -pub use self::object_ref::{Extra as ObjectRefExtra, Lookup, ObjectRef}; +pub use self::{ + dispatcher::ReflectHandle, + object_ref::{Extra as ObjectRefExtra, Lookup, ObjectRef}, +}; use crate::watcher; -use futures::{Stream, TryStreamExt}; +use async_stream::stream; +use futures::{Stream, StreamExt}; use std::hash::Hash; +#[cfg(feature = "unstable-runtime-subscribe")] pub use store::store_shared; pub use store::{store, Store}; /// Cache objects from a [`watcher()`] stream into a local [`Store`] @@ -98,7 +104,19 @@ where K::DynamicType: Eq + Hash + Clone, W: Stream>>, { - stream.inspect_ok(move |event| writer.apply_watcher_event(event)) + let mut stream = Box::pin(stream); + stream! { + while let Some(event) = stream.next().await { + match event { + Ok(ev) => { + writer.apply_watcher_event(&ev); + writer.dispatch_event(&ev).await; + yield Ok(ev); + }, + Err(ev) => yield Err(ev) + } + } + } } #[cfg(test)] diff --git a/kube-runtime/src/reflector/store.rs b/kube-runtime/src/reflector/store.rs index 8fc51b638..a78976c19 100644 --- a/kube-runtime/src/reflector/store.rs +++ b/kube-runtime/src/reflector/store.rs @@ -1,4 +1,4 @@ -use super::{Lookup, ObjectRef}; +use super::{dispatcher::Dispatcher, Lookup, ObjectRef, ReflectHandle}; use crate::{ utils::delayed_init::{self, DelayedInit}, watcher, @@ -16,14 +16,15 @@ type Cache = Arc, Arc>>>; /// This is exclusive since it's not safe to share a single `Store` between multiple reflectors. 
/// In particular, `Restarted` events will clobber the state of other connected reflectors. #[derive(Debug)] -pub struct Writer +pub struct Writer where - K::DynamicType: Eq + Hash, + K::DynamicType: Eq + Hash + Clone, { store: Cache, dyntype: K::DynamicType, ready_tx: Option>, ready_rx: Arc>, + dispatcher: Option>, } impl Writer @@ -41,6 +42,28 @@ where dyntype, ready_tx: Some(ready_tx), ready_rx: Arc::new(ready_rx), + dispatcher: None, + } + } + + /// Creates a new Writer with the specified dynamic type and buffer size. + /// + /// When the Writer is created through `new_shared`, it will be able to + /// be subscribed. Stored objects will be propagated to all subscribers. The + /// buffer size is used for the underlying channel. An object is cleared + /// from the buffer only when all subscribers have seen it. + /// + /// If the dynamic type is default-able (for example when writer is used with + /// `k8s_openapi` types) you can use `Default` instead. + #[cfg(feature = "unstable-runtime-subscribe")] + pub fn new_shared(buf_size: usize, dyntype: K::DynamicType) -> Self { + let (ready_tx, ready_rx) = DelayedInit::new(); + Writer { + store: Default::default(), + dyntype, + ready_tx: Some(ready_tx), + ready_rx: Arc::new(ready_rx), + dispatcher: Some(Dispatcher::new(buf_size)), } } @@ -56,6 +79,16 @@ where } } + /// Return a handle to a subscriber + /// + /// Multiple subscribe handles may be obtained, by either calling + /// `subscribe` multiple times, or by calling `clone()` + pub fn subscribe(&self) -> Option> { + self.dispatcher + .as_ref() + .map(|dispatcher| dispatcher.subscribe(self.as_reader())) + } + /// Applies a single watcher event to the store pub fn apply_watcher_event(&mut self, event: &watcher::Event) { match event { @@ -82,7 +115,30 @@ where ready_tx.init(()) } } + + /// Broadcast an event to any downstream listeners subscribed on the store + pub(crate) async fn dispatch_event(&mut self, event: &watcher::Event) { + if let Some(ref mut dispatcher) = self.dispatcher { + match event { + watcher::Event::Applied(obj) => { + let obj_ref = obj.to_object_ref(self.dyntype.clone()); + // TODO (matei): should this take a timeout to log when backpressure has + // been applied for too long, e.g. 10s + dispatcher.broadcast(obj_ref).await; + } + + watcher::Event::Restarted(new_objs) => { + let obj_refs = new_objs.iter().map(|obj| obj.to_object_ref(self.dyntype.clone())); + for obj_ref in obj_refs { + dispatcher.broadcast(obj_ref).await; + } + } + watcher::Event::Deleted(_) => {} + } + } + } } + impl Default for Writer where K: Lookup + Clone + 'static, @@ -203,6 +259,27 @@ where (r, w) } +/// Create a (Reader, Writer) for a `Store` for a typed resource `K` +/// +/// The resulting `Writer` can be subscribed on in order to fan out events from +/// a watcher. The `Writer` should be passed to a [`reflector`](crate::reflector()), +/// and the [`Store`] is a read-only handle. +/// +/// A buffer size is used for the underlying message channel. When the buffer is +/// full, backpressure will be applied by waiting for capacity. 
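For resources whose `DynamicType` is not `Default` (for example types found through discovery), the shared writer can instead be built directly with `new_shared`; a hedged sketch, assuming the `unstable-runtime-subscribe` feature, with an illustrative buffer size and function name:

```rust
use kube::core::{ApiResource, DynamicObject};
use kube::runtime::reflector::store::Writer;

// Sketch: `ar` would typically come from API discovery.
fn dynamic_shared_writer(ar: ApiResource) -> Writer<DynamicObject> {
    let writer = Writer::new_shared(64, ar);
    // writer.as_reader() and writer.subscribe() work the same as with `store_shared`
    writer
}
```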
+#[must_use] +#[allow(clippy::module_name_repetitions)] +#[cfg(feature = "unstable-runtime-subscribe")] +pub fn store_shared(buf_size: usize) -> (Store, Writer) +where + K: Lookup + Clone + 'static, + K::DynamicType: Eq + Hash + Clone + Default, +{ + let w = Writer::::new_shared(buf_size, Default::default()); + let r = w.as_reader(); + (r, w) +} + #[cfg(test)] mod tests { use super::{store, Writer}; diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index fca3a4a4d..f2033fc3d 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -160,7 +160,7 @@ pub trait WatchStreamExt: Stream { /// impl Stream + Send + Sized + 'static, /// ) /// where - /// K: Debug + Send + Sync + 'static, + /// K: Clone + Debug + Send + Sync + 'static, /// S: Stream, watcher::Error>> + Send + Sized + 'static, /// { /// // Create a stream that can be subscribed to @@ -189,6 +189,7 @@ pub trait WatchStreamExt: Stream { fn stream_subscribe(self) -> StreamSubscribe where Self: Stream, watcher::Error>> + Send + Sized + 'static, + K: Clone, { StreamSubscribe::new(self) } @@ -247,6 +248,91 @@ pub trait WatchStreamExt: Stream { { Reflect::new(self, writer) } + + /// Reflect a shared [`watcher()`] stream into a [`Store`] through a [`Writer`] + /// + /// Returns the stream unmodified, but passes every [`watcher::Event`] + /// through a [`Writer`]. This populates a [`Store`] as the stream is + /// polled. When the [`watcher::Event`] is not an error or a + /// [`watcher::Event::Deleted`] then its inner object will also be + /// propagated to subscribers. + /// + /// Subscribers can be created by calling [`subscribe()`] on a [`Writer`]. + /// This will return a [`ReflectHandle`] stream that should be polled + /// independently. When the root stream is dropped, or it ends, all [`ReflectHandle`]s + /// subscribed to the stream will also terminate after all events yielded by + /// the root stream have been observed. This means [`ReflectHandle`] streams + /// can still be polled after the root stream has been dropped. + /// + /// **NB**: This adapter requires an + /// [`unstable`](https://github.com/kube-rs/kube/blob/main/kube-runtime/Cargo.toml#L17-L21) + /// feature + /// + /// ## Warning + /// + /// If the root [`Stream`] is not polled, [`ReflectHandle`] streams will + /// never receive any events. This will cause the streams to deadlock since + /// the root stream will apply backpressure when downstream readers are not + /// consuming events. 
+ /// + /// + /// [`Store`]: crate::reflector::Store + /// [`ReflectHandle`]: crate::reflector::dispatcher::ReflectHandle + /// ## Usage + /// ```no_run + /// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt}; + /// # use std::time::Duration; + /// # use tracing::{info, warn}; + /// use kube::{Api, Client, ResourceExt}; + /// use kube_runtime::{watcher, WatchStreamExt, reflector}; + /// use k8s_openapi::api::apps::v1::Deployment; + /// # async fn wrapper() -> Result<(), Box> { + /// # let client: kube::Client = todo!(); + /// + /// let deploys: Api = Api::default_namespaced(client); + /// let subscriber_buf_sz = 100; + /// let (reader, writer) = reflector::store_shared::(subscriber_buf_sz); + /// let subscriber = &writer.subscribe().unwrap(); + /// + /// tokio::spawn(async move { + /// // start polling the store once the reader is ready + /// reader.wait_until_ready().await.unwrap(); + /// loop { + /// let names = reader.state().iter().map(|d| d.name_any()).collect::>(); + /// info!("Current {} deploys: {:?}", names.len(), names); + /// tokio::time::sleep(Duration::from_secs(10)).await; + /// } + /// }); + /// + /// // configure the watcher stream and populate the store while polling + /// watcher(deploys, watcher::Config::default()) + /// .reflect_shared(writer) + /// .applied_objects() + /// .for_each(|res| async move { + /// match res { + /// Ok(o) => info!("saw in root stream {}", o.name_any()), + /// Err(e) => warn!("watcher error in root stream: {}", e), + /// } + /// }) + /// .await; + /// + /// // subscriber can be used to receive applied_objects + /// subscriber.for_each(|obj| async move { + /// info!("saw in subscriber {}", &obj.name_any()) + /// }).await; + /// + /// # Ok(()) + /// # } + /// ``` + #[cfg(feature = "unstable-runtime-subscribe")] + fn reflect_shared(self, writer: Writer) -> impl Stream + where + Self: Stream>> + Sized, + K: Resource + Clone + 'static, + K::DynamicType: Eq + std::hash::Hash + Clone, + { + crate::reflector(writer, self) + } } impl WatchStreamExt for St where St: Stream {}
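As a complement to the usage block above, a sketch that drives the root stream and a subscriber concurrently rather than one after the other, which keeps the shared buffer draining as the warning requires (names and buffer size are illustrative; assumes the `unstable-runtime-subscribe` feature):

```rust
use futures::{future, StreamExt};
use k8s_openapi::api::core::v1::Pod;
use kube::runtime::{reflector, watcher, WatchStreamExt};
use kube::{Api, Client, ResourceExt};

// Sketch: the root stream and the subscriber are polled together via join,
// so the writer's buffer always has a consumer on both sides.
async fn watch_and_subscribe(client: Client) {
    let pods: Api<Pod> = Api::default_namespaced(client);
    let (_reader, writer) = reflector::store_shared(32);
    let subscriber = writer.subscribe().expect("writer was created with store_shared");

    let root = watcher(pods, watcher::Config::default())
        .default_backoff()
        .reflect_shared(writer)
        .applied_objects()
        .for_each(|res| async move {
            if let Err(error) = res {
                eprintln!("root stream error: {error}");
            }
        });

    let consumer = subscriber.for_each(|pod| async move {
        println!("subscriber saw {}", pod.name_any());
    });

    future::join(root, consumer).await;
}
```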