Skip to content

Commit

Permalink
Add WatchStreamExt::modify() to modify events
Browse files Browse the repository at this point in the history
Add `WatchStreamExt::modify()` which returns an `EventModify` stream.
This allows for users to directly modify the inner value of an `Event`
returned by the watcher stream, thus avoiding the usage of nested maps.

Example usage:

```rust
let stream = watcher(api, watcher::Config::default()).modify(|pod| {
    pod.managed_fields_mut().clear();
    pod.annotations_mut().clear();
    pod.status = None;
});
```

Signed-off-by: Sanskar Jaiswal <[email protected]>
  • Loading branch information
aryan9600 committed Jul 8, 2023
1 parent db585dd commit e26e2da
Show file tree
Hide file tree
Showing 4 changed files with 90 additions and 8 deletions.
12 changes: 5 additions & 7 deletions examples/pod_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@ async fn main() -> anyhow::Result<()> {
}
});

let stream = watcher(api, watcher::Config::default()).map_ok(|ev| {
ev.modify(|pod| {
// memory optimization for our store - we don't care about fields/annotations/status
pod.managed_fields_mut().clear();
pod.annotations_mut().clear();
pod.status = None;
})
let stream = watcher(api, watcher::Config::default()).modify(|pod| {
// memory optimization for our store - we don't care about managed fields/annotations/status
pod.managed_fields_mut().clear();
pod.annotations_mut().clear();
pod.status = None;
});

let rf = reflector(writer, stream)
Expand Down
72 changes: 72 additions & 0 deletions kube-runtime/src/utils/event_modify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
use core::{
pin::Pin,
task::{Context, Poll},
};
use std::task::ready;

use futures::{Stream, TryStream};
use pin_project::pin_project;

use crate::watcher::{Error, Event};

#[pin_project]
/// Stream returned by the [`modify`](super::WatchStreamExt::modify) method.
/// Modifies the [`Event`] item returned by the inner stream by calling
/// [`modify`](Event::modify()) on it.
pub struct EventModify<St, F> {
#[pin]
stream: St,
f: F,
}

impl<St, F, K> EventModify<St, F>
where
St: TryStream<Ok = Event<K>>,
F: FnMut(&mut K),
{
pub(super) fn new(stream: St, f: F) -> EventModify<St, F> {
Self { stream, f }
}
}

impl<St, F, K> Stream for EventModify<St, F>
where
St: Stream<Item = Result<Event<K>, Error>>,
F: FnMut(&mut K),
{
type Item = Result<Event<K>, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.project();
Poll::Ready(match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(event)) => Some(Ok(event.modify(me.f))),
Some(Err(err)) => Some(Err(err)),
None => return Poll::Ready(None),
})
}
}

#[cfg(test)]
pub(crate) mod test {
use std::task::Poll;

use super::{Error, Event, EventModify};
use futures::{pin_mut, poll, stream, StreamExt};

#[tokio::test]
async fn eventmodify_modifies_innner_value_of_event() {
let st = stream::iter([Ok(Event::Applied(0)), Err(Error::TooManyObjects)]);
let ev_modify = EventModify::new(st, |x| {
*x += 1;
});
pin_mut!(ev_modify);
assert!(matches!(
poll!(ev_modify.next()),
Poll::Ready(Some(Ok(Event::Applied(1))))
));
assert!(matches!(
poll!(ev_modify.next()),
Poll::Ready(Some(Err(Error::TooManyObjects)))
));
}
}
2 changes: 2 additions & 0 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
mod backoff_reset_timer;
pub(crate) mod delayed_init;
mod event_flatten;
mod event_modify;
#[cfg(feature = "unstable-runtime-predicates")] mod predicate;
mod stream_backoff;
#[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe;
mod watch_ext;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use event_flatten::EventFlatten;
pub use event_modify::EventModify;
#[cfg(feature = "unstable-runtime-predicates")]
pub use predicate::{predicates, Predicate, PredicateFilter};
pub use stream_backoff::StreamBackoff;
Expand Down
12 changes: 11 additions & 1 deletion kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::utils::predicate::{Predicate, PredicateFilter};
#[cfg(feature = "unstable-runtime-subscribe")]
use crate::utils::stream_subscribe::StreamSubscribe;
use crate::{
utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff},
utils::{event_flatten::EventFlatten, event_modify::EventModify, stream_backoff::StreamBackoff},
watcher,
};
#[cfg(feature = "unstable-runtime-predicates")] use kube_client::Resource;
Expand Down Expand Up @@ -53,6 +53,16 @@ pub trait WatchStreamExt: Stream {
EventFlatten::new(self, true)
}

/// Modify the [`Event`](crate::watcher::Event) returned by a [`watcher()`] stream.
/// by calling [`modify()`][crate::watcher::Event::modify()] on it.
fn modify<F, K>(self, f: F) -> EventModify<Self, F>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
F: FnMut(&mut K),
{
EventModify::new(self, f)
}

/// Filter out a flattened stream on [`predicates`](crate::predicates).
///
/// This will filter out repeat calls where the predicate returns the same result.
Expand Down

0 comments on commit e26e2da

Please sign in to comment.