diff --git a/examples/pod_reflector.rs b/examples/pod_reflector.rs index 4c64db0e2..cbcda3d46 100644 --- a/examples/pod_reflector.rs +++ b/examples/pod_reflector.rs @@ -28,13 +28,11 @@ async fn main() -> anyhow::Result<()> { } }); - let stream = watcher(api, watcher::Config::default()).map_ok(|ev| { - ev.modify(|pod| { - // memory optimization for our store - we don't care about fields/annotations/status - pod.managed_fields_mut().clear(); - pod.annotations_mut().clear(); - pod.status = None; - }) + let stream = watcher(api, watcher::Config::default()).modify(|pod| { + // memory optimization for our store - we don't care about managed fields/annotations/status + pod.managed_fields_mut().clear(); + pod.annotations_mut().clear(); + pod.status = None; }); let rf = reflector(writer, stream) diff --git a/kube-runtime/src/utils/event_modify.rs b/kube-runtime/src/utils/event_modify.rs new file mode 100644 index 000000000..1681d89c0 --- /dev/null +++ b/kube-runtime/src/utils/event_modify.rs @@ -0,0 +1,72 @@ +use core::{ + pin::Pin, + task::{Context, Poll}, +}; +use std::task::ready; + +use futures::{Stream, TryStream}; +use pin_project::pin_project; + +use crate::watcher::{Error, Event}; + +#[pin_project] +/// Stream returned by the [`modify`](super::WatchStreamExt::modify) method. +/// Modifies the [`Event`] item returned by the inner stream by calling +/// [`modify`](Event::modify()) on it. +pub struct EventModify { + #[pin] + stream: St, + f: F, +} + +impl EventModify +where + St: TryStream>, + F: FnMut(&mut K), +{ + pub(super) fn new(stream: St, f: F) -> EventModify { + Self { stream, f } + } +} + +impl Stream for EventModify +where + St: Stream, Error>>, + F: FnMut(&mut K), +{ + type Item = Result, Error>; + + fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { + let mut me = self.project(); + Poll::Ready(match ready!(me.stream.as_mut().poll_next(cx)) { + Some(Ok(event)) => Some(Ok(event.modify(me.f))), + Some(Err(err)) => Some(Err(err)), + None => return Poll::Ready(None), + }) + } +} + +#[cfg(test)] +pub(crate) mod test { + use std::task::Poll; + + use super::{Error, Event, EventModify}; + use futures::{pin_mut, poll, stream, StreamExt}; + + #[tokio::test] + async fn eventmodify_modifies_innner_value_of_event() { + let st = stream::iter([Ok(Event::Applied(0)), Err(Error::TooManyObjects)]); + let ev_modify = EventModify::new(st, |x| { + *x += 1; + }); + pin_mut!(ev_modify); + assert!(matches!( + poll!(ev_modify.next()), + Poll::Ready(Some(Ok(Event::Applied(1)))) + )); + assert!(matches!( + poll!(ev_modify.next()), + Poll::Ready(Some(Err(Error::TooManyObjects))) + )); + } +} diff --git a/kube-runtime/src/utils/mod.rs b/kube-runtime/src/utils/mod.rs index 41d9d10ec..53e50af9f 100644 --- a/kube-runtime/src/utils/mod.rs +++ b/kube-runtime/src/utils/mod.rs @@ -3,6 +3,7 @@ mod backoff_reset_timer; pub(crate) mod delayed_init; mod event_flatten; +mod event_modify; #[cfg(feature = "unstable-runtime-predicates")] mod predicate; mod stream_backoff; #[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe; @@ -10,6 +11,7 @@ mod watch_ext; pub use backoff_reset_timer::ResetTimerBackoff; pub use event_flatten::EventFlatten; +pub use event_modify::EventModify; #[cfg(feature = "unstable-runtime-predicates")] pub use predicate::{predicates, Predicate, PredicateFilter}; pub use stream_backoff::StreamBackoff; diff --git a/kube-runtime/src/utils/watch_ext.rs b/kube-runtime/src/utils/watch_ext.rs index a9391f923..874ec8108 100644 --- a/kube-runtime/src/utils/watch_ext.rs +++ b/kube-runtime/src/utils/watch_ext.rs @@ -3,7 +3,7 @@ use crate::utils::predicate::{Predicate, PredicateFilter}; #[cfg(feature = "unstable-runtime-subscribe")] use crate::utils::stream_subscribe::StreamSubscribe; use crate::{ - utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff}, + utils::{event_flatten::EventFlatten, event_modify::EventModify, stream_backoff::StreamBackoff}, watcher, }; #[cfg(feature = "unstable-runtime-predicates")] use kube_client::Resource; @@ -53,6 +53,16 @@ pub trait WatchStreamExt: Stream { EventFlatten::new(self, true) } + /// Modify the [`Event`](crate::watcher::Event) returned by a [`watcher()`] stream. + /// by calling [`modify()`][crate::watcher::Event::modify()] on it. + fn modify(self, f: F) -> EventModify + where + Self: Stream, watcher::Error>> + Sized, + F: FnMut(&mut K), + { + EventModify::new(self, f) + } + /// Filter out a flattened stream on [`predicates`](crate::predicates). /// /// This will filter out repeat calls where the predicate returns the same result.