diff --git a/src/composition/runner.rs b/src/composition/runner.rs index bf98a00c7..eaaf2ed21 100644 --- a/src/composition/runner.rs +++ b/src/composition/runner.rs @@ -1,9 +1,9 @@ use std::collections::HashMap; use apollo_federation_types::config::SupergraphConfig; -use futures::stream::{empty, StreamExt}; +use futures::stream::{empty, BoxStream, StreamExt}; use tap::TapFallible; -use tokio::task::AbortHandle; +use tokio::{sync::mpsc::UnboundedSender, task::AbortHandle}; use tokio_stream::wrappers::UnboundedReceiverStream; use crate::{ @@ -122,11 +122,12 @@ impl SubgraphWatchers { impl SubtaskHandleStream for SubgraphWatchers { type Input = SupergraphConfigDiff; type Output = SubgraphChanged; + fn handle( self, - sender: tokio::sync::mpsc::UnboundedSender, - mut input: futures::stream::BoxStream<'static, Self::Input>, - ) -> tokio::task::AbortHandle { + sender: UnboundedSender, + mut input: BoxStream<'static, Self::Input>, + ) -> AbortHandle { tokio::task::spawn(async move { let mut abort_handles: HashMap = HashMap::new(); for (subgraph_name, (mut messages, subtask)) in self.watchers.into_iter() { @@ -187,3 +188,62 @@ impl SubtaskHandleStream for SubgraphWatchers { .abort_handle() } } + +#[cfg(test)] +mod tests { + use apollo_federation_types::config::{SchemaSource, SubgraphConfig, SupergraphConfig}; + + use super::SubgraphWatchers; + + #[test] + fn test_subgraphwatchers_new() { + let supergraph_config: SupergraphConfig = [ + ( + "file".to_string(), + SubgraphConfig { + routing_url: None, + schema: SchemaSource::File { + file: "/path/to/file".into(), + }, + }, + ), + ( + "introspection".to_string(), + SubgraphConfig { + routing_url: None, + schema: SchemaSource::SubgraphIntrospection { + subgraph_url: "http://subgraph_url".try_into().unwrap(), + introspection_headers: None, + }, + }, + ), + ( + "subgraph".to_string(), + SubgraphConfig { + routing_url: None, + schema: SchemaSource::Subgraph { + graphref: "graphref".to_string(), + subgraph: "subgraph".to_string(), + }, + }, + ), + ( + "sdl".to_string(), + SubgraphConfig { + routing_url: None, + schema: SchemaSource::Sdl { + sdl: "sdl".to_string(), + }, + }, + ), + ] + .into_iter() + .collect(); + let subgraph_watchers = SubgraphWatchers::new(supergraph_config); + + // We should only have watchers for file and introspection based subgraphs. + assert_eq!(2, subgraph_watchers.watchers.len()); + assert!(subgraph_watchers.watchers.contains_key("file")); + assert!(subgraph_watchers.watchers.contains_key("introspection")); + } +}