From b803e3884a74925e3a913b80d8e579fab95d8083 Mon Sep 17 00:00:00 2001 From: Brian <2059306+dotdat@users.noreply.github.com> Date: Wed, 18 Dec 2024 15:44:52 -0600 Subject: [PATCH] Watch subgraphs when rover dev runs (#2310) Refactors OneShotComposition into a CompositionPipeline so that we can consistently create a one-and-done composition artifact and then kick off subgraph watchers for running with LSP or Rover Dev. Additionally, this adds watching subgraphs to the pipeline and fixes a few issues along the way. --- src/command/dev/next/mod.rs | 89 +++--- src/command/dev/next/router/binary.rs | 84 ++++-- src/command/dev/next/router/hot_reload.rs | 6 +- src/command/dev/next/router/run.rs | 41 ++- src/command/supergraph/compose/do_compose.rs | 53 ++-- src/composition/events/mod.rs | 2 +- src/composition/mod.rs | 15 +- src/composition/pipeline.rs | 271 ++++++++++++++++++ src/composition/runner/mod.rs | 219 +++----------- src/composition/supergraph/binary.rs | 8 +- .../supergraph/config/federation.rs | 3 - .../supergraph/config/full/subgraph.rs | 65 ++--- .../supergraph/config/full/subgraphs.rs | 14 + .../supergraph/config/full/supergraph.rs | 13 + .../supergraph/config/lazy/supergraph.rs | 22 +- .../supergraph/config/resolver/mod.rs | 17 +- .../config/unresolved/supergraph.rs | 38 +-- src/composition/watchers/composition.rs | 71 +++-- src/composition/watchers/subgraphs.rs | 33 ++- src/composition/watchers/watcher/file.rs | 3 +- src/composition/watchers/watcher/subgraph.rs | 33 ++- .../watchers/watcher/supergraph_config.rs | 103 +++++-- src/subtask.rs | 4 + src/utils/effect/read_file.rs | 2 +- 24 files changed, 787 insertions(+), 422 deletions(-) create mode 100644 src/composition/pipeline.rs diff --git a/src/command/dev/next/mod.rs b/src/command/dev/next/mod.rs index 9d7970e1d..050083aeb 100644 --- a/src/command/dev/next/mod.rs +++ b/src/command/dev/next/mod.rs @@ -1,28 +1,24 @@ #![warn(missing_docs)] +use std::io::stdin; + use anyhow::anyhow; use apollo_federation_types::config::RouterVersion; use camino::Utf8PathBuf; use futures::StreamExt; use houston::{Config, Profile}; -use router::{ - install::InstallRouter, - run::RunRouter, - watchers::{file::FileWatcher, router_config::RouterConfigWatcher}, -}; +use router::{install::InstallRouter, run::RunRouter, watchers::file::FileWatcher}; use rover_client::operations::config::who_am_i::WhoAmI; -use tower::{Service, ServiceExt}; use crate::{ command::Dev, - composition::runner::OneShotComposition, - subtask::{Subtask, SubtaskRunUnit}, + composition::pipeline::CompositionPipeline, utils::{ client::StudioClientConfig, effect::{ exec::{TokioCommand, TokioSpawn}, read_file::FsReadFile, - write_file::{FsWriteFile, WriteFileRequest}, + write_file::FsWriteFile, }, }, RoverError, RoverOutput, RoverResult, @@ -45,7 +41,7 @@ impl Dev { let elv2_license_accepter = self.opts.plugin_opts.elv2_license_accepter; let skip_update = self.opts.plugin_opts.skip_update; let read_file_impl = FsReadFile::default(); - let mut write_file_impl = FsWriteFile::default(); + let write_file_impl = FsWriteFile::default(); let exec_command_impl = TokioCommand::default(); let router_address = RouterAddress::new( self.opts.supergraph_opts.supergraph_address, @@ -63,26 +59,38 @@ impl Dev { .await .map_err(|err| RoverError::new(anyhow!("{}", err)))?; - let supergraph_yaml = self.opts.supergraph_opts.clone().supergraph_config_path; - let federation_version = self.opts.supergraph_opts.federation_version.clone(); - let profile = self.opts.plugin_opts.profile.clone(); - let graph_ref = self.opts.supergraph_opts.graph_ref.clone(); - - let one_shot_composition = OneShotComposition::builder() - .client_config(client_config.clone()) - .profile(profile.clone()) - .elv2_license_accepter(elv2_license_accepter) - .skip_update(skip_update) - .and_federation_version(federation_version) - .and_graph_ref(graph_ref.clone()) - .and_supergraph_yaml(supergraph_yaml) - .and_override_install_path(override_install_path.clone()) - .build(); - - let supergraph_schema = one_shot_composition - .compose(&read_file_impl, &write_file_impl, &exec_command_impl) + let profile = &self.opts.plugin_opts.profile; + let graph_ref = &self.opts.supergraph_opts.graph_ref; + let supergraph_config_path = &self.opts.supergraph_opts.clone().supergraph_config_path; + + let service = client_config.get_authenticated_client(profile)?.service()?; + let service = WhoAmI::new(service); + + let composition_pipeline = CompositionPipeline::default() + .init( + &mut stdin(), + &client_config.get_authenticated_client(profile)?, + supergraph_config_path.clone(), + graph_ref.clone(), + ) + .await? + .resolve_federation_version( + &client_config, + &client_config.get_authenticated_client(profile)?, + self.opts.supergraph_opts.federation_version.clone(), + ) .await? - .supergraph_sdl; + .install_supergraph_binary( + client_config.clone(), + override_install_path.clone(), + elv2_license_accepter, + skip_update, + ) + .await?; + let composition_success = composition_pipeline + .compose(&exec_command_impl, &read_file_impl, &write_file_impl, None) + .await?; + let supergraph_schema = composition_success.supergraph_sdl(); // TODO: figure out how to actually get this; maybe based on fed version? didn't see a cli // opt @@ -91,10 +99,19 @@ impl Dev { let credential = Profile::get_credential(&profile.profile_name, &Config::new(None::<&String>, None)?)?; - let service = client_config - .get_authenticated_client(&profile)? - .service()?; - let service = WhoAmI::new(service); + let composition_runner = composition_pipeline + .runner( + exec_command_impl, + read_file_impl.clone(), + write_file_impl.clone(), + profile, + &client_config, + self.opts.subgraph_opts.subgraph_polling_interval, + tmp_config_dir_path.clone(), + ) + .await?; + + let composition_messages = composition_runner.run(); let mut run_router = RunRouter::default() .install::( @@ -107,17 +124,17 @@ impl Dev { .await? .load_config(&read_file_impl, router_address, router_config_path) .await? - .load_remote_config(service, graph_ref, Some(credential)) + .load_remote_config(service, graph_ref.clone(), Some(credential)) .await .run( FsWriteFile::default(), TokioSpawn::default(), &tmp_config_dir_path, - client_config, + client_config.clone(), supergraph_schema, ) .await? - .watch_for_changes(write_file_impl) + .watch_for_changes(write_file_impl, composition_messages) .await; while let Some(router_log) = run_router.router_logs().next().await { diff --git a/src/command/dev/next/router/binary.rs b/src/command/dev/next/router/binary.rs index 69519eae8..048f84e0d 100644 --- a/src/command/dev/next/router/binary.rs +++ b/src/command/dev/next/router/binary.rs @@ -166,31 +166,69 @@ where .send(Err(err)) .tap_err(|err| tracing::error!("Failed to send error message {:?}", err)); } - Ok(mut child) => match child.stdout.take() { - Some(stdout) => { - tokio::task::spawn(async move { - let mut lines = BufReader::new(stdout).lines(); - while let Ok(Some(line)) = lines.next_line().await.tap_err(|err| { - tracing::error!("Error reading from router stdout: {:?}", err) - }) { - let _ = sender.send(Ok(RouterLog::Stdout(line))).tap_err(|err| { - tracing::error!( - "Failed to send router stdout message. {:?}", - err - ) - }); - } - }); + Ok(mut child) => { + match child.stdout.take() { + Some(stdout) => { + tokio::task::spawn({ + let sender = sender.clone(); + async move { + let mut lines = BufReader::new(stdout).lines(); + while let Ok(Some(line)) = + lines.next_line().await.tap_err(|err| { + tracing::error!( + "Error reading from router stdout: {:?}", + err + ) + }) + { + let _ = sender.send(Ok(RouterLog::Stdout(line))).tap_err( + |err| { + tracing::error!( + "Failed to send router stdout message. {:?}", + err + ) + }, + ); + } + } + }); + } + None => { + let err = RunRouterBinaryError::OutputCapture { + descriptor: "stdin".to_string(), + }; + let _ = sender.send(Err(err)).tap_err(|err| { + tracing::error!("Failed to send error message {:?}", err) + }); + } } - None => { - let err = RunRouterBinaryError::OutputCapture { - descriptor: "stdin".to_string(), - }; - let _ = sender.send(Err(err)).tap_err(|err| { - tracing::error!("Failed to send error message {:?}", err) - }); + match child.stderr.take() { + Some(stderr) => { + tokio::task::spawn(async move { + let mut lines = BufReader::new(stderr).lines(); + while let Ok(Some(line)) = lines.next_line().await.tap_err(|err| { + tracing::error!("Error reading from router stderr: {:?}", err) + }) { + let _ = + sender.send(Ok(RouterLog::Stderr(line))).tap_err(|err| { + tracing::error!( + "Failed to send router stderr message. {:?}", + err + ) + }); + } + }); + } + None => { + let err = RunRouterBinaryError::OutputCapture { + descriptor: "stdin".to_string(), + }; + let _ = sender.send(Err(err)).tap_err(|err| { + tracing::error!("Failed to send error message {:?}", err) + }); + } } - }, + } } }) .abort_handle() diff --git a/src/command/dev/next/router/hot_reload.rs b/src/command/dev/next/router/hot_reload.rs index 6e632116a..327c80cf2 100644 --- a/src/command/dev/next/router/hot_reload.rs +++ b/src/command/dev/next/router/hot_reload.rs @@ -7,10 +7,8 @@ use crate::{subtask::SubtaskHandleStream, utils::effect::write_file::WriteFile}; use super::config::RouterConfig; -pub struct SupergraphSchema(String); - pub enum RouterUpdateEvent { - SchemaChanged { schema: SupergraphSchema }, + SchemaChanged { schema: String }, ConfigChanged { config: RouterConfig }, } @@ -44,7 +42,7 @@ where match router_update_event { RouterUpdateEvent::SchemaChanged { schema } => { match write_file_impl - .write_file(&self.schema, schema.0.as_bytes()) + .write_file(&self.schema, schema.as_bytes()) .await { Ok(_) => { diff --git a/src/command/dev/next/router/run.rs b/src/command/dev/next/router/run.rs index 983c47c6a..749c95b31 100644 --- a/src/command/dev/next/router/run.rs +++ b/src/command/dev/next/router/run.rs @@ -2,7 +2,10 @@ use std::time::Duration; use apollo_federation_types::config::RouterVersion; use camino::{Utf8Path, Utf8PathBuf}; -use futures::{stream, StreamExt}; +use futures::{ + stream::{self, BoxStream}, + StreamExt, +}; use houston::Credential; use rover_client::{ operations::config::who_am_i::{RegistryIdentity, WhoAmIError, WhoAmIRequest}, @@ -16,6 +19,7 @@ use tracing::info; use crate::{ command::dev::next::FileWatcher, + composition::events::CompositionEvent, options::LicenseAccepter, subtask::{Subtask, SubtaskRunStream, SubtaskRunUnit}, utils::{ @@ -131,7 +135,7 @@ impl RunRouter { spawn: Spawn, temp_router_dir: &Utf8Path, studio_client_config: StudioClientConfig, - supergraph_schema: String, + supergraph_schema: &str, ) -> Result, RunRouterBinaryError> where Spawn: Service + Send + Clone + 'static, @@ -171,7 +175,7 @@ impl RunRouter { .call( WriteFileRequest::builder() .path(hot_reload_schema_path.clone()) - .contents(supergraph_schema.into_bytes()) + .contents(supergraph_schema.as_bytes().to_vec()) .build(), ) .await @@ -260,10 +264,15 @@ impl RunRouter { } impl RunRouter { - pub async fn watch_for_changes(self, write_file_impl: WriteF) -> RunRouter + pub async fn watch_for_changes( + self, + write_file_impl: WriteF, + composition_messages: BoxStream<'static, CompositionEvent>, + ) -> RunRouter where WriteF: WriteFile + Send + Clone + 'static, { + tracing::info!("Watching for subgraph changes"); let (router_config_updates, config_watcher_subtask) = if let Some(config_path) = self.state.config_path { @@ -275,9 +284,22 @@ impl RunRouter { (None, None) }; + let composition_messages = + tokio_stream::StreamExt::filter_map(composition_messages, |event| match event { + CompositionEvent::Started => None, + CompositionEvent::Error(err) => { + tracing::error!("Composition error {:?}", err); + None + } + CompositionEvent::Success(success) => Some(RouterUpdateEvent::SchemaChanged { + schema: success.supergraph_sdl().to_string(), + }), + }) + .boxed(); + let hot_reload_watcher = HotReloadWatcher::builder() .config(self.state.hot_reload_config_path) - .schema(self.state.hot_reload_schema_path) + .schema(self.state.hot_reload_schema_path.clone()) .write_file_impl(write_file_impl) .build(); @@ -285,10 +307,13 @@ impl RunRouter { Subtask::new(hot_reload_watcher); let router_config_updates = router_config_updates - .map(|stream| stream.boxed()) + .map(move |stream| stream.boxed()) .unwrap_or_else(|| stream::empty().boxed()); - let abort_hot_reload = SubtaskRunStream::run(hot_reload_subtask, router_config_updates); + let router_updates = + tokio_stream::StreamExt::merge(router_config_updates, composition_messages); + + let abort_hot_reload = SubtaskRunStream::run(hot_reload_subtask, router_updates.boxed()); let abort_config_watcher = config_watcher_subtask.map(SubtaskRunUnit::run); @@ -299,6 +324,7 @@ impl RunRouter { abort_config_watcher, hot_reload_events, router_logs: self.state.router_logs, + hot_reload_schema_path: self.state.hot_reload_schema_path, }, } } @@ -352,5 +378,6 @@ mod state { pub abort_router: AbortHandle, pub abort_config_watcher: Option, pub abort_hot_reload: AbortHandle, + pub hot_reload_schema_path: Utf8PathBuf, } } diff --git a/src/command/supergraph/compose/do_compose.rs b/src/command/supergraph/compose/do_compose.rs index 34171f58e..5d1fe547d 100644 --- a/src/command/supergraph/compose/do_compose.rs +++ b/src/command/supergraph/compose/do_compose.rs @@ -38,7 +38,7 @@ use crate::{ }, composition::{ events::CompositionEvent, - runner::{OneShotComposition, Runner}, + runner::Runner, supergraph::{ binary::{OutputTarget, SupergraphBinary}, config::{ @@ -163,6 +163,8 @@ impl Compose { client_config: StudioClientConfig, output_file: Option, ) -> RoverResult { + use crate::composition::pipeline::CompositionPipeline; + let read_file_impl = FsReadFile::default(); let write_file_impl = FsWriteFile::default(); let exec_command_impl = TokioCommand::default(); @@ -173,27 +175,40 @@ impl Compose { .clone() .supergraph_yaml; - let federation_version = self.opts.federation_version.clone(); let profile = self.opts.plugin_opts.profile.clone(); let graph_ref = self.opts.supergraph_config_source.graph_ref.clone(); - let one_shot_composition = OneShotComposition::builder() - .client_config(client_config) - .profile(profile) - .elv2_license_accepter(self.opts.plugin_opts.elv2_license_accepter) - .skip_update(self.opts.plugin_opts.skip_update) - .and_federation_version(federation_version) - .and_graph_ref(graph_ref) - .and_supergraph_yaml(supergraph_yaml) - .and_override_install_path(override_install_path) - .and_output_file(output_file) - .build(); - - Ok(RoverOutput::CompositionResult( - one_shot_composition - .compose(&read_file_impl, &write_file_impl, &exec_command_impl) - .await?, - )) + let composition_pipeline = CompositionPipeline::default() + .init( + &mut stdin(), + &client_config.get_authenticated_client(&profile)?, + supergraph_yaml, + graph_ref.clone(), + ) + .await? + .resolve_federation_version( + &client_config, + &client_config.get_authenticated_client(&profile)?, + self.opts.federation_version.clone(), + ) + .await? + .install_supergraph_binary( + client_config.clone(), + override_install_path.clone(), + self.opts.plugin_opts.elv2_license_accepter, + self.opts.plugin_opts.skip_update, + ) + .await?; + let composition_success = composition_pipeline + .compose( + &exec_command_impl, + &read_file_impl, + &write_file_impl, + output_file, + ) + .await?; + + Ok(RoverOutput::CompositionResult(composition_success.into())) } #[cfg(not(feature = "composition-rewrite"))] diff --git a/src/composition/events/mod.rs b/src/composition/events/mod.rs index cbb3563a2..60dc8d87b 100644 --- a/src/composition/events/mod.rs +++ b/src/composition/events/mod.rs @@ -1,7 +1,7 @@ use super::{CompositionError, CompositionSuccess}; /// Events emitted from composition -#[derive(Debug, Eq, PartialEq)] +#[derive(Debug)] pub enum CompositionEvent { /// The composition has started and may not have finished yet. This is useful for letting users /// know composition is running diff --git a/src/composition/mod.rs b/src/composition/mod.rs index 7509caef0..dca367f81 100644 --- a/src/composition/mod.rs +++ b/src/composition/mod.rs @@ -8,6 +8,7 @@ use camino::Utf8PathBuf; use derive_getters::Getters; pub mod events; +pub mod pipeline; pub mod runner; pub mod supergraph; #[cfg(test)] @@ -24,7 +25,7 @@ pub struct CompositionSuccess { federation_version: FederationVersion, } -#[derive(thiserror::Error, Debug, Eq, PartialEq)] +#[derive(thiserror::Error, Debug)] pub enum CompositionError { #[error("Failed to run the composition binary")] Binary { error: String }, @@ -39,7 +40,17 @@ pub enum CompositionError { #[error("Invalid input for `{binary} compose`")] InvalidInput { binary: Utf8PathBuf, error: String }, #[error("Failed to read the file at: {path}.\n{error}")] - ReadFile { path: Utf8PathBuf, error: String }, + ReadFile { + path: Utf8PathBuf, + error: Box, + }, + #[error("Failed to write to the file at: {path}.\n{error}")] + WriteFile { + path: Utf8PathBuf, + error: Box, + }, #[error("Encountered {} while trying to build a supergraph.", .source.length_string())] Build { source: BuildErrors }, + #[error("Serialization error.\n{}", .0)] + SerdeYaml(#[from] serde_yaml::Error), } diff --git a/src/composition/pipeline.rs b/src/composition/pipeline.rs new file mode 100644 index 000000000..bf0327727 --- /dev/null +++ b/src/composition/pipeline.rs @@ -0,0 +1,271 @@ +use std::{env::current_dir, fmt::Debug, fs::canonicalize}; + +use apollo_federation_types::config::{FederationVersion, SupergraphConfig}; +use camino::Utf8PathBuf; +use rover_client::shared::GraphRef; +use tempfile::tempdir; + +use crate::{ + options::{LicenseAccepter, ProfileOpt}, + utils::{ + client::StudioClientConfig, + effect::{ + exec::ExecCommand, fetch_remote_subgraph::FetchRemoteSubgraph, + fetch_remote_subgraphs::FetchRemoteSubgraphs, install::InstallBinary, + introspect::IntrospectSubgraph, read_file::ReadFile, read_stdin::ReadStdin, + write_file::WriteFile, + }, + parsers::FileDescriptorType, + }, +}; + +use super::{ + runner::{CompositionRunner, Runner}, + supergraph::{ + binary::OutputTarget, + config::resolver::{ + LoadRemoteSubgraphsError, LoadSupergraphConfigError, ResolveSupergraphConfigError, + SupergraphConfigResolver, + }, + install::{InstallSupergraph, InstallSupergraphError}, + }, + CompositionError, CompositionSuccess, +}; + +#[derive(thiserror::Error, Debug)] +pub enum CompositionPipelineError { + #[error("Failed to load remote subgraphs.\n{}", .0)] + LoadRemoteSubgraphs(#[from] LoadRemoteSubgraphsError), + #[error("Failed to load the supergraph config.\n{}", .0)] + LoadSupergraphConfig(#[from] LoadSupergraphConfigError), + #[error("Failed to resolve the supergraph config.\n{}", .0)] + ResolveSupergraphConfig(#[from] ResolveSupergraphConfigError), + #[error("IO error.\n{}", .0)] + Io(#[from] std::io::Error), + #[error("Serialization error.\n{}", .0)] + SerdeYaml(#[from] serde_yaml::Error), + #[error("Error writing file: {}.\n{}", .path, .err)] + WriteFile { + path: Utf8PathBuf, + err: Box, + }, + #[error("Failed to install the supergraph binary.\n{}", .0)] + InstallSupergraph(#[from] InstallSupergraphError), +} + +pub struct CompositionPipeline { + state: State, +} + +impl Default for CompositionPipeline { + fn default() -> Self { + CompositionPipeline { state: state::Init } + } +} + +impl CompositionPipeline { + pub async fn init( + self, + read_stdin_impl: &mut impl ReadStdin, + fetch_remote_subgraphs_impl: &impl FetchRemoteSubgraphs, + supergraph_yaml: Option, + graph_ref: Option, + ) -> Result, CompositionPipelineError> + { + let supergraph_yaml = supergraph_yaml.and_then(|supergraph_yaml| match supergraph_yaml { + FileDescriptorType::File(file) => canonicalize(file) + .ok() + .map(|file| FileDescriptorType::File(Utf8PathBuf::from_path_buf(file).unwrap())), + FileDescriptorType::Stdin => Some(FileDescriptorType::Stdin), + }); + let supergraph_root = supergraph_yaml.clone().and_then(|file| match file { + FileDescriptorType::File(file) => { + let mut current_dir = current_dir().expect("Unable to get current directory path"); + + current_dir.push(file); + let path = Utf8PathBuf::from_path_buf(current_dir).unwrap(); + let parent = path.parent().unwrap().to_path_buf(); + Some(parent) + } + FileDescriptorType::Stdin => None, + }); + let resolver = SupergraphConfigResolver::default() + .load_remote_subgraphs(fetch_remote_subgraphs_impl, graph_ref.as_ref()) + .await? + .load_from_file_descriptor(read_stdin_impl, supergraph_yaml.as_ref())?; + Ok(CompositionPipeline { + state: state::ResolveFederationVersion { + resolver, + supergraph_root, + }, + }) + } +} + +impl CompositionPipeline { + pub async fn resolve_federation_version( + self, + introspect_subgraph_impl: &impl IntrospectSubgraph, + fetch_remote_subgraph_impl: &impl FetchRemoteSubgraph, + federation_version: Option, + ) -> Result, CompositionPipelineError> { + let fully_resolved_supergraph_config = self + .state + .resolver + .fully_resolve_subgraphs( + introspect_subgraph_impl, + fetch_remote_subgraph_impl, + self.state.supergraph_root.as_ref(), + ) + .await?; + let federation_version = federation_version.unwrap_or_else(|| { + fully_resolved_supergraph_config + .federation_version() + .clone() + }); + Ok(CompositionPipeline { + state: state::InstallSupergraph { + resolver: self.state.resolver, + supergraph_root: self.state.supergraph_root, + fully_resolved_supergraph_config, + federation_version, + }, + }) + } +} + +impl CompositionPipeline { + pub async fn install_supergraph_binary( + self, + studio_client_config: StudioClientConfig, + override_install_path: Option, + elv2_license_accepter: LicenseAccepter, + skip_update: bool, + ) -> Result, CompositionPipelineError> { + let supergraph_binary = + InstallSupergraph::new(self.state.federation_version, studio_client_config) + .install(override_install_path, elv2_license_accepter, skip_update) + .await?; + + Ok(CompositionPipeline { + state: state::Run { + resolver: self.state.resolver, + supergraph_root: self.state.supergraph_root, + fully_resolved_supergraph_config: self.state.fully_resolved_supergraph_config, + supergraph_binary, + }, + }) + } +} + +impl CompositionPipeline { + pub async fn compose( + &self, + exec_command_impl: &impl ExecCommand, + read_file_impl: &impl ReadFile, + write_file_impl: &impl WriteFile, + output_file: Option, + ) -> Result { + let supergraph_config_filepath = + Utf8PathBuf::from_path_buf(tempdir()?.path().join("supergraph.yaml")) + .expect("Unable to parse path"); + write_file_impl + .write_file( + &supergraph_config_filepath, + serde_yaml::to_string(&SupergraphConfig::from( + self.state.fully_resolved_supergraph_config.clone(), + ))? + .as_bytes(), + ) + .await + .map_err(|err| CompositionError::WriteFile { + path: supergraph_config_filepath.clone(), + error: Box::new(err), + })?; + + let result = self + .state + .supergraph_binary + .compose( + exec_command_impl, + read_file_impl, + &output_file + .map(OutputTarget::File) + .unwrap_or(OutputTarget::Stdout), + supergraph_config_filepath, + ) + .await?; + Ok(result) + } + + #[allow(clippy::too_many_arguments)] + pub async fn runner( + &self, + exec_command: ExecC, + read_file: ReadF, + write_file: WriteF, + profile: &ProfileOpt, + client_config: &StudioClientConfig, + introspection_polling_interval: u64, + output_dir: Utf8PathBuf, + ) -> Result, CompositionPipelineError> + where + ReadF: ReadFile + Debug + Eq + PartialEq + Send + Sync + 'static, + ExecC: ExecCommand + Debug + Eq + PartialEq + Send + Sync + 'static, + WriteF: WriteFile + Debug + Eq + PartialEq + Send + Sync + 'static, + { + let lazily_resolved_supergraph_config = self + .state + .resolver + .lazily_resolve_subgraphs(self.state.supergraph_root.as_ref()) + .await?; + let subgraphs = lazily_resolved_supergraph_config.subgraphs().clone(); + let runner = Runner::default() + .setup_subgraph_watchers( + subgraphs, + profile, + client_config, + introspection_polling_interval, + ) + .setup_supergraph_config_watcher(lazily_resolved_supergraph_config) + .setup_composition_watcher( + self.state.fully_resolved_supergraph_config.clone(), + self.state.supergraph_binary.clone(), + exec_command, + read_file, + write_file, + output_dir, + ); + Ok(runner) + } +} + +mod state { + use apollo_federation_types::config::FederationVersion; + use camino::Utf8PathBuf; + + use crate::composition::supergraph::{ + binary::SupergraphBinary, + config::{ + full::FullyResolvedSupergraphConfig, resolver::InitializedSupergraphConfigResolver, + }, + }; + + pub struct Init; + pub struct ResolveFederationVersion { + pub resolver: InitializedSupergraphConfigResolver, + pub supergraph_root: Option, + } + pub struct InstallSupergraph { + pub resolver: InitializedSupergraphConfigResolver, + pub supergraph_root: Option, + pub fully_resolved_supergraph_config: FullyResolvedSupergraphConfig, + pub federation_version: FederationVersion, + } + pub struct Run { + pub resolver: InitializedSupergraphConfigResolver, + pub supergraph_root: Option, + pub fully_resolved_supergraph_config: FullyResolvedSupergraphConfig, + pub supergraph_binary: SupergraphBinary, + } +} diff --git a/src/composition/runner/mod.rs b/src/composition/runner/mod.rs index e186aeb5d..f68942b98 100644 --- a/src/composition/runner/mod.rs +++ b/src/composition/runner/mod.rs @@ -3,35 +3,21 @@ #![warn(missing_docs)] -use std::{collections::BTreeMap, env::current_dir, fmt::Debug, io::stdin}; +use std::{collections::BTreeMap, fmt::Debug}; -//use std::{env::current_dir, fs::File, process::Command, str}; - -use anyhow::anyhow; -use apollo_federation_types::config::{FederationVersion, SupergraphConfig}; -use buildstructor::Builder; use camino::Utf8PathBuf; use futures::stream::{BoxStream, StreamExt}; -use rover_client::shared::GraphRef; -use rover_std::warnln; -use tempfile::tempdir; use crate::{ - command::supergraph::compose::CompositionOutput, - composition::{ - supergraph::install::InstallSupergraph, - watchers::watcher::{file::FileWatcher, supergraph_config::SupergraphConfigWatcher}, + composition::watchers::watcher::{ + file::FileWatcher, supergraph_config::SupergraphConfigWatcher, }, - options::{LicenseAccepter, ProfileOpt}, + options::ProfileOpt, subtask::{Subtask, SubtaskRunStream, SubtaskRunUnit}, utils::{ client::StudioClientConfig, - effect::{ - exec::ExecCommand, install::InstallBinary, read_file::ReadFile, write_file::WriteFile, - }, - parsers::FileDescriptorType, + effect::{exec::ExecCommand, read_file::ReadFile, write_file::WriteFile}, }, - RoverError, RoverResult, }; use self::state::SetupSubgraphWatchers; @@ -39,11 +25,10 @@ use self::state::SetupSubgraphWatchers; use super::{ events::CompositionEvent, supergraph::{ - binary::{OutputTarget, SupergraphBinary}, + binary::SupergraphBinary, config::{ - full::FullyResolvedSubgraphs, + full::FullyResolvedSupergraphConfig, lazy::{LazilyResolvedSubgraph, LazilyResolvedSupergraphConfig}, - resolver::SupergraphConfigResolver, }, }, watchers::{composition::CompositionWatcher, subgraphs::SubgraphWatchers}, @@ -66,140 +51,6 @@ pub struct Runner { state: State, } -/// Everything necessary to run composition once -#[derive(Builder)] -pub struct OneShotComposition { - override_install_path: Option, - federation_version: Option, - client_config: StudioClientConfig, - profile: ProfileOpt, - supergraph_yaml: Option, - output_file: Option, - graph_ref: Option, - elv2_license_accepter: LicenseAccepter, - skip_update: bool, -} - -impl OneShotComposition { - /// Runs composition - pub async fn compose( - self, - read_file_impl: &ReadF, - write_file_impl: &WriteF, - exec_command_impl: &Exec, - ) -> RoverResult { - let mut stdin = stdin(); - - let supergraph_root = self.supergraph_yaml.clone().and_then(|file| match file { - FileDescriptorType::File(file) => { - let mut current_dir = current_dir().expect("Unable to get current directory path"); - - current_dir.push(file); - let path = Utf8PathBuf::from_path_buf(current_dir).unwrap(); - let parent = path.parent().unwrap().to_path_buf(); - Some(parent) - } - FileDescriptorType::Stdin => None, - }); - - let studio_client = self - .client_config - .get_authenticated_client(&self.profile.clone())?; - - // Get a FullyResolvedSupergraphConfig from first loading in any remote subgraphs and then - // a local supergraph config (if present) and then combining them into a fully resolved - // supergraph config - let resolver = SupergraphConfigResolver::default() - .load_remote_subgraphs(&studio_client, self.graph_ref.as_ref()) - .await? - .load_from_file_descriptor(&mut stdin, self.supergraph_yaml.as_ref())? - .fully_resolve_subgraphs( - &self.client_config, - &studio_client, - supergraph_root.as_ref(), - ) - .await?; - - // We convert the FullyResolvedSupergraphConfig into a Supergraph because it makes using - // Serde easier (said differently: we're using the Federation-rs types here for - // compatability with Federation-rs tooling later on when we use their supergraph binary to - // actually run composition) - let supergraph_config: SupergraphConfig = resolver.clone().into(); - - // Convert the FullyResolvedSupergraphConfig to yaml before we save it - let supergraph_config_yaml = serde_yaml::to_string(&supergraph_config)?; - - // We're going to save to a temporary place because we don't actually need the supergraph - // config to stick around; we only need it on disk to point the supergraph binary at - let supergraph_config_filepath = - Utf8PathBuf::from_path_buf(tempdir()?.path().join("supergraph.yaml")) - .expect("Unable to parse path"); - - tracing::debug!("{:?}", supergraph_config_yaml); - - // Write the supergraph config to disk - write_file_impl - .write_file( - &supergraph_config_filepath, - supergraph_config_yaml.as_bytes(), - ) - .await?; - - // Use the CLI option for federation over the one we can read off of the supergraph config - // (but default to the one we can read off the supergraph config) - let fed_version = self - .federation_version - .as_ref() - .unwrap_or(resolver.federation_version()); - - // We care about the exact version of the federation version because certain options aren't - // available before 2.9.0 and we gate on that version below - let exact_version = fed_version - .get_exact() - // This should be impossible to get to because we convert to a FederationVersion a few - // lines above and so _should_ have an exact version - .ok_or(RoverError::new(anyhow!( - "failed to get exact Federation version" - )))?; - - // Making the output file mutable allows us to change it if we're using a version of the - // supergraph binary that can't write to file (ie, anything pre-2.9.0) - let mut output_file = self.output_file; - - // When the `--output` flag is used, we need a supergraph binary version that is at least - // v2.9.0. We ignore that flag for composition when we have anything less than that - if output_file.is_some() - && (exact_version.major < 2 || (exact_version.major == 2 && exact_version.minor < 9)) - { - warnln!("ignoring `--output` because it is not supported in this version of the dependent binary, `supergraph`: {}. Upgrade to Federation 2.9.0 or greater to install a version of the binary that supports it.", fed_version); - output_file = None; - } - - // Build the supergraph binary, paying special attention to the CLI options - let supergraph_binary = - InstallSupergraph::new(fed_version.clone(), self.client_config.clone()) - .install( - self.override_install_path, - self.elv2_license_accepter, - self.skip_update, - ) - .await?; - - let result = supergraph_binary - .compose( - exec_command_impl, - read_file_impl, - &output_file - .map(OutputTarget::File) - .unwrap_or(OutputTarget::Stdout), - supergraph_config_filepath, - ) - .await?; - - Ok(result.into()) - } -} - impl Default for Runner { fn default() -> Self { Runner { @@ -239,6 +90,14 @@ impl Runner { // events. // We could return None here if we received a supergraph config directly from stdin. In // that case, we don't want to configure a watcher. + tracing::info!( + "Setting up SupergraphConfigWatcher from origin: {}", + supergraph_config + .origin_path() + .as_ref() + .map(|x| x.to_string()) + .unwrap_or_default() + ); let supergraph_config_watcher = if let Some(origin_path) = supergraph_config.origin_path() { let f = FileWatcher::new(origin_path.clone()); let watcher = SupergraphConfigWatcher::new(f, supergraph_config); @@ -258,29 +117,27 @@ impl Runner { impl Runner { /// Configures the composition watcher #[allow(clippy::too_many_arguments)] - pub fn setup_composition_watcher( + pub fn setup_composition_watcher( self, - subgraphs: FullyResolvedSubgraphs, + supergraph_config: FullyResolvedSupergraphConfig, supergraph_binary: SupergraphBinary, exec_command: ExecC, read_file: ReadF, write_file: WriteF, - output_target: OutputTarget, temp_dir: Utf8PathBuf, - ) -> Runner> + ) -> Runner> where - ReadF: ReadFile + Debug + Eq + PartialEq + Send + Sync + 'static, ExecC: ExecCommand + Debug + Eq + PartialEq + Send + Sync + 'static, + ReadF: ReadFile + Debug + Eq + PartialEq + Send + Sync + 'static, WriteF: WriteFile + Debug + Eq + PartialEq + Send + Sync + 'static, { // Create a handler for supergraph composition events. let composition_watcher = CompositionWatcher::builder() - .subgraphs(subgraphs) + .supergraph_config(supergraph_config) .supergraph_binary(supergraph_binary) .exec_command(exec_command) .read_file(read_file) .write_file(write_file) - .output_target(output_target) .temp_dir(temp_dir) .build(); Runner { @@ -293,25 +150,35 @@ impl Runner { } } -impl Runner> +/// Alias for a [`Runner`] that is ready to be run +pub type CompositionRunner = Runner>; + +impl Runner> where - ReadF: ReadFile + Debug + Eq + PartialEq + Send + Sync + 'static, ExecC: ExecCommand + Debug + Eq + PartialEq + Send + Sync + 'static, + ReadF: ReadFile + Debug + Eq + PartialEq + Send + Sync + 'static, WriteF: WriteFile + Debug + Eq + PartialEq + Send + Sync + 'static, { /// Runs the [`Runner`] pub fn run(self) -> BoxStream<'static, CompositionEvent> { - let (supergraph_config_stream, supergraph_config_subtask) = - if let Some(supergraph_config_watcher) = self.state.supergraph_config_watcher { - let (supergraph_config_stream, supergraph_config_subtask) = - Subtask::new(supergraph_config_watcher); - ( - supergraph_config_stream.boxed(), - Some(supergraph_config_subtask), - ) - } else { - (tokio_stream::empty().boxed(), None) - }; + let (supergraph_config_stream, supergraph_config_subtask) = if let Some( + supergraph_config_watcher, + ) = + self.state.supergraph_config_watcher + { + tracing::info!("Watching subgraphs for changes..."); + let (supergraph_config_stream, supergraph_config_subtask) = + Subtask::new(supergraph_config_watcher); + ( + supergraph_config_stream.boxed(), + Some(supergraph_config_subtask), + ) + } else { + tracing::warn!( + "No supergraph config detected, changes to subgraph configurations will not be applied automatically" + ); + (tokio_stream::empty().boxed(), None) + }; let (subgraph_change_stream, subgraph_watcher_subtask) = Subtask::new(self.state.subgraph_watchers); diff --git a/src/composition/supergraph/binary.rs b/src/composition/supergraph/binary.rs index 3e5e7b8dd..4607c21ae 100644 --- a/src/composition/supergraph/binary.rs +++ b/src/composition/supergraph/binary.rs @@ -6,6 +6,7 @@ use apollo_federation_types::{ }; use buildstructor::Builder; use camino::Utf8PathBuf; +use rover_std::warnln; use tap::TapFallible; use crate::{ @@ -31,7 +32,7 @@ impl OutputTarget { if version.supports_output_flag() { OutputTarget::File(path) } else { - tracing::warn!("This version of supergraph does not support the `--output flag`. Defaulting to `stdout`"); + warnln!("ignoring `--output` because it is not supported in this version of the dependent binary, `supergraph`: {}. Upgrade to Federation 2.9.0 or greater to install a version of the binary that supports it.", version); OutputTarget::Stdout } } @@ -59,8 +60,7 @@ impl From for CompositionError { } } -#[derive(Builder, Debug, Clone)] -#[cfg_attr(test, derive(derive_getters::Getters))] +#[derive(Builder, Debug, Clone, derive_getters::Getters)] pub struct SupergraphBinary { exe: Utf8PathBuf, version: SupergraphVersion, @@ -119,7 +119,7 @@ impl SupergraphBinary { .await .map_err(|err| CompositionError::ReadFile { path: path.clone(), - error: format!("{:?}", err), + error: Box::new(err), })? } OutputTarget::Stdout => std::str::from_utf8(&output.stdout) diff --git a/src/composition/supergraph/config/federation.rs b/src/composition/supergraph/config/federation.rs index 40b1d4770..9a1b4324a 100644 --- a/src/composition/supergraph/config/federation.rs +++ b/src/composition/supergraph/config/federation.rs @@ -194,7 +194,6 @@ mod tests { FullyResolvedSubgraph::builder() .schema(subgraph_scenario.sdl) .and_routing_url(subgraph_scenario.unresolved_subgraph.routing_url().clone()) - .is_fed_two(false) .build(), )]; let federation_version = federation_version_resolver @@ -228,7 +227,6 @@ mod tests { FullyResolvedSubgraph::builder() .schema(subgraph_scenario.sdl) .and_routing_url(subgraph_scenario.unresolved_subgraph.routing_url().clone()) - .is_fed_two(false) .build(), )]; let federation_version = federation_version_resolver @@ -261,7 +259,6 @@ mod tests { FullyResolvedSubgraph::builder() .schema(subgraph_scenario.sdl) .and_routing_url(subgraph_scenario.unresolved_subgraph.routing_url().clone()) - .is_fed_two(true) .build(), )]; let federation_version = federation_version_resolver diff --git a/src/composition/supergraph/config/full/subgraph.rs b/src/composition/supergraph/config/full/subgraph.rs index 6a9d48c22..298b72648 100644 --- a/src/composition/supergraph/config/full/subgraph.rs +++ b/src/composition/supergraph/config/full/subgraph.rs @@ -20,7 +20,6 @@ use crate::{ pub struct FullyResolvedSubgraph { #[getter(skip)] routing_url: Option, - #[getter(skip)] schema: String, is_fed_two: bool, } @@ -29,15 +28,12 @@ pub struct FullyResolvedSubgraph { impl FullyResolvedSubgraph { /// Hook for [`buildstructor::buildstructor`]'s builder pattern to create a [`FullyResolvedSubgraph`] #[builder] - pub fn new( - schema: String, - routing_url: Option, - is_fed_two: Option, - ) -> FullyResolvedSubgraph { + pub fn new(schema: String, routing_url: Option) -> FullyResolvedSubgraph { + let is_fed_two = schema_contains_link_directive(&schema); FullyResolvedSubgraph { schema, routing_url, - is_fed_two: is_fed_two.unwrap_or_default(), + is_fed_two, } } /// Resolves a [`UnresolvedSubgraph`] to a [`FullyResolvedSubgraph`] @@ -54,12 +50,10 @@ impl FullyResolvedSubgraph { let file = unresolved_subgraph.resolve_file_path(supergraph_config_root, file)?; let schema = Fs::read_file(&file).map_err(|err| ResolveSubgraphError::Fs(Box::new(err)))?; - let is_fed_two = schema_contains_link_directive(&schema); - Ok(FullyResolvedSubgraph { - routing_url: unresolved_subgraph.routing_url().clone(), - schema, - is_fed_two, - }) + Ok(FullyResolvedSubgraph::builder() + .and_routing_url(unresolved_subgraph.routing_url().clone()) + .schema(schema) + .build()) } SchemaSource::SubgraphIntrospection { subgraph_url, @@ -79,12 +73,10 @@ impl FullyResolvedSubgraph { .routing_url() .clone() .or_else(|| Some(subgraph_url.to_string())); - let is_fed_two = schema_contains_link_directive(&schema); - Ok(FullyResolvedSubgraph { - routing_url, - schema, - is_fed_two, - }) + Ok(FullyResolvedSubgraph::builder() + .and_routing_url(routing_url) + .schema(schema) + .build()) } SchemaSource::Subgraph { graphref: graph_ref, @@ -104,26 +96,27 @@ impl FullyResolvedSubgraph { source: Box::new(err), })?; let schema = remote_subgraph.schema().clone(); - let is_fed_two = schema_contains_link_directive(&schema); - Ok(FullyResolvedSubgraph { - routing_url: unresolved_subgraph - .routing_url() - .clone() - .or(Some(remote_subgraph.routing_url().to_string())), - schema, - is_fed_two, - }) - } - SchemaSource::Sdl { sdl } => { - let is_fed_two = schema_contains_link_directive(sdl); - Ok(FullyResolvedSubgraph { - routing_url: unresolved_subgraph.routing_url().clone(), - schema: sdl.to_string(), - is_fed_two, - }) + Ok(FullyResolvedSubgraph::builder() + .routing_url( + unresolved_subgraph + .routing_url() + .clone() + .unwrap_or_else(|| remote_subgraph.routing_url().to_string()), + ) + .schema(schema) + .build()) } + SchemaSource::Sdl { sdl } => Ok(FullyResolvedSubgraph::builder() + .and_routing_url(unresolved_subgraph.routing_url().clone()) + .schema(sdl.to_string()) + .build()), } } + + /// Mutably updates this subgraph's schema + pub fn update_schema(&mut self, schema: String) { + self.schema = schema; + } } impl From for SubgraphConfig { diff --git a/src/composition/supergraph/config/full/subgraphs.rs b/src/composition/supergraph/config/full/subgraphs.rs index 0a9bc6fd3..496f7454f 100644 --- a/src/composition/supergraph/config/full/subgraphs.rs +++ b/src/composition/supergraph/config/full/subgraphs.rs @@ -3,6 +3,8 @@ use std::collections::BTreeMap; use apollo_federation_types::config::{SchemaSource, SubgraphConfig, SupergraphConfig}; use thiserror::Error; +use super::FullyResolvedSupergraphConfig; + /// Error that occurs when a subgraph schema source is invalid #[derive(Error, Debug)] #[error("Invalid schema source: {:?}", .schema_source)] @@ -71,3 +73,15 @@ impl From for SupergraphConfig { SupergraphConfig::new(subgraphs, None) } } + +impl From for FullyResolvedSubgraphs { + fn from(value: FullyResolvedSupergraphConfig) -> Self { + let subgraphs = value + .subgraphs() + .clone() + .into_iter() + .map(|(name, subgraph)| (name, subgraph.schema().clone())) + .collect(); + FullyResolvedSubgraphs { subgraphs } + } +} diff --git a/src/composition/supergraph/config/full/supergraph.rs b/src/composition/supergraph/config/full/supergraph.rs index 2e1a8932a..209cd84d3 100644 --- a/src/composition/supergraph/config/full/supergraph.rs +++ b/src/composition/supergraph/config/full/supergraph.rs @@ -19,6 +19,7 @@ use super::FullyResolvedSubgraph; /// Represents a [`SupergraphConfig`] that has a known [`FederationVersion`] and /// its subgraph [`SchemaSource`]s reduced to [`SchemaSource::Sdl`] #[derive(Clone, Debug, Eq, PartialEq, Getters)] +#[cfg_attr(test, derive(buildstructor::Builder))] pub struct FullyResolvedSupergraphConfig { origin_path: Option, subgraphs: BTreeMap, @@ -67,6 +68,8 @@ impl FullyResolvedSupergraphConfig { let subgraphs = BTreeMap::from_iter(subgraphs); let federation_version = unresolved_supergraph_config .federation_version_resolver() + .clone() + .ok_or_else(|| ResolveSupergraphConfigError::MissingFederationVersionResolver)? .resolve(subgraphs.iter())?; Ok(FullyResolvedSupergraphConfig { origin_path: unresolved_supergraph_config.origin_path().clone(), @@ -77,4 +80,14 @@ impl FullyResolvedSupergraphConfig { Err(ResolveSupergraphConfigError::ResolveSubgraphs(errors)) } } + + /// Updates the subgraph with the provided name using the provided schema + pub fn update_subgraph_schema(&mut self, name: String, subgraph: FullyResolvedSubgraph) { + self.subgraphs.insert(name, subgraph); + } + + /// Removes the subgraph with the name provided + pub fn remove_subgraph(&mut self, name: &str) { + self.subgraphs.remove(name); + } } diff --git a/src/composition/supergraph/config/lazy/supergraph.rs b/src/composition/supergraph/config/lazy/supergraph.rs index 722e86751..846c5737d 100644 --- a/src/composition/supergraph/config/lazy/supergraph.rs +++ b/src/composition/supergraph/config/lazy/supergraph.rs @@ -29,15 +29,19 @@ impl LazilyResolvedSupergraphConfig { supergraph_config_root: &Utf8PathBuf, unresolved_supergraph_config: UnresolvedSupergraphConfig, ) -> Result> { - let subgraphs = stream::iter(unresolved_supergraph_config.subgraphs().iter().map( - |(name, unresolved_subgraph)| async { - let result = LazilyResolvedSubgraph::resolve( - supergraph_config_root, - unresolved_subgraph.clone(), - )?; - Ok((name.to_string(), result)) - }, - )) + let subgraphs = stream::iter( + unresolved_supergraph_config + .subgraphs() + .clone() + .into_iter() + .map(|(name, unresolved_subgraph)| async move { + let result = LazilyResolvedSubgraph::resolve( + supergraph_config_root, + unresolved_subgraph.clone(), + )?; + Ok((name.to_string(), result)) + }), + ) .buffer_unordered(50) .collect::>>() .await; diff --git a/src/composition/supergraph/config/resolver/mod.rs b/src/composition/supergraph/config/resolver/mod.rs index 0614dfd0c..42bcacb1a 100644 --- a/src/composition/supergraph/config/resolver/mod.rs +++ b/src/composition/supergraph/config/resolver/mod.rs @@ -201,8 +201,16 @@ pub enum ResolveSupergraphConfigError { /// subgraphs use the `@link` directive, which requires Federation 2 #[error(transparent)] FederationVersionMismatch(#[from] FederationVersionMismatch), + /// Occurs when a `FederationVersionResolver` was not supplied to an `UnresolvedSupergraphConfig` + /// and federation version resolution was attempted + #[error("Unable to resolve federation version")] + MissingFederationVersionResolver, } +/// Public alias for [`SupergraphConfigResolver`] +/// This state of [`SupergraphConfigResolver`] is ready to resolve subgraphs fully or lazily +pub type InitializedSupergraphConfigResolver = SupergraphConfigResolver; + impl SupergraphConfigResolver { /// Fully resolves the subgraph configurations in the supergraph config file to their SDLs pub async fn fully_resolve_subgraphs( @@ -234,10 +242,17 @@ impl SupergraphConfigResolver { /// config is piped through stdin pub async fn lazily_resolve_subgraphs( &self, - supergraph_config_root: &Utf8PathBuf, + supergraph_config_root: Option<&Utf8PathBuf>, ) -> Result { + let supergraph_config_root = supergraph_config_root.ok_or_else(|| { + ResolveSupergraphConfigError::ResolveSubgraphs(vec![ + ResolveSubgraphError::SupergraphConfigMissing, + ]) + })?; + if !self.state.subgraphs.is_empty() { let unresolved_supergraph_config = UnresolvedSupergraphConfig::builder() + .and_origin_path(self.state.origin_path.clone()) .subgraphs(self.state.subgraphs.clone()) .federation_version_resolver(self.state.federation_version_resolver.clone()) .build(); diff --git a/src/composition/supergraph/config/unresolved/supergraph.rs b/src/composition/supergraph/config/unresolved/supergraph.rs index 3333685ae..f8cabac0d 100644 --- a/src/composition/supergraph/config/unresolved/supergraph.rs +++ b/src/composition/supergraph/config/unresolved/supergraph.rs @@ -15,7 +15,7 @@ use super::UnresolvedSubgraph; pub struct UnresolvedSupergraphConfig { origin_path: Option, subgraphs: BTreeMap, - federation_version_resolver: FederationVersionResolverFromSubgraphs, + federation_version_resolver: Option, } #[buildstructor] @@ -25,7 +25,7 @@ impl UnresolvedSupergraphConfig { pub fn new( origin_path: Option, subgraphs: BTreeMap, - federation_version_resolver: FederationVersionResolverFromSubgraphs, + federation_version_resolver: Option, ) -> UnresolvedSupergraphConfig { let subgraphs = BTreeMap::from_iter( subgraphs @@ -41,7 +41,9 @@ impl UnresolvedSupergraphConfig { /// Provides the target federation version provided by the user pub fn target_federation_version(&self) -> Option { - self.federation_version_resolver.target_federation_version() + self.federation_version_resolver + .clone() + .and_then(|resolver| resolver.target_federation_version()) } } @@ -287,9 +289,9 @@ mod tests { let unresolved_supergraph_config = UnresolvedSupergraphConfig { origin_path: None, subgraphs: unresolved_subgraphs, - federation_version_resolver: FederationVersionResolverFromSubgraphs::new( + federation_version_resolver: Some(FederationVersionResolverFromSubgraphs::new( target_federation_version, - ), + )), }; let RemoteSubgraphScenario { @@ -361,11 +363,6 @@ mod tests { sdl_subgraph_name.clone(), FullyResolvedSubgraph::builder() .schema(sdl_subgraph_scenario.sdl.clone()) - .is_fed_two( - sdl_subgraph_scenario - .subgraph_federation_version - .is_fed_two(), - ) .build(), ), ( @@ -373,11 +370,6 @@ mod tests { FullyResolvedSubgraph::builder() .routing_url(file_subgraph_scenario.routing_url.clone()) .schema(file_subgraph_scenario.sdl.clone()) - .is_fed_two( - file_subgraph_scenario - .subgraph_federation_version - .is_fed_two(), - ) .build(), ), ( @@ -385,11 +377,6 @@ mod tests { FullyResolvedSubgraph::builder() .routing_url(remote_subgraph_routing_url.clone()) .schema(remote_subgraph_scenario.sdl.clone()) - .is_fed_two( - remote_subgraph_scenario - .subgraph_federation_version - .is_fed_two(), - ) .build(), ), ( @@ -397,11 +384,6 @@ mod tests { FullyResolvedSubgraph::builder() .routing_url(introspect_subgraph_routing_url.clone()) .schema(introspect_subgraph_scenario.sdl.clone()) - .is_fed_two( - introspect_subgraph_scenario - .subgraph_federation_version - .is_fed_two(), - ) .build(), ), ]); @@ -494,9 +476,9 @@ mod tests { let unresolved_supergraph_config = UnresolvedSupergraphConfig { origin_path: None, subgraphs: unresolved_subgraphs, - federation_version_resolver: FederationVersionResolverFromSubgraphs::new(Some( + federation_version_resolver: Some(FederationVersionResolverFromSubgraphs::new(Some( target_federation_version.clone(), - )), + ))), }; let RemoteSubgraphScenario { @@ -640,7 +622,7 @@ mod tests { let unresolved_supergraph_config = UnresolvedSupergraphConfig { origin_path: Some(supergraph_config_origin_path), subgraphs: unresolved_subgraphs, - federation_version_resolver: FederationVersionResolverFromSubgraphs::new(None), + federation_version_resolver: Some(FederationVersionResolverFromSubgraphs::new(None)), }; let result = LazilyResolvedSupergraphConfig::resolve( diff --git a/src/composition/watchers/composition.rs b/src/composition/watchers/composition.rs index 1fd0e41b3..73853c850 100644 --- a/src/composition/watchers/composition.rs +++ b/src/composition/watchers/composition.rs @@ -12,7 +12,7 @@ use crate::{ events::CompositionEvent, supergraph::{ binary::{OutputTarget, SupergraphBinary}, - config::full::FullyResolvedSubgraphs, + config::full::FullyResolvedSupergraphConfig, }, watchers::subgraphs::SubgraphEvent, }, @@ -21,20 +21,19 @@ use crate::{ }; #[derive(Builder, Debug)] -pub struct CompositionWatcher { - subgraphs: FullyResolvedSubgraphs, +pub struct CompositionWatcher { + supergraph_config: FullyResolvedSupergraphConfig, supergraph_binary: SupergraphBinary, - output_target: OutputTarget, exec_command: ExecC, read_file: ReadF, write_file: WriteF, temp_dir: Utf8PathBuf, } -impl SubtaskHandleStream for CompositionWatcher +impl SubtaskHandleStream for CompositionWatcher where - ReadF: ReadFile + Send + Sync + 'static, ExecC: ExecCommand + Send + Sync + 'static, + ReadF: ReadFile + Send + Sync + 'static, WriteF: WriteFile + Send + Sync + 'static, { type Input = SubgraphEvent; @@ -46,23 +45,27 @@ where mut input: BoxStream<'static, Self::Input>, ) -> AbortHandle { tokio::task::spawn({ - let mut subgraphs = self.subgraphs.clone(); + let mut supergraph_config = self.supergraph_config.clone(); let target_file = self.temp_dir.join("supergraph.yaml"); async move { while let Some(event) = input.next().await { match event { SubgraphEvent::SubgraphChanged(subgraph_schema_changed) => { let name = subgraph_schema_changed.name(); - let sdl = subgraph_schema_changed.sdl(); - subgraphs.upsert_subgraph(name.to_string(), sdl.to_string()); + tracing::info!("Schema change detected for subgraph: {}", name); + supergraph_config.update_subgraph_schema( + name.to_string(), + subgraph_schema_changed.into(), + ); } SubgraphEvent::SubgraphRemoved(subgraph_removed) => { let name = subgraph_removed.name(); - subgraphs.remove_subgraph(name); + tracing::info!("Subgraph removed: {}", name); + supergraph_config.remove_subgraph(name); } } - let supergraph_config = SupergraphConfig::from(subgraphs.clone()); + let supergraph_config = SupergraphConfig::from(supergraph_config.clone()); let supergraph_config_yaml = serde_yaml::to_string(&supergraph_config); let supergraph_config_yaml = match supergraph_config_yaml { @@ -94,7 +97,7 @@ where .compose( &self.exec_command, &self.read_file, - &self.output_target, + &OutputTarget::Stdout, target_file.clone(), ) .await; @@ -143,8 +146,7 @@ mod tests { composition::{ events::CompositionEvent, supergraph::{ - binary::{OutputTarget, SupergraphBinary}, - config::full::FullyResolvedSubgraphs, + binary::SupergraphBinary, config::full::FullyResolvedSupergraphConfig, version::SupergraphVersion, }, test::{default_composition_json, default_composition_success}, @@ -170,8 +172,13 @@ mod tests { let temp_dir = assert_fs::TempDir::new()?; let temp_dir_path = Utf8PathBuf::from_path_buf(temp_dir.to_path_buf()).unwrap(); - let subgraphs = FullyResolvedSubgraphs::new(BTreeMap::new()); - let supergraph_version = SupergraphVersion::new(Version::from_str("2.8.0").unwrap()); + let federation_version = Version::from_str("2.8.0").unwrap(); + + let subgraphs = FullyResolvedSupergraphConfig::builder() + .subgraphs(BTreeMap::new()) + .federation_version(FederationVersion::ExactFedTwo(federation_version.clone())) + .build(); + let supergraph_version = SupergraphVersion::new(federation_version.clone()); let supergraph_binary = SupergraphBinary::builder() .version(supergraph_version) @@ -200,13 +207,13 @@ mod tests { indoc::indoc! { r#"subgraphs: {}: - routing_url: null + routing_url: https://example.com schema: sdl: '{}' - federation_version: null + federation_version: ={} "# }, - subgraph_name, subgraph_sdl + subgraph_name, subgraph_sdl, federation_version ); let expected_supergraph_sdl_bytes = expected_supergraph_sdl.into_bytes(); @@ -221,17 +228,20 @@ mod tests { .returning(|_, _| Ok(())); let composition_handler = CompositionWatcher::builder() - .subgraphs(subgraphs) + .supergraph_config(subgraphs) .supergraph_binary(supergraph_binary) .exec_command(mock_exec) .read_file(mock_read_file) .write_file(mock_write_file) .temp_dir(temp_dir_path) - .output_target(OutputTarget::Stdout) .build(); let subgraph_change_events: BoxStream = once(async { - SubgraphEvent::SubgraphChanged(SubgraphSchemaChanged::new(subgraph_name, subgraph_sdl)) + SubgraphEvent::SubgraphChanged(SubgraphSchemaChanged::new( + subgraph_name, + subgraph_sdl, + Some("https://example.com".to_string()), + )) }) .boxed(); let (mut composition_messages, composition_subtask) = Subtask::new(composition_handler); @@ -241,16 +251,21 @@ mod tests { let next_message = composition_messages.next().await; assert_that!(next_message) .is_some() - .is_equal_to(CompositionEvent::Started); + .matches(|event| matches!(event, CompositionEvent::Started)); // Assert we get the expected final composition event. if !composition_error { let next_message = composition_messages.next().await; - assert_that!(next_message) - .is_some() - .is_equal_to(CompositionEvent::Success(default_composition_success( - FederationVersion::ExactFedTwo(Version::from_str("2.8.0")?), - ))); + assert_that!(next_message).is_some().matches(|event| { + if let CompositionEvent::Success(success) = event { + success + == &default_composition_success(FederationVersion::ExactFedTwo( + Version::from_str("2.8.0").unwrap(), + )) + } else { + false + } + }); } else { assert!(matches!( composition_messages.next().await.unwrap(), diff --git a/src/composition/watchers/subgraphs.rs b/src/composition/watchers/subgraphs.rs index 58630b848..64f76c8a7 100644 --- a/src/composition/watchers/subgraphs.rs +++ b/src/composition/watchers/subgraphs.rs @@ -7,7 +7,7 @@ use tokio::{sync::mpsc::UnboundedSender, task::AbortHandle}; use tokio_stream::{wrappers::UnboundedReceiverStream, StreamExt}; use crate::{ - composition::supergraph::config::lazy::LazilyResolvedSubgraph, + composition::supergraph::config::{full::FullyResolvedSubgraph, lazy::LazilyResolvedSubgraph}, options::ProfileOpt, subtask::{Subtask, SubtaskHandleStream, SubtaskRunUnit}, utils::client::StudioClientConfig, @@ -46,6 +46,7 @@ impl SubgraphWatchers { .filter_map(|(name, resolved_subgraph)| { let subgraph_config = SubgraphConfig::from(resolved_subgraph); SubgraphWatcher::from_schema_source( + subgraph_config.routing_url, subgraph_config.schema, profile, client_config, @@ -83,12 +84,26 @@ pub struct SubgraphSchemaChanged { name: String, /// SDL with changes sdl: String, + routing_url: Option, } impl SubgraphSchemaChanged { #[cfg(test)] - pub fn new(name: String, sdl: String) -> SubgraphSchemaChanged { - SubgraphSchemaChanged { name, sdl } + pub fn new(name: String, sdl: String, routing_url: Option) -> SubgraphSchemaChanged { + SubgraphSchemaChanged { + name, + sdl, + routing_url, + } + } +} + +impl From for FullyResolvedSubgraph { + fn from(value: SubgraphSchemaChanged) -> Self { + FullyResolvedSubgraph::builder() + .schema(value.sdl) + .and_routing_url(value.routing_url) + .build() } } @@ -118,12 +133,16 @@ impl SubtaskHandleStream for SubgraphWatchers { for (subgraph_name, (mut messages, subtask)) in self.watchers.into_iter() { let sender = sender.clone(); let subgraph_name_c = subgraph_name.clone(); + let routing_url = subtask.inner().routing_url().clone(); let messages_abort_handle = tokio::task::spawn(async move { while let Some(change) = messages.next().await { + let routing_url = routing_url.clone(); + tracing::info!("Subgraph change detected: {:?}", change); let _ = sender .send(SubgraphEvent::SubgraphChanged(SubgraphSchemaChanged { name: subgraph_name_c.clone(), sdl: change.sdl().to_string(), + routing_url, })) .tap_err(|err| tracing::error!("{:?}", err)); } @@ -139,6 +158,7 @@ impl SubtaskHandleStream for SubgraphWatchers { // Adding the abort handle to the currentl collection of handles. for (subgraph_name, subgraph_config) in diff.added() { if let Ok(subgraph_watcher) = SubgraphWatcher::from_schema_source( + subgraph_config.routing_url.clone(), subgraph_config.schema.clone(), &self.profile, &self.client_config, @@ -169,6 +189,7 @@ impl SubtaskHandleStream for SubgraphWatchers { SubgraphSchemaChanged { name: subgraph_name.to_string(), sdl, + routing_url: subgraph_watcher.routing_url().clone(), }, )) .tap_err(|err| tracing::error!("{:?}", err)); @@ -181,13 +202,16 @@ impl SubtaskHandleStream for SubgraphWatchers { let sender = sender.clone(); let subgraph_name_c = subgraph_name.clone(); + let routing_url = subtask.inner().routing_url().clone(); let messages_abort_handle = tokio::spawn(async move { while let Some(change) = messages.next().await { + let routing_url = routing_url.clone(); let _ = sender .send(SubgraphEvent::SubgraphChanged( SubgraphSchemaChanged { name: subgraph_name_c.to_string(), sdl: change.sdl().to_string(), + routing_url, }, )) .tap_err(|err| tracing::error!("{:?}", err)); @@ -204,7 +228,9 @@ impl SubtaskHandleStream for SubgraphWatchers { } for (name, subgraph_config) in diff.changed() { + eprintln!("Change detected for subgraph: `{}`", name); if let Ok(watcher) = SubgraphWatcher::from_schema_source( + subgraph_config.routing_url.clone(), subgraph_config.schema.clone(), &self.profile, &self.client_config, @@ -225,6 +251,7 @@ impl SubtaskHandleStream for SubgraphWatchers { SubgraphSchemaChanged { name: name.to_string(), sdl, + routing_url: watcher.routing_url().clone(), }, )) .tap_err(|err| tracing::error!("{:?}", err)); diff --git a/src/composition/watchers/watcher/file.rs b/src/composition/watchers/watcher/file.rs index c6488f5af..ecf1e8e96 100644 --- a/src/composition/watchers/watcher/file.rs +++ b/src/composition/watchers/watcher/file.rs @@ -1,4 +1,5 @@ use camino::Utf8PathBuf; +use derive_getters::Getters; use futures::{stream::BoxStream, StreamExt}; use rover_std::{errln, Fs, RoverStdError}; use tap::TapFallible; @@ -6,7 +7,7 @@ use tokio::sync::mpsc::unbounded_channel; use tokio_stream::wrappers::UnboundedReceiverStream; /// File watcher specifically for files related to composition -#[derive(Clone, Debug)] +#[derive(Clone, Debug, Getters)] pub struct FileWatcher { /// The filepath to watch path: Utf8PathBuf, diff --git a/src/composition/watchers/watcher/subgraph.rs b/src/composition/watchers/watcher/subgraph.rs index 469c50f6f..85fd95631 100644 --- a/src/composition/watchers/watcher/subgraph.rs +++ b/src/composition/watchers/watcher/subgraph.rs @@ -1,7 +1,5 @@ -use std::{marker::Send, pin::Pin}; - use apollo_federation_types::config::SchemaSource; -use futures::{Stream, StreamExt}; +use futures::{stream::BoxStream, StreamExt}; use tap::TapFallible; use tokio::{sync::mpsc::UnboundedSender, task::AbortHandle}; @@ -24,6 +22,7 @@ pub struct UnsupportedSchemaSource(SchemaSource); pub struct SubgraphWatcher { /// The kind of watcher used (eg, file, introspection) watcher: SubgraphWatcherKind, + routing_url: Option, } #[derive(Debug, Clone)] @@ -60,6 +59,7 @@ pub enum SubgraphWatcherKind { impl SubgraphWatcher { /// Derive the right SubgraphWatcher (ie, File, Introspection) from the federation-rs SchemaSource pub fn from_schema_source( + routing_url: Option, schema_source: SchemaSource, profile: &ProfileOpt, client_config: &StudioClientConfig, @@ -70,6 +70,7 @@ impl SubgraphWatcher { match schema_source { SchemaSource::File { file } => Ok(Self { watcher: SubgraphWatcherKind::File(FileWatcher::new(file)), + routing_url, }), SchemaSource::SubgraphIntrospection { subgraph_url, @@ -81,14 +82,17 @@ impl SubgraphWatcher { client_config, introspection_polling_interval, )), + routing_url, }), SchemaSource::Subgraph { graphref, subgraph } => Ok(Self { watcher: SubgraphWatcherKind::Once(NonRepeatingFetch::RemoteSchema( RemoteSchema::new(graphref, subgraph, profile, client_config), )), + routing_url, }), SchemaSource::Sdl { sdl } => Ok(Self { watcher: SubgraphWatcherKind::Once(NonRepeatingFetch::Sdl(Sdl::new(sdl))), + routing_url, }), } } @@ -99,11 +103,14 @@ impl SubgraphWatcherKind { /// /// Development note: this is a stream of Strings, but in the future we might want something /// more flexible to get type safety. - async fn watch(&self) -> Pin + Send>> { + fn watch(&self) -> Option> { match self { - Self::File(file_watcher) => file_watcher.clone().watch(), - Self::Introspect(introspection) => introspection.watch(), - kind => unimplemented!("{kind:?} is not a watcher"), + Self::File(file_watcher) => Some(file_watcher.clone().watch()), + Self::Introspect(introspection) => Some(introspection.watch()), + kind => { + tracing::debug!("{kind:?} is not watchable. Skipping"); + None + } } } } @@ -120,11 +127,13 @@ impl SubtaskHandleUnit for SubgraphWatcher { fn handle(self, sender: UnboundedSender) -> AbortHandle { tokio::spawn(async move { - let mut watcher = self.watcher.watch().await; - while let Some(sdl) = watcher.next().await { - let _ = sender - .send(WatchedSdlChange { sdl }) - .tap_err(|err| tracing::error!("{:?}", err)); + let watcher = self.watcher.watch(); + if let Some(mut watcher) = watcher { + while let Some(sdl) = watcher.next().await { + let _ = sender + .send(WatchedSdlChange { sdl }) + .tap_err(|err| tracing::error!("{:?}", err)); + } } }) .abort_handle() diff --git a/src/composition/watchers/watcher/supergraph_config.rs b/src/composition/watchers/watcher/supergraph_config.rs index ae08adc88..75971e0a6 100644 --- a/src/composition/watchers/watcher/supergraph_config.rs +++ b/src/composition/watchers/watcher/supergraph_config.rs @@ -8,7 +8,9 @@ use tap::TapFallible; use tokio::{sync::mpsc::UnboundedSender, task::AbortHandle}; use crate::{ - composition::supergraph::config::lazy::LazilyResolvedSupergraphConfig, + composition::supergraph::config::{ + lazy::LazilyResolvedSupergraphConfig, unresolved::UnresolvedSupergraphConfig, + }, subtask::SubtaskHandleUnit, }; @@ -36,33 +38,72 @@ impl SubtaskHandleUnit for SupergraphConfigWatcher { type Output = SupergraphConfigDiff; fn handle(self, sender: UnboundedSender) -> AbortHandle { - tokio::spawn(async move { + let supergraph_config_path = self.file_watcher.path().clone(); + tokio::spawn({ + async move { + let supergraph_config_path = supergraph_config_path.clone(); let mut latest_supergraph_config = self.supergraph_config.clone(); - while let Some(contents) = self.file_watcher.clone().watch().next().await { + let file_watcher = self.file_watcher.clone(); + while let Some(contents) = file_watcher.clone().watch().next().await { + eprintln!("{} changed. Applying changes to the session.", supergraph_config_path); + tracing::info!( + "{} changed. Parsing it as a `SupergraphConfig`", + supergraph_config_path + ); match SupergraphConfig::new_from_yaml(&contents) { Ok(supergraph_config) => { - if let Ok(supergraph_config_diff) = SupergraphConfigDiff::new( - &latest_supergraph_config, - supergraph_config.clone(), - ) { - let _ = sender - .send(supergraph_config_diff) - .tap_err(|err| tracing::error!("{:?}", err)); + let subgraphs = BTreeMap::from_iter(supergraph_config.clone().into_iter()); + let unresolved_supergraph_config = UnresolvedSupergraphConfig::builder() + .origin_path(supergraph_config_path.clone()) + .subgraphs(subgraphs) + .build(); + let supergraph_config = LazilyResolvedSupergraphConfig::resolve( + &supergraph_config_path.parent().unwrap().to_path_buf(), + unresolved_supergraph_config, + ) + .await.map(SupergraphConfig::from); + + match supergraph_config { + Ok(supergraph_config) => { + let supergraph_config_diff = SupergraphConfigDiff::new( + &latest_supergraph_config, + supergraph_config.clone(), + ); + match supergraph_config_diff { + Ok(supergraph_config_diff) => { + let _ = sender + .send(supergraph_config_diff) + .tap_err(|err| tracing::error!("{:?}", err)); + } + Err(err) => { + tracing::error!("Failed to construct a diff between the current and previous `SupergraphConfig`s.\n{}", err); + } + } + + latest_supergraph_config = supergraph_config; + } + Err(err) => { + errln!( + "Failed to lazily resolve the supergraph config at {}.\n{}", + self.file_watcher.path(), + itertools::join(err, "\n") + ); + } } - latest_supergraph_config = supergraph_config; } Err(err) => { tracing::error!("could not parse supergraph config file: {:?}", err); - errln!("could not parse supergraph config file: {:?}", err); + errln!("Could not parse supergraph config file.\n{}", err); } } } + } }) .abort_handle() } } -#[derive(Getters)] +#[derive(Getters, Debug)] pub struct SupergraphConfigDiff { added: Vec<(String, SubgraphConfig)>, changed: Vec<(String, SubgraphConfig)>, @@ -100,28 +141,34 @@ impl SupergraphConfigDiff { // Filter the added and removed subgraphs from the new supergraph config. let added = new_subgraphs + .clone() .into_iter() .filter(|(name, _)| added_names.contains(name)) .collect::>(); let removed = removed_names.into_iter().cloned().collect::>(); // Find any in-place changes (eg, SDL, SchemaSource::Subgraph) - let mut changed = vec![]; - for (old_name, old_config) in old.clone().into_iter() { - // Exclude any removed subgraphs - if !removed.contains(&old_name) { - let found_new = new - .clone() - .into_iter() - .find(|(sub_name, _sub_config)| *sub_name == old_name); - - if let Some((old_name, new_config)) = found_new { - if new_config != old_config { - changed.push((old_name, new_config)); + let changed = old + .clone() + .into_iter() + .filter(|(old_name, _)| !removed.contains(old_name)) + .filter_map(|(old_name, old_subgraph)| { + new_subgraphs.get(&old_name).and_then(|new_subgraph| { + let new_subgraph = new_subgraph.clone(); + tracing::info!( + "old:\n{:?}\nnew:\n{:?}\neq:{}", + old_subgraph, + new_subgraph, + old_subgraph == new_subgraph + ); + if old_subgraph == new_subgraph { + None + } else { + Some((old_name, new_subgraph)) } - } - } - } + }) + }) + .collect::>(); Ok(SupergraphConfigDiff { added, diff --git a/src/subtask.rs b/src/subtask.rs index 935461a04..d9a80f140 100644 --- a/src/subtask.rs +++ b/src/subtask.rs @@ -107,6 +107,10 @@ impl Subtask { Subtask { inner, sender: tx }, ) } + + pub fn inner(&self) -> &T { + &self.inner + } } impl, Output> SubtaskRunUnit for Subtask { diff --git a/src/utils/effect/read_file.rs b/src/utils/effect/read_file.rs index 135bf7572..6149d7694 100644 --- a/src/utils/effect/read_file.rs +++ b/src/utils/effect/read_file.rs @@ -10,7 +10,7 @@ pub struct MockReadFileError {} #[cfg_attr(test, mockall::automock(type Error = MockReadFileError;))] #[async_trait] pub trait ReadFile { - type Error: std::error::Error + Send + 'static; + type Error: std::error::Error + Send + Sync + 'static; async fn read_file(&self, path: &Utf8PathBuf) -> Result; }