From b8572ac46711ce770d7fffb5c893970660192238 Mon Sep 17 00:00:00 2001
From: David Herberth <github@dav1d.de>
Date: Thu, 30 Mar 2023 10:33:29 +0200
Subject: [PATCH] implements owns_stream and owns_stream_with for Controller

Signed-off-by: David Herberth <github@dav1d.de>
---
 kube-runtime/Cargo.toml            |  3 +-
 kube-runtime/src/controller/mod.rs | 57 ++++++++++++++++++++++++++++++
 2 files changed, 59 insertions(+), 1 deletion(-)

diff --git a/kube-runtime/Cargo.toml b/kube-runtime/Cargo.toml
index ec1e3f529..598a66cdd 100644
--- a/kube-runtime/Cargo.toml
+++ b/kube-runtime/Cargo.toml
@@ -15,8 +15,9 @@ rust-version = "1.63.0"
 edition = "2021"
 
 [features]
-unstable-runtime = ["unstable-runtime-subscribe"]
+unstable-runtime = ["unstable-runtime-subscribe", "unstable-runtime-owns_stream"]
 unstable-runtime-subscribe = []
+unstable-runtime-owns_stream = []
 
 [package.metadata.docs.rs]
 features = ["k8s-openapi/v1_26", "unstable-runtime"]
diff --git a/kube-runtime/src/controller/mod.rs b/kube-runtime/src/controller/mod.rs
index 98affaece..961f07a3a 100644
--- a/kube-runtime/src/controller/mod.rs
+++ b/kube-runtime/src/controller/mod.rs
@@ -584,6 +584,63 @@ where
         self
     }
 
+    /// Trigger the reconciliation process for `Child` objets which `K` wns emitted by `trigger`.
+    ///
+    /// Same as [`Controller::owns`], but instad of a resource a stream of resources is used.
+    /// This allows for customized and pre-filtered watch streams to be used as a trigger.
+    ///
+    /// # Example:
+    ///
+    /// ```no_run
+    /// # use futures::StreamExt;
+    /// # use k8s_openapi::api::core::v1::ConfigMap;
+    /// # use k8s_openapi::api::apps::v1::StatefulSet;
+    /// # use kube::runtime::controller::Action;
+    /// # use kube::runtime::{watcher, Controller, WatchStreamExt};
+    /// # use kube::{Api, Client, Error, ResourceExt};
+    /// # use std::sync::Arc;
+    /// # let client: Client = todo!();
+    /// # async fn reconcile(_: Arc<ConfigMap>, _: Arc<()>) -> Result<Action, Error> { Ok(Action::await_change()) }
+    /// # fn error_policy(_: Arc<ConfigMap>, _: &kube::Error, _: Arc<Store<Pod>>) -> Action { Action::await_change() }
+    /// # type CustomResource = ConfigMap;
+    ///
+    /// let sts_stream = watcher(Api::<StatefulSet>::all(client.clone()), watcher::Config::default())
+    ///     .touched_objects()
+    ///     .predicate_filter(predicates::generation);
+    ///
+    /// Controller::new(Api::<CustomResource>::all(client), watcher::Config::default())
+    ///     .owns_stream(sts_stream)
+    ///     .run(reconcile, error_policy, Arc::new(()))
+    ///     .for_each(|_| std::future::ready(()))
+    ///     .await;
+    /// # };
+    #[cfg(feature = "unstable-runtime-owns_stream")]
+    #[must_use]
+    pub fn owns_stream<Child: Resource<DynamicType = ()> + Send + 'static>(
+        self,
+        trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
+    ) -> Self {
+        self.owns_stream_with(trigger, ())
+    }
+
+    /// Trigger the reconciliation process for `Child` objets which `K` wns emitted by `trigger`.
+    ///
+    /// Same as [`Controller::owns_stream`], but accepts a `DynamicType` so it can be used with dynamic resources.
+    #[cfg(feature = "unstable-runtime-owns_stream")]
+    #[must_use]
+    pub fn owns_stream_with<Child: Resource + Send + 'static>(
+        mut self,
+        trigger: impl Stream<Item = Result<Child, watcher::Error>> + Send + 'static,
+        dyntype: Child::DynamicType,
+    ) -> Self
+    where
+        Child::DynamicType: Debug + Eq + Hash + Clone,
+    {
+        let child_watcher = trigger_owners(trigger, self.dyntype.clone(), dyntype);
+        self.trigger_selector.push(child_watcher.boxed());
+        self
+    }
+
     /// Specify `Watched` object which `K` has a custom relation to and should be watched
     ///
     /// To define the `Watched` relation with `K`, you **must** define a custom relation mapper, which,