Skip to content

Commit

Permalink
Watch subgraphs when rover dev runs (#2310)
Browse files Browse the repository at this point in the history
Refactors OneShotComposition into a CompositionPipeline so that we can
consistently create a one-and-done composition artifact and then kick
off subgraph watchers for running with LSP or Rover Dev.

Additionally, this adds watching subgraphs to the pipeline and fixes a
few issues along the way.
  • Loading branch information
dotdat authored Dec 18, 2024
1 parent 9e9f25d commit b803e38
Show file tree
Hide file tree
Showing 24 changed files with 787 additions and 422 deletions.
89 changes: 53 additions & 36 deletions src/command/dev/next/mod.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
#![warn(missing_docs)]

use std::io::stdin;

use anyhow::anyhow;
use apollo_federation_types::config::RouterVersion;
use camino::Utf8PathBuf;
use futures::StreamExt;
use houston::{Config, Profile};
use router::{
install::InstallRouter,
run::RunRouter,
watchers::{file::FileWatcher, router_config::RouterConfigWatcher},
};
use router::{install::InstallRouter, run::RunRouter, watchers::file::FileWatcher};
use rover_client::operations::config::who_am_i::WhoAmI;
use tower::{Service, ServiceExt};

use crate::{
command::Dev,
composition::runner::OneShotComposition,
subtask::{Subtask, SubtaskRunUnit},
composition::pipeline::CompositionPipeline,
utils::{
client::StudioClientConfig,
effect::{
exec::{TokioCommand, TokioSpawn},
read_file::FsReadFile,
write_file::{FsWriteFile, WriteFileRequest},
write_file::FsWriteFile,
},
},
RoverError, RoverOutput, RoverResult,
Expand All @@ -45,7 +41,7 @@ impl Dev {
let elv2_license_accepter = self.opts.plugin_opts.elv2_license_accepter;
let skip_update = self.opts.plugin_opts.skip_update;
let read_file_impl = FsReadFile::default();
let mut write_file_impl = FsWriteFile::default();
let write_file_impl = FsWriteFile::default();
let exec_command_impl = TokioCommand::default();
let router_address = RouterAddress::new(
self.opts.supergraph_opts.supergraph_address,
Expand All @@ -63,26 +59,38 @@ impl Dev {
.await
.map_err(|err| RoverError::new(anyhow!("{}", err)))?;

let supergraph_yaml = self.opts.supergraph_opts.clone().supergraph_config_path;
let federation_version = self.opts.supergraph_opts.federation_version.clone();
let profile = self.opts.plugin_opts.profile.clone();
let graph_ref = self.opts.supergraph_opts.graph_ref.clone();

let one_shot_composition = OneShotComposition::builder()
.client_config(client_config.clone())
.profile(profile.clone())
.elv2_license_accepter(elv2_license_accepter)
.skip_update(skip_update)
.and_federation_version(federation_version)
.and_graph_ref(graph_ref.clone())
.and_supergraph_yaml(supergraph_yaml)
.and_override_install_path(override_install_path.clone())
.build();

let supergraph_schema = one_shot_composition
.compose(&read_file_impl, &write_file_impl, &exec_command_impl)
let profile = &self.opts.plugin_opts.profile;
let graph_ref = &self.opts.supergraph_opts.graph_ref;
let supergraph_config_path = &self.opts.supergraph_opts.clone().supergraph_config_path;

let service = client_config.get_authenticated_client(profile)?.service()?;
let service = WhoAmI::new(service);

let composition_pipeline = CompositionPipeline::default()
.init(
&mut stdin(),
&client_config.get_authenticated_client(profile)?,
supergraph_config_path.clone(),
graph_ref.clone(),
)
.await?
.resolve_federation_version(
&client_config,
&client_config.get_authenticated_client(profile)?,
self.opts.supergraph_opts.federation_version.clone(),
)
.await?
.supergraph_sdl;
.install_supergraph_binary(
client_config.clone(),
override_install_path.clone(),
elv2_license_accepter,
skip_update,
)
.await?;
let composition_success = composition_pipeline
.compose(&exec_command_impl, &read_file_impl, &write_file_impl, None)
.await?;
let supergraph_schema = composition_success.supergraph_sdl();

// TODO: figure out how to actually get this; maybe based on fed version? didn't see a cli
// opt
Expand All @@ -91,10 +99,19 @@ impl Dev {
let credential =
Profile::get_credential(&profile.profile_name, &Config::new(None::<&String>, None)?)?;

let service = client_config
.get_authenticated_client(&profile)?
.service()?;
let service = WhoAmI::new(service);
let composition_runner = composition_pipeline
.runner(
exec_command_impl,
read_file_impl.clone(),
write_file_impl.clone(),
profile,
&client_config,
self.opts.subgraph_opts.subgraph_polling_interval,
tmp_config_dir_path.clone(),
)
.await?;

let composition_messages = composition_runner.run();

let mut run_router = RunRouter::default()
.install::<InstallRouter>(
Expand All @@ -107,17 +124,17 @@ impl Dev {
.await?
.load_config(&read_file_impl, router_address, router_config_path)
.await?
.load_remote_config(service, graph_ref, Some(credential))
.load_remote_config(service, graph_ref.clone(), Some(credential))
.await
.run(
FsWriteFile::default(),
TokioSpawn::default(),
&tmp_config_dir_path,
client_config,
client_config.clone(),
supergraph_schema,
)
.await?
.watch_for_changes(write_file_impl)
.watch_for_changes(write_file_impl, composition_messages)
.await;

while let Some(router_log) = run_router.router_logs().next().await {
Expand Down
84 changes: 61 additions & 23 deletions src/command/dev/next/router/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,31 +166,69 @@ where
.send(Err(err))
.tap_err(|err| tracing::error!("Failed to send error message {:?}", err));
}
Ok(mut child) => match child.stdout.take() {
Some(stdout) => {
tokio::task::spawn(async move {
let mut lines = BufReader::new(stdout).lines();
while let Ok(Some(line)) = lines.next_line().await.tap_err(|err| {
tracing::error!("Error reading from router stdout: {:?}", err)
}) {
let _ = sender.send(Ok(RouterLog::Stdout(line))).tap_err(|err| {
tracing::error!(
"Failed to send router stdout message. {:?}",
err
)
});
}
});
Ok(mut child) => {
match child.stdout.take() {
Some(stdout) => {
tokio::task::spawn({
let sender = sender.clone();
async move {
let mut lines = BufReader::new(stdout).lines();
while let Ok(Some(line)) =
lines.next_line().await.tap_err(|err| {
tracing::error!(
"Error reading from router stdout: {:?}",
err
)
})
{
let _ = sender.send(Ok(RouterLog::Stdout(line))).tap_err(
|err| {
tracing::error!(
"Failed to send router stdout message. {:?}",
err
)
},
);
}
}
});
}
None => {
let err = RunRouterBinaryError::OutputCapture {
descriptor: "stdin".to_string(),
};
let _ = sender.send(Err(err)).tap_err(|err| {
tracing::error!("Failed to send error message {:?}", err)
});
}
}
None => {
let err = RunRouterBinaryError::OutputCapture {
descriptor: "stdin".to_string(),
};
let _ = sender.send(Err(err)).tap_err(|err| {
tracing::error!("Failed to send error message {:?}", err)
});
match child.stderr.take() {
Some(stderr) => {
tokio::task::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
while let Ok(Some(line)) = lines.next_line().await.tap_err(|err| {
tracing::error!("Error reading from router stderr: {:?}", err)
}) {
let _ =
sender.send(Ok(RouterLog::Stderr(line))).tap_err(|err| {
tracing::error!(
"Failed to send router stderr message. {:?}",
err
)
});
}
});
}
None => {
let err = RunRouterBinaryError::OutputCapture {
descriptor: "stdin".to_string(),
};
let _ = sender.send(Err(err)).tap_err(|err| {
tracing::error!("Failed to send error message {:?}", err)
});
}
}
},
}
}
})
.abort_handle()
Expand Down
6 changes: 2 additions & 4 deletions src/command/dev/next/router/hot_reload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ use crate::{subtask::SubtaskHandleStream, utils::effect::write_file::WriteFile};

use super::config::RouterConfig;

pub struct SupergraphSchema(String);

pub enum RouterUpdateEvent {
SchemaChanged { schema: SupergraphSchema },
SchemaChanged { schema: String },
ConfigChanged { config: RouterConfig },
}

Expand Down Expand Up @@ -44,7 +42,7 @@ where
match router_update_event {
RouterUpdateEvent::SchemaChanged { schema } => {
match write_file_impl
.write_file(&self.schema, schema.0.as_bytes())
.write_file(&self.schema, schema.as_bytes())
.await
{
Ok(_) => {
Expand Down
41 changes: 34 additions & 7 deletions src/command/dev/next/router/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::time::Duration;

use apollo_federation_types::config::RouterVersion;
use camino::{Utf8Path, Utf8PathBuf};
use futures::{stream, StreamExt};
use futures::{
stream::{self, BoxStream},
StreamExt,
};
use houston::Credential;
use rover_client::{
operations::config::who_am_i::{RegistryIdentity, WhoAmIError, WhoAmIRequest},
Expand All @@ -16,6 +19,7 @@ use tracing::info;

use crate::{
command::dev::next::FileWatcher,
composition::events::CompositionEvent,
options::LicenseAccepter,
subtask::{Subtask, SubtaskRunStream, SubtaskRunUnit},
utils::{
Expand Down Expand Up @@ -131,7 +135,7 @@ impl RunRouter<state::Run> {
spawn: Spawn,
temp_router_dir: &Utf8Path,
studio_client_config: StudioClientConfig,
supergraph_schema: String,
supergraph_schema: &str,
) -> Result<RunRouter<state::Watch>, RunRouterBinaryError>
where
Spawn: Service<ExecCommandConfig, Response = Child> + Send + Clone + 'static,
Expand Down Expand Up @@ -171,7 +175,7 @@ impl RunRouter<state::Run> {
.call(
WriteFileRequest::builder()
.path(hot_reload_schema_path.clone())
.contents(supergraph_schema.into_bytes())
.contents(supergraph_schema.as_bytes().to_vec())
.build(),
)
.await
Expand Down Expand Up @@ -260,10 +264,15 @@ impl RunRouter<state::Run> {
}

impl RunRouter<state::Watch> {
pub async fn watch_for_changes<WriteF>(self, write_file_impl: WriteF) -> RunRouter<state::Abort>
pub async fn watch_for_changes<WriteF>(
self,
write_file_impl: WriteF,
composition_messages: BoxStream<'static, CompositionEvent>,
) -> RunRouter<state::Abort>
where
WriteF: WriteFile + Send + Clone + 'static,
{
tracing::info!("Watching for subgraph changes");
let (router_config_updates, config_watcher_subtask) = if let Some(config_path) =
self.state.config_path
{
Expand All @@ -275,20 +284,36 @@ impl RunRouter<state::Watch> {
(None, None)
};

let composition_messages =
tokio_stream::StreamExt::filter_map(composition_messages, |event| match event {
CompositionEvent::Started => None,
CompositionEvent::Error(err) => {
tracing::error!("Composition error {:?}", err);
None
}
CompositionEvent::Success(success) => Some(RouterUpdateEvent::SchemaChanged {
schema: success.supergraph_sdl().to_string(),
}),
})
.boxed();

let hot_reload_watcher = HotReloadWatcher::builder()
.config(self.state.hot_reload_config_path)
.schema(self.state.hot_reload_schema_path)
.schema(self.state.hot_reload_schema_path.clone())
.write_file_impl(write_file_impl)
.build();

let (hot_reload_events, hot_reload_subtask): (UnboundedReceiverStream<HotReloadEvent>, _) =
Subtask::new(hot_reload_watcher);

let router_config_updates = router_config_updates
.map(|stream| stream.boxed())
.map(move |stream| stream.boxed())
.unwrap_or_else(|| stream::empty().boxed());

let abort_hot_reload = SubtaskRunStream::run(hot_reload_subtask, router_config_updates);
let router_updates =
tokio_stream::StreamExt::merge(router_config_updates, composition_messages);

let abort_hot_reload = SubtaskRunStream::run(hot_reload_subtask, router_updates.boxed());

let abort_config_watcher = config_watcher_subtask.map(SubtaskRunUnit::run);

Expand All @@ -299,6 +324,7 @@ impl RunRouter<state::Watch> {
abort_config_watcher,
hot_reload_events,
router_logs: self.state.router_logs,
hot_reload_schema_path: self.state.hot_reload_schema_path,
},
}
}
Expand Down Expand Up @@ -352,5 +378,6 @@ mod state {
pub abort_router: AbortHandle,
pub abort_config_watcher: Option<AbortHandle>,
pub abort_hot_reload: AbortHandle,
pub hot_reload_schema_path: Utf8PathBuf,
}
}
Loading

0 comments on commit b803e38

Please sign in to comment.