Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Watch subgraphs when rover dev runs #2310

Merged
merged 3 commits into from
Dec 18, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
89 changes: 53 additions & 36 deletions src/command/dev/next/mod.rs
Original file line number Diff line number Diff line change
@@ -1,28 +1,24 @@
#![warn(missing_docs)]

use std::io::stdin;

use anyhow::anyhow;
use apollo_federation_types::config::RouterVersion;
use camino::Utf8PathBuf;
use futures::StreamExt;
use houston::{Config, Profile};
use router::{
install::InstallRouter,
run::RunRouter,
watchers::{file::FileWatcher, router_config::RouterConfigWatcher},
};
use router::{install::InstallRouter, run::RunRouter, watchers::file::FileWatcher};
use rover_client::operations::config::who_am_i::WhoAmI;
use tower::{Service, ServiceExt};

use crate::{
command::Dev,
composition::runner::OneShotComposition,
subtask::{Subtask, SubtaskRunUnit},
composition::pipeline::CompositionPipeline,
utils::{
client::StudioClientConfig,
effect::{
exec::{TokioCommand, TokioSpawn},
read_file::FsReadFile,
write_file::{FsWriteFile, WriteFileRequest},
write_file::FsWriteFile,
},
},
RoverError, RoverOutput, RoverResult,
Expand All @@ -45,7 +41,7 @@ impl Dev {
let elv2_license_accepter = self.opts.plugin_opts.elv2_license_accepter;
let skip_update = self.opts.plugin_opts.skip_update;
let read_file_impl = FsReadFile::default();
let mut write_file_impl = FsWriteFile::default();
let write_file_impl = FsWriteFile::default();
let exec_command_impl = TokioCommand::default();
let router_address = RouterAddress::new(
self.opts.supergraph_opts.supergraph_address,
Expand All @@ -63,26 +59,38 @@ impl Dev {
.await
.map_err(|err| RoverError::new(anyhow!("{}", err)))?;

let supergraph_yaml = self.opts.supergraph_opts.clone().supergraph_config_path;
let federation_version = self.opts.supergraph_opts.federation_version.clone();
let profile = self.opts.plugin_opts.profile.clone();
let graph_ref = self.opts.supergraph_opts.graph_ref.clone();

let one_shot_composition = OneShotComposition::builder()
.client_config(client_config.clone())
.profile(profile.clone())
.elv2_license_accepter(elv2_license_accepter)
.skip_update(skip_update)
.and_federation_version(federation_version)
.and_graph_ref(graph_ref.clone())
.and_supergraph_yaml(supergraph_yaml)
.and_override_install_path(override_install_path.clone())
.build();

let supergraph_schema = one_shot_composition
.compose(&read_file_impl, &write_file_impl, &exec_command_impl)
let profile = &self.opts.plugin_opts.profile;
let graph_ref = &self.opts.supergraph_opts.graph_ref;
let supergraph_config_path = &self.opts.supergraph_opts.clone().supergraph_config_path;

let service = client_config.get_authenticated_client(profile)?.service()?;
let service = WhoAmI::new(service);

let composition_pipeline = CompositionPipeline::default()
.init(
&mut stdin(),
&client_config.get_authenticated_client(profile)?,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I really like this idea, because I actually had to implement something similar for the LSP (because they wanted composition to occur as soon as the pipeline starts). That being said, another requirement from the LSP work is that if you don't have anything in the supergraph.yaml that requires GraphOS you shouldn't need a profile to be configured. Because get authenticated client here causes a profile look up that won't work. Is there any way we can defer this to the subgraph level?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Or could we have a new implementation of FetchRemoteSubgraphs that only initialises a concrete client at the subgraph level?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it might be something where we can defer this until we scan the supergraph config for studio subgraphs. That way we can be up front about requiring the profile before we attempt to resolve the subgraphs. Would that work?

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If you could then great

supergraph_config_path.clone(),
graph_ref.clone(),
)
.await?
.resolve_federation_version(
&client_config,
&client_config.get_authenticated_client(profile)?,
self.opts.supergraph_opts.federation_version.clone(),
)
.await?
.supergraph_sdl;
.install_supergraph_binary(
client_config.clone(),
override_install_path.clone(),
elv2_license_accepter,
skip_update,
)
.await?;
let composition_success = composition_pipeline
.compose(&exec_command_impl, &read_file_impl, &write_file_impl, None)
.await?;
let supergraph_schema = composition_success.supergraph_sdl();

// TODO: figure out how to actually get this; maybe based on fed version? didn't see a cli
// opt
Expand All @@ -91,10 +99,19 @@ impl Dev {
let credential =
Profile::get_credential(&profile.profile_name, &Config::new(None::<&String>, None)?)?;

let service = client_config
.get_authenticated_client(&profile)?
.service()?;
let service = WhoAmI::new(service);
let composition_runner = composition_pipeline
.runner(
exec_command_impl,
read_file_impl.clone(),
write_file_impl.clone(),
profile,
&client_config,
self.opts.subgraph_opts.subgraph_polling_interval,
tmp_config_dir_path.clone(),
)
.await?;

let composition_messages = composition_runner.run();

let mut run_router = RunRouter::default()
.install::<InstallRouter>(
Expand All @@ -107,17 +124,17 @@ impl Dev {
.await?
.load_config(&read_file_impl, router_address, router_config_path)
.await?
.load_remote_config(service, graph_ref, Some(credential))
.load_remote_config(service, graph_ref.clone(), Some(credential))
.await
.run(
FsWriteFile::default(),
TokioSpawn::default(),
&tmp_config_dir_path,
client_config,
client_config.clone(),
supergraph_schema,
)
.await?
.watch_for_changes(write_file_impl)
.watch_for_changes(write_file_impl, composition_messages)
.await;

while let Some(router_log) = run_router.router_logs().next().await {
Expand Down
84 changes: 61 additions & 23 deletions src/command/dev/next/router/binary.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,31 +166,69 @@ where
.send(Err(err))
.tap_err(|err| tracing::error!("Failed to send error message {:?}", err));
}
Ok(mut child) => match child.stdout.take() {
Some(stdout) => {
tokio::task::spawn(async move {
let mut lines = BufReader::new(stdout).lines();
while let Ok(Some(line)) = lines.next_line().await.tap_err(|err| {
tracing::error!("Error reading from router stdout: {:?}", err)
}) {
let _ = sender.send(Ok(RouterLog::Stdout(line))).tap_err(|err| {
tracing::error!(
"Failed to send router stdout message. {:?}",
err
)
});
}
});
Ok(mut child) => {
match child.stdout.take() {
Some(stdout) => {
tokio::task::spawn({
let sender = sender.clone();
async move {
let mut lines = BufReader::new(stdout).lines();
while let Ok(Some(line)) =
lines.next_line().await.tap_err(|err| {
tracing::error!(
"Error reading from router stdout: {:?}",
err
)
})
{
let _ = sender.send(Ok(RouterLog::Stdout(line))).tap_err(
|err| {
tracing::error!(
"Failed to send router stdout message. {:?}",
err
)
},
);
}
}
});
}
None => {
let err = RunRouterBinaryError::OutputCapture {
descriptor: "stdin".to_string(),
};
let _ = sender.send(Err(err)).tap_err(|err| {
tracing::error!("Failed to send error message {:?}", err)
});
}
}
None => {
let err = RunRouterBinaryError::OutputCapture {
descriptor: "stdin".to_string(),
};
let _ = sender.send(Err(err)).tap_err(|err| {
tracing::error!("Failed to send error message {:?}", err)
});
match child.stderr.take() {
Some(stderr) => {
tokio::task::spawn(async move {
let mut lines = BufReader::new(stderr).lines();
while let Ok(Some(line)) = lines.next_line().await.tap_err(|err| {
tracing::error!("Error reading from router stderr: {:?}", err)
}) {
let _ =
sender.send(Ok(RouterLog::Stderr(line))).tap_err(|err| {
tracing::error!(
"Failed to send router stderr message. {:?}",
err
)
});
}
});
}
None => {
let err = RunRouterBinaryError::OutputCapture {
descriptor: "stdin".to_string(),
};
let _ = sender.send(Err(err)).tap_err(|err| {
tracing::error!("Failed to send error message {:?}", err)
});
}
}
},
}
}
})
.abort_handle()
Expand Down
6 changes: 2 additions & 4 deletions src/command/dev/next/router/hot_reload.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,10 +7,8 @@ use crate::{subtask::SubtaskHandleStream, utils::effect::write_file::WriteFile};

use super::config::RouterConfig;

pub struct SupergraphSchema(String);

pub enum RouterUpdateEvent {
SchemaChanged { schema: SupergraphSchema },
SchemaChanged { schema: String },
ConfigChanged { config: RouterConfig },
}

Expand Down Expand Up @@ -44,7 +42,7 @@ where
match router_update_event {
RouterUpdateEvent::SchemaChanged { schema } => {
match write_file_impl
.write_file(&self.schema, schema.0.as_bytes())
.write_file(&self.schema, schema.as_bytes())
.await
{
Ok(_) => {
Expand Down
41 changes: 34 additions & 7 deletions src/command/dev/next/router/run.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,10 @@ use std::time::Duration;

use apollo_federation_types::config::RouterVersion;
use camino::{Utf8Path, Utf8PathBuf};
use futures::{stream, StreamExt};
use futures::{
stream::{self, BoxStream},
StreamExt,
};
use houston::Credential;
use rover_client::{
operations::config::who_am_i::{RegistryIdentity, WhoAmIError, WhoAmIRequest},
Expand All @@ -16,6 +19,7 @@ use tracing::info;

use crate::{
command::dev::next::FileWatcher,
composition::events::CompositionEvent,
options::LicenseAccepter,
subtask::{Subtask, SubtaskRunStream, SubtaskRunUnit},
utils::{
Expand Down Expand Up @@ -131,7 +135,7 @@ impl RunRouter<state::Run> {
spawn: Spawn,
temp_router_dir: &Utf8Path,
studio_client_config: StudioClientConfig,
supergraph_schema: String,
supergraph_schema: &str,
) -> Result<RunRouter<state::Watch>, RunRouterBinaryError>
where
Spawn: Service<ExecCommandConfig, Response = Child> + Send + Clone + 'static,
Expand Down Expand Up @@ -171,7 +175,7 @@ impl RunRouter<state::Run> {
.call(
WriteFileRequest::builder()
.path(hot_reload_schema_path.clone())
.contents(supergraph_schema.into_bytes())
.contents(supergraph_schema.as_bytes().to_vec())
.build(),
)
.await
Expand Down Expand Up @@ -260,10 +264,15 @@ impl RunRouter<state::Run> {
}

impl RunRouter<state::Watch> {
pub async fn watch_for_changes<WriteF>(self, write_file_impl: WriteF) -> RunRouter<state::Abort>
pub async fn watch_for_changes<WriteF>(
self,
write_file_impl: WriteF,
composition_messages: BoxStream<'static, CompositionEvent>,
) -> RunRouter<state::Abort>
where
WriteF: WriteFile + Send + Clone + 'static,
{
tracing::info!("Watching for subgraph changes");
let (router_config_updates, config_watcher_subtask) = if let Some(config_path) =
self.state.config_path
{
Expand All @@ -275,20 +284,36 @@ impl RunRouter<state::Watch> {
(None, None)
};

let composition_messages =
tokio_stream::StreamExt::filter_map(composition_messages, |event| match event {
CompositionEvent::Started => None,
CompositionEvent::Error(err) => {
tracing::error!("Composition error {:?}", err);
None
}
CompositionEvent::Success(success) => Some(RouterUpdateEvent::SchemaChanged {
schema: success.supergraph_sdl().to_string(),
}),
})
.boxed();

let hot_reload_watcher = HotReloadWatcher::builder()
.config(self.state.hot_reload_config_path)
.schema(self.state.hot_reload_schema_path)
.schema(self.state.hot_reload_schema_path.clone())
.write_file_impl(write_file_impl)
.build();

let (hot_reload_events, hot_reload_subtask): (UnboundedReceiverStream<HotReloadEvent>, _) =
Subtask::new(hot_reload_watcher);

let router_config_updates = router_config_updates
.map(|stream| stream.boxed())
.map(move |stream| stream.boxed())
.unwrap_or_else(|| stream::empty().boxed());

let abort_hot_reload = SubtaskRunStream::run(hot_reload_subtask, router_config_updates);
let router_updates =
tokio_stream::StreamExt::merge(router_config_updates, composition_messages);

let abort_hot_reload = SubtaskRunStream::run(hot_reload_subtask, router_updates.boxed());

let abort_config_watcher = config_watcher_subtask.map(SubtaskRunUnit::run);

Expand All @@ -299,6 +324,7 @@ impl RunRouter<state::Watch> {
abort_config_watcher,
hot_reload_events,
router_logs: self.state.router_logs,
hot_reload_schema_path: self.state.hot_reload_schema_path,
},
}
}
Expand Down Expand Up @@ -352,5 +378,6 @@ mod state {
pub abort_router: AbortHandle,
pub abort_config_watcher: Option<AbortHandle>,
pub abort_hot_reload: AbortHandle,
pub hot_reload_schema_path: Utf8PathBuf,
}
}
Loading
Loading