Skip to content

Commit ccddfc9

Browse files
committed
Watch subgraphs when rover dev runs
1 parent 9e9f25d commit ccddfc9

File tree

22 files changed

+689
-338
lines changed

22 files changed

+689
-338
lines changed

src/command/dev/next/mod.rs

+53-36
Original file line numberDiff line numberDiff line change
@@ -1,28 +1,24 @@
11
#![warn(missing_docs)]
22

3+
use std::io::stdin;
4+
35
use anyhow::anyhow;
46
use apollo_federation_types::config::RouterVersion;
57
use camino::Utf8PathBuf;
68
use futures::StreamExt;
79
use houston::{Config, Profile};
8-
use router::{
9-
install::InstallRouter,
10-
run::RunRouter,
11-
watchers::{file::FileWatcher, router_config::RouterConfigWatcher},
12-
};
10+
use router::{install::InstallRouter, run::RunRouter, watchers::file::FileWatcher};
1311
use rover_client::operations::config::who_am_i::WhoAmI;
14-
use tower::{Service, ServiceExt};
1512

1613
use crate::{
1714
command::Dev,
18-
composition::runner::OneShotComposition,
19-
subtask::{Subtask, SubtaskRunUnit},
15+
composition::pipeline::CompositionPipeline,
2016
utils::{
2117
client::StudioClientConfig,
2218
effect::{
2319
exec::{TokioCommand, TokioSpawn},
2420
read_file::FsReadFile,
25-
write_file::{FsWriteFile, WriteFileRequest},
21+
write_file::FsWriteFile,
2622
},
2723
},
2824
RoverError, RoverOutput, RoverResult,
@@ -45,7 +41,7 @@ impl Dev {
4541
let elv2_license_accepter = self.opts.plugin_opts.elv2_license_accepter;
4642
let skip_update = self.opts.plugin_opts.skip_update;
4743
let read_file_impl = FsReadFile::default();
48-
let mut write_file_impl = FsWriteFile::default();
44+
let write_file_impl = FsWriteFile::default();
4945
let exec_command_impl = TokioCommand::default();
5046
let router_address = RouterAddress::new(
5147
self.opts.supergraph_opts.supergraph_address,
@@ -63,26 +59,38 @@ impl Dev {
6359
.await
6460
.map_err(|err| RoverError::new(anyhow!("{}", err)))?;
6561

66-
let supergraph_yaml = self.opts.supergraph_opts.clone().supergraph_config_path;
67-
let federation_version = self.opts.supergraph_opts.federation_version.clone();
68-
let profile = self.opts.plugin_opts.profile.clone();
69-
let graph_ref = self.opts.supergraph_opts.graph_ref.clone();
70-
71-
let one_shot_composition = OneShotComposition::builder()
72-
.client_config(client_config.clone())
73-
.profile(profile.clone())
74-
.elv2_license_accepter(elv2_license_accepter)
75-
.skip_update(skip_update)
76-
.and_federation_version(federation_version)
77-
.and_graph_ref(graph_ref.clone())
78-
.and_supergraph_yaml(supergraph_yaml)
79-
.and_override_install_path(override_install_path.clone())
80-
.build();
81-
82-
let supergraph_schema = one_shot_composition
83-
.compose(&read_file_impl, &write_file_impl, &exec_command_impl)
62+
let profile = &self.opts.plugin_opts.profile;
63+
let graph_ref = &self.opts.supergraph_opts.graph_ref;
64+
let supergraph_config_path = &self.opts.supergraph_opts.clone().supergraph_config_path;
65+
66+
let service = client_config.get_authenticated_client(profile)?.service()?;
67+
let service = WhoAmI::new(service);
68+
69+
let composition_pipeline = CompositionPipeline::default()
70+
.init(
71+
&mut stdin(),
72+
&client_config.get_authenticated_client(profile)?,
73+
supergraph_config_path.clone(),
74+
graph_ref.clone(),
75+
)
76+
.await?
77+
.resolve_federation_version(
78+
&client_config,
79+
&client_config.get_authenticated_client(profile)?,
80+
self.opts.supergraph_opts.federation_version.clone(),
81+
)
8482
.await?
85-
.supergraph_sdl;
83+
.install_supergraph_binary(
84+
client_config.clone(),
85+
override_install_path.clone(),
86+
elv2_license_accepter,
87+
skip_update,
88+
)
89+
.await?;
90+
let composition_success = composition_pipeline
91+
.compose(&exec_command_impl, &read_file_impl, &write_file_impl, None)
92+
.await?;
93+
let supergraph_schema = composition_success.supergraph_sdl();
8694

8795
// TODO: figure out how to actually get this; maybe based on fed version? didn't see a cli
8896
// opt
@@ -91,10 +99,19 @@ impl Dev {
9199
let credential =
92100
Profile::get_credential(&profile.profile_name, &Config::new(None::<&String>, None)?)?;
93101

94-
let service = client_config
95-
.get_authenticated_client(&profile)?
96-
.service()?;
97-
let service = WhoAmI::new(service);
102+
let composition_runner = composition_pipeline
103+
.runner(
104+
exec_command_impl,
105+
read_file_impl.clone(),
106+
write_file_impl.clone(),
107+
profile,
108+
&client_config,
109+
self.opts.subgraph_opts.subgraph_polling_interval,
110+
tmp_config_dir_path.clone(),
111+
)
112+
.await?;
113+
114+
let composition_messages = composition_runner.run();
98115

99116
let mut run_router = RunRouter::default()
100117
.install::<InstallRouter>(
@@ -107,17 +124,17 @@ impl Dev {
107124
.await?
108125
.load_config(&read_file_impl, router_address, router_config_path)
109126
.await?
110-
.load_remote_config(service, graph_ref, Some(credential))
127+
.load_remote_config(service, graph_ref.clone(), Some(credential))
111128
.await
112129
.run(
113130
FsWriteFile::default(),
114131
TokioSpawn::default(),
115132
&tmp_config_dir_path,
116-
client_config,
133+
client_config.clone(),
117134
supergraph_schema,
118135
)
119136
.await?
120-
.watch_for_changes(write_file_impl)
137+
.watch_for_changes(write_file_impl, composition_messages)
121138
.await;
122139

123140
while let Some(router_log) = run_router.router_logs().next().await {

src/command/dev/next/router/binary.rs

+61-23
Original file line numberDiff line numberDiff line change
@@ -166,31 +166,69 @@ where
166166
.send(Err(err))
167167
.tap_err(|err| tracing::error!("Failed to send error message {:?}", err));
168168
}
169-
Ok(mut child) => match child.stdout.take() {
170-
Some(stdout) => {
171-
tokio::task::spawn(async move {
172-
let mut lines = BufReader::new(stdout).lines();
173-
while let Ok(Some(line)) = lines.next_line().await.tap_err(|err| {
174-
tracing::error!("Error reading from router stdout: {:?}", err)
175-
}) {
176-
let _ = sender.send(Ok(RouterLog::Stdout(line))).tap_err(|err| {
177-
tracing::error!(
178-
"Failed to send router stdout message. {:?}",
179-
err
180-
)
181-
});
182-
}
183-
});
169+
Ok(mut child) => {
170+
match child.stdout.take() {
171+
Some(stdout) => {
172+
tokio::task::spawn({
173+
let sender = sender.clone();
174+
async move {
175+
let mut lines = BufReader::new(stdout).lines();
176+
while let Ok(Some(line)) =
177+
lines.next_line().await.tap_err(|err| {
178+
tracing::error!(
179+
"Error reading from router stdout: {:?}",
180+
err
181+
)
182+
})
183+
{
184+
let _ = sender.send(Ok(RouterLog::Stdout(line))).tap_err(
185+
|err| {
186+
tracing::error!(
187+
"Failed to send router stdout message. {:?}",
188+
err
189+
)
190+
},
191+
);
192+
}
193+
}
194+
});
195+
}
196+
None => {
197+
let err = RunRouterBinaryError::OutputCapture {
198+
descriptor: "stdin".to_string(),
199+
};
200+
let _ = sender.send(Err(err)).tap_err(|err| {
201+
tracing::error!("Failed to send error message {:?}", err)
202+
});
203+
}
184204
}
185-
None => {
186-
let err = RunRouterBinaryError::OutputCapture {
187-
descriptor: "stdin".to_string(),
188-
};
189-
let _ = sender.send(Err(err)).tap_err(|err| {
190-
tracing::error!("Failed to send error message {:?}", err)
191-
});
205+
match child.stderr.take() {
206+
Some(stderr) => {
207+
tokio::task::spawn(async move {
208+
let mut lines = BufReader::new(stderr).lines();
209+
while let Ok(Some(line)) = lines.next_line().await.tap_err(|err| {
210+
tracing::error!("Error reading from router stderr: {:?}", err)
211+
}) {
212+
let _ =
213+
sender.send(Ok(RouterLog::Stderr(line))).tap_err(|err| {
214+
tracing::error!(
215+
"Failed to send router stderr message. {:?}",
216+
err
217+
)
218+
});
219+
}
220+
});
221+
}
222+
None => {
223+
let err = RunRouterBinaryError::OutputCapture {
224+
descriptor: "stdin".to_string(),
225+
};
226+
let _ = sender.send(Err(err)).tap_err(|err| {
227+
tracing::error!("Failed to send error message {:?}", err)
228+
});
229+
}
192230
}
193-
},
231+
}
194232
}
195233
})
196234
.abort_handle()

src/command/dev/next/router/hot_reload.rs

+2-4
Original file line numberDiff line numberDiff line change
@@ -7,10 +7,8 @@ use crate::{subtask::SubtaskHandleStream, utils::effect::write_file::WriteFile};
77

88
use super::config::RouterConfig;
99

10-
pub struct SupergraphSchema(String);
11-
1210
pub enum RouterUpdateEvent {
13-
SchemaChanged { schema: SupergraphSchema },
11+
SchemaChanged { schema: String },
1412
ConfigChanged { config: RouterConfig },
1513
}
1614

@@ -44,7 +42,7 @@ where
4442
match router_update_event {
4543
RouterUpdateEvent::SchemaChanged { schema } => {
4644
match write_file_impl
47-
.write_file(&self.schema, schema.0.as_bytes())
45+
.write_file(&self.schema, schema.as_bytes())
4846
.await
4947
{
5048
Ok(_) => {

src/command/dev/next/router/run.rs

+34-7
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,10 @@ use std::time::Duration;
22

33
use apollo_federation_types::config::RouterVersion;
44
use camino::{Utf8Path, Utf8PathBuf};
5-
use futures::{stream, StreamExt};
5+
use futures::{
6+
stream::{self, BoxStream},
7+
StreamExt,
8+
};
69
use houston::Credential;
710
use rover_client::{
811
operations::config::who_am_i::{RegistryIdentity, WhoAmIError, WhoAmIRequest},
@@ -16,6 +19,7 @@ use tracing::info;
1619

1720
use crate::{
1821
command::dev::next::FileWatcher,
22+
composition::events::CompositionEvent,
1923
options::LicenseAccepter,
2024
subtask::{Subtask, SubtaskRunStream, SubtaskRunUnit},
2125
utils::{
@@ -131,7 +135,7 @@ impl RunRouter<state::Run> {
131135
spawn: Spawn,
132136
temp_router_dir: &Utf8Path,
133137
studio_client_config: StudioClientConfig,
134-
supergraph_schema: String,
138+
supergraph_schema: &str,
135139
) -> Result<RunRouter<state::Watch>, RunRouterBinaryError>
136140
where
137141
Spawn: Service<ExecCommandConfig, Response = Child> + Send + Clone + 'static,
@@ -171,7 +175,7 @@ impl RunRouter<state::Run> {
171175
.call(
172176
WriteFileRequest::builder()
173177
.path(hot_reload_schema_path.clone())
174-
.contents(supergraph_schema.into_bytes())
178+
.contents(supergraph_schema.as_bytes().to_vec())
175179
.build(),
176180
)
177181
.await
@@ -260,10 +264,15 @@ impl RunRouter<state::Run> {
260264
}
261265

262266
impl RunRouter<state::Watch> {
263-
pub async fn watch_for_changes<WriteF>(self, write_file_impl: WriteF) -> RunRouter<state::Abort>
267+
pub async fn watch_for_changes<WriteF>(
268+
self,
269+
write_file_impl: WriteF,
270+
composition_messages: BoxStream<'static, CompositionEvent>,
271+
) -> RunRouter<state::Abort>
264272
where
265273
WriteF: WriteFile + Send + Clone + 'static,
266274
{
275+
tracing::info!("Watching for subgraph changes");
267276
let (router_config_updates, config_watcher_subtask) = if let Some(config_path) =
268277
self.state.config_path
269278
{
@@ -275,20 +284,36 @@ impl RunRouter<state::Watch> {
275284
(None, None)
276285
};
277286

287+
let composition_messages =
288+
tokio_stream::StreamExt::filter_map(composition_messages, |event| match event {
289+
CompositionEvent::Started => None,
290+
CompositionEvent::Error(err) => {
291+
tracing::error!("Composition error {:?}", err);
292+
None
293+
}
294+
CompositionEvent::Success(success) => Some(RouterUpdateEvent::SchemaChanged {
295+
schema: success.supergraph_sdl().to_string(),
296+
}),
297+
})
298+
.boxed();
299+
278300
let hot_reload_watcher = HotReloadWatcher::builder()
279301
.config(self.state.hot_reload_config_path)
280-
.schema(self.state.hot_reload_schema_path)
302+
.schema(self.state.hot_reload_schema_path.clone())
281303
.write_file_impl(write_file_impl)
282304
.build();
283305

284306
let (hot_reload_events, hot_reload_subtask): (UnboundedReceiverStream<HotReloadEvent>, _) =
285307
Subtask::new(hot_reload_watcher);
286308

287309
let router_config_updates = router_config_updates
288-
.map(|stream| stream.boxed())
310+
.map(move |stream| stream.boxed())
289311
.unwrap_or_else(|| stream::empty().boxed());
290312

291-
let abort_hot_reload = SubtaskRunStream::run(hot_reload_subtask, router_config_updates);
313+
let router_updates =
314+
tokio_stream::StreamExt::merge(router_config_updates, composition_messages);
315+
316+
let abort_hot_reload = SubtaskRunStream::run(hot_reload_subtask, router_updates.boxed());
292317

293318
let abort_config_watcher = config_watcher_subtask.map(SubtaskRunUnit::run);
294319

@@ -299,6 +324,7 @@ impl RunRouter<state::Watch> {
299324
abort_config_watcher,
300325
hot_reload_events,
301326
router_logs: self.state.router_logs,
327+
hot_reload_schema_path: self.state.hot_reload_schema_path,
302328
},
303329
}
304330
}
@@ -352,5 +378,6 @@ mod state {
352378
pub abort_router: AbortHandle,
353379
pub abort_config_watcher: Option<AbortHandle>,
354380
pub abort_hot_reload: AbortHandle,
381+
pub hot_reload_schema_path: Utf8PathBuf,
355382
}
356383
}

0 commit comments

Comments
 (0)