Skip to content

Commit

Permalink
Add WatchStreamExt::modify() to modify events
Browse files Browse the repository at this point in the history
Add `WatchStreamExt::modify()` which returns an `EventModify` stream.
This allows for users to directly modify the inner value of an `Event`
returned by the watcher stream, thus avoiding the usage of nested maps.

Example usage:

```rust
let stream = watcher(api, watcher::Config::default()).modify(|pod| {
    pod.managed_fields_mut().clear();
    pod.annotations_mut().clear();
    pod.status = None;
});
```

Signed-off-by: Sanskar Jaiswal <[email protected]>
  • Loading branch information
aryan9600 committed Jul 8, 2023
1 parent db585dd commit 6ca8651
Show file tree
Hide file tree
Showing 4 changed files with 124 additions and 8 deletions.
12 changes: 5 additions & 7 deletions examples/pod_reflector.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,11 @@ async fn main() -> anyhow::Result<()> {
}
});

let stream = watcher(api, watcher::Config::default()).map_ok(|ev| {
ev.modify(|pod| {
// memory optimization for our store - we don't care about fields/annotations/status
pod.managed_fields_mut().clear();
pod.annotations_mut().clear();
pod.status = None;
})
let stream = watcher(api, watcher::Config::default()).modify(|pod| {
// memory optimization for our store - we don't care about managed fields/annotations/status
pod.managed_fields_mut().clear();
pod.annotations_mut().clear();
pod.status = None;
});

let rf = reflector(writer, stream)
Expand Down
81 changes: 81 additions & 0 deletions kube-runtime/src/utils/event_modify.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
use core::{
pin::Pin,
task::{Context, Poll},
};
use std::task::ready;

use futures::{Stream, TryStream};
use pin_project::pin_project;

use crate::watcher::{Error, Event};

#[pin_project]
/// Stream returned by the [`modify`](super::WatchStreamExt::modify) method.
/// Modifies the [`Event`] item returned by the inner stream by calling
/// [`modify`](Event::modify()) on it.
pub struct EventModify<St, F> {
#[pin]
stream: St,
f: F,
}

impl<St, F, K> EventModify<St, F>
where
St: TryStream<Ok = Event<K>>,
F: FnMut(&mut K),
{
pub(super) fn new(stream: St, f: F) -> EventModify<St, F> {
Self { stream, f }
}
}

impl<St, F, K> Stream for EventModify<St, F>
where
St: Stream<Item = Result<Event<K>, Error>>,
F: FnMut(&mut K),
{
type Item = Result<Event<K>, Error>;

fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let mut me = self.project();
Poll::Ready(match ready!(me.stream.as_mut().poll_next(cx)) {
Some(Ok(event)) => Some(Ok(event.modify(me.f))),
Some(Err(err)) => Some(Err(err)),
None => None,
})
}
}

#[cfg(test)]
pub(crate) mod test {
use std::task::Poll;

use super::{Error, Event, EventModify};
use futures::{pin_mut, poll, stream, StreamExt};

#[tokio::test]
async fn eventmodify_modifies_innner_value_of_event() {
let restart_val = vec![10];
let st = stream::iter([Ok(Event::Applied(0)),
Err(Error::TooManyObjects),
Ok(Event::Restarted(restart_val)),
]);
let ev_modify = EventModify::new(st, |x| {
*x += 1;
});
pin_mut!(ev_modify);
assert!(matches!(
poll!(ev_modify.next()),
Poll::Ready(Some(Ok(Event::Applied(1))))
));
assert!(matches!(
poll!(ev_modify.next()),
Poll::Ready(Some(Err(Error::TooManyObjects)))
));
assert!(matches!(
poll!(ev_modify.next()),
Poll::Ready(Some(Ok(Event::Restarted(restart_val))))
));
assert!(matches!(poll!(ev_modify.next()), Poll::Ready(None)));
}
}
2 changes: 2 additions & 0 deletions kube-runtime/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@
mod backoff_reset_timer;
pub(crate) mod delayed_init;
mod event_flatten;
mod event_modify;
#[cfg(feature = "unstable-runtime-predicates")] mod predicate;
mod stream_backoff;
#[cfg(feature = "unstable-runtime-subscribe")] pub mod stream_subscribe;
mod watch_ext;

pub use backoff_reset_timer::ResetTimerBackoff;
pub use event_flatten::EventFlatten;
pub use event_modify::EventModify;
#[cfg(feature = "unstable-runtime-predicates")]
pub use predicate::{predicates, Predicate, PredicateFilter};
pub use stream_backoff::StreamBackoff;
Expand Down
37 changes: 36 additions & 1 deletion kube-runtime/src/utils/watch_ext.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ use crate::utils::predicate::{Predicate, PredicateFilter};
#[cfg(feature = "unstable-runtime-subscribe")]
use crate::utils::stream_subscribe::StreamSubscribe;
use crate::{
utils::{event_flatten::EventFlatten, stream_backoff::StreamBackoff},
utils::{event_flatten::EventFlatten, event_modify::EventModify, stream_backoff::StreamBackoff},
watcher,
};
#[cfg(feature = "unstable-runtime-predicates")] use kube_client::Resource;
Expand Down Expand Up @@ -53,6 +53,41 @@ pub trait WatchStreamExt: Stream {
EventFlatten::new(self, true)
}

/// Modify elements of a [`watcher()`] stream.
///
/// Calls [`watcher::Event::modify()`] on every element.
/// Stream shorthand for `stream.map_ok(|event| { event.modify(f) })`.
///
/// ```no_run
/// # use futures::{pin_mut, Stream, StreamExt, TryStreamExt};
/// # use kube::{Api, Client, ResourceExt};
/// # use kube_runtime::{watcher, WatchStreamExt};
/// # use k8s_openapi::api::apps::v1::Deployment;
/// # async fn wrapper() -> Result<(), Box<dyn std::error::Error>> {
/// # let client: kube::Client = todo!();
/// let deploys: Api<Deployment> = Api::all(client);
/// let truncated_deploy_stream = watcher(deploys, watcher::Config::default())
/// .modify(|deploy| {
/// deploy.managed_fields_mut().clear();
/// deploy.status = None;
/// })
/// .applied_objects();
/// pin_mut!(truncated_deploy_stream);
///
/// while let Some(d) = truncated_deploy_stream.try_next().await? {
/// println!("Truncated Deployment: '{:?}'", serde_json::to_string(&d)?);
/// }
/// # Ok(())
/// # }
/// ```
fn modify<F, K>(self, f: F) -> EventModify<Self, F>
where
Self: Stream<Item = Result<watcher::Event<K>, watcher::Error>> + Sized,
F: FnMut(&mut K),
{
EventModify::new(self, f)
}

/// Filter out a flattened stream on [`predicates`](crate::predicates).
///
/// This will filter out repeat calls where the predicate returns the same result.
Expand Down

0 comments on commit 6ca8651

Please sign in to comment.