Skip to content

Commit

Permalink
Add helper to reschedule all objects for reconciliation
Browse files Browse the repository at this point in the history
  • Loading branch information
nightkr committed Jun 16, 2021
1 parent 3b46233 commit 508b7a7
Show file tree
Hide file tree
Showing 4 changed files with 31 additions and 13 deletions.
1 change: 0 additions & 1 deletion kube-core/src/discovery.rs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,6 @@ impl ApiResource {
}
}


/// Resource scope
#[derive(Debug, Clone, Hash, Eq, PartialEq)]
pub enum Scope {
Expand Down
1 change: 0 additions & 1 deletion kube-core/src/object.rs
Original file line number Diff line number Diff line change
Expand Up @@ -208,7 +208,6 @@ where
#[derive(Clone, Deserialize, Serialize, Default, Debug)]
pub struct NotUsed {}


#[cfg(test)]
mod test {
use super::{ApiResource, NotUsed, Object};
Expand Down
1 change: 0 additions & 1 deletion kube-core/src/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ pub struct StatusCause {
pub field: String,
}


#[cfg(test)]
mod test {
use super::Status;
Expand Down
41 changes: 31 additions & 10 deletions kube-runtime/src/controller/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@ use crate::{
};
use derivative::Derivative;
use futures::{
channel, future,
stream::{self, SelectAll},
FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream, TryStreamExt,
channel, future, stream, FutureExt, SinkExt, Stream, StreamExt, TryFuture, TryFutureExt, TryStream,
TryStreamExt,
};
use kube::api::{Api, DynamicObject, ListParams, Resource};
use serde::de::DeserializeOwned;
Expand Down Expand Up @@ -300,7 +299,7 @@ where
{
// NB: Need to Unpin for stream::select_all
// TODO: get an arbitrary std::error::Error in here?
selector: SelectAll<BoxStream<'static, Result<ObjectRef<K>, watcher::Error>>>,
trigger_selector: stream::SelectAll<BoxStream<'static, Result<ObjectRef<K>, watcher::Error>>>,
dyntype: K::DynamicType,
reader: Store<K>,
}
Expand Down Expand Up @@ -335,15 +334,15 @@ where
pub fn new_with(owned_api: Api<K>, lp: ListParams, dyntype: K::DynamicType) -> Self {
let writer = Writer::<K>::new(dyntype.clone());
let reader = writer.as_reader();
let mut selector = stream::SelectAll::new();
let mut trigger_selector = stream::SelectAll::new();
let self_watcher = trigger_self(
try_flatten_applied(reflector(writer, watcher(owned_api, lp))),
dyntype.clone(),
)
.boxed();
selector.push(self_watcher);
trigger_selector.push(self_watcher);
Self {
selector,
trigger_selector,
dyntype,
reader,
}
Expand Down Expand Up @@ -371,7 +370,7 @@ where
Child::DynamicType: Debug + Eq + Hash,
{
let child_watcher = trigger_owners(try_flatten_touched(watcher(api, lp)), self.dyntype.clone());
self.selector.push(child_watcher.boxed());
self.trigger_selector.push(child_watcher.boxed());
self
}

Expand All @@ -391,7 +390,29 @@ where
I::IntoIter: Send,
{
let other_watcher = trigger_with(try_flatten_touched(watcher(api, lp)), mapper);
self.selector.push(other_watcher.boxed());
self.trigger_selector.push(other_watcher.boxed());
self
}

/// Trigger a reconciliation for all managed objects whenever `trigger` emits a value
///
/// For example, this can be used to reconcile all objects whenever the controller's configuration changes.
pub fn reconcile_all_on(mut self, trigger: impl Stream<Item = ()> + Send + Sync + 'static) -> Self {
let store = self.store();
let dyntype = self.dyntype.clone();
self.trigger_selector.push(
trigger
.flat_map(move |()| {
let dyntype = dyntype.clone();
stream::iter(
store
.state()
.into_iter()
.map(move |obj| Ok(ObjectRef::from_obj_with(&obj, dyntype.clone()))),
)
})
.boxed(),
);
self
}

Expand All @@ -418,7 +439,7 @@ where
error_policy,
context,
self.reader,
self.selector,
self.trigger_selector,
)
}
}
Expand Down

0 comments on commit 508b7a7

Please sign in to comment.