Skip to content

Commit ba57f69

Browse files
aaronArinderdotdat
andcommitted
moving dev_2 over to composition-integration (#2130)
Co-authored-by: Brian George <[email protected]>
1 parent 7c6b910 commit ba57f69

File tree

12 files changed

+351
-2
lines changed

12 files changed

+351
-2
lines changed

Cargo.toml

+1-2
Original file line numberDiff line numberDiff line change
@@ -208,12 +208,12 @@ sputnik = { workspace = true }
208208
strsim = { workspace = true }
209209
strum = { workspace = true }
210210
strum_macros = { workspace = true }
211+
tap = { workspace = true }
211212
tar = { workspace = true }
212213
tempfile = { workspace = true }
213214
timber = { workspace = true }
214215
termimad = { workspace = true }
215216
thiserror = { workspace = true }
216-
tap = { workspace = true }
217217
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros", "process", "sync"] }
218218
tokio-stream = { workspace = true }
219219
toml = { workspace = true }
@@ -244,5 +244,4 @@ reqwest = { workspace = true, features = ["native-tls-vendored"] }
244244
rstest = { workspace = true }
245245
serial_test = { workspace = true }
246246
speculoos = { workspace = true }
247-
tokio = { workspace = true }
248247
tracing-test = "0.2.5"

src/composition/mod.rs

+3
Original file line numberDiff line numberDiff line change
@@ -1 +1,4 @@
11
pub mod supergraph;
2+
3+
#[cfg(feature = "composition-js")]
4+
mod watchers;
+1
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
pub mod router_config;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,22 @@
1+
use camino::Utf8PathBuf;
2+
use rover_std::{Fs, RoverStdError};
3+
4+
#[derive(thiserror::Error, Debug)]
5+
pub enum RouterConfigError {
6+
#[error("Unable to write router config file")]
7+
FailedToWriteFile(RoverStdError),
8+
}
9+
10+
pub struct WriteRouterConfig {
11+
path: Utf8PathBuf,
12+
}
13+
14+
impl WriteRouterConfig {
15+
pub fn new(path: Utf8PathBuf) -> WriteRouterConfig {
16+
WriteRouterConfig { path }
17+
}
18+
pub fn run(&self, contents: &str) -> Result<(), RouterConfigError> {
19+
Fs::write_file(self.path.clone(), contents.to_string())
20+
.map_err(|err| RouterConfigError::FailedToWriteFile(err))
21+
}
22+
}

src/composition/watchers/messages.rs

+31
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
use futures::{stream::BoxStream, StreamExt};
2+
use tap::TapFallible;
3+
use tokio::sync::mpsc::unbounded_channel;
4+
use tokio_stream::wrappers::UnboundedReceiverStream;
5+
6+
use super::watcher::router_config::RouterConfigMessage;
7+
8+
pub fn receive_messages(
9+
mut router_config_messages: BoxStream<'static, RouterConfigMessage>,
10+
) -> BoxStream<'static, RoverDevMessage> {
11+
let (tx, rx) = unbounded_channel();
12+
tokio::spawn(async move {
13+
while let Some(message) = router_config_messages.next().await {
14+
let message = RoverDevMessage::from(message);
15+
tx.send(message)
16+
.tap_err(|err| tracing::error!("{:?}", err))
17+
.unwrap();
18+
}
19+
});
20+
UnboundedReceiverStream::new(rx).boxed()
21+
}
22+
23+
pub enum RoverDevMessage {
24+
Config(RouterConfigMessage),
25+
}
26+
27+
impl From<RouterConfigMessage> for RoverDevMessage {
28+
fn from(value: RouterConfigMessage) -> Self {
29+
Self::Config(value)
30+
}
31+
}

src/composition/watchers/mod.rs

+9
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,9 @@
1+
mod handler;
2+
mod messages;
3+
mod run;
4+
mod subtask;
5+
pub mod watcher;
6+
7+
// NB: I removed the dev-related stuff here; that should go on the rover-dev-integration branch,
8+
// but I think Dan already has much if not all of this sorted and we only need to make the
9+
// watchers/etc available to start/listen to them

src/composition/watchers/run.rs

+45
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,45 @@
1+
use anyhow::Result;
2+
use camino::Utf8PathBuf;
3+
use derive_getters::Getters;
4+
use futures::StreamExt;
5+
use tokio::join;
6+
7+
use super::{
8+
handler::router_config::WriteRouterConfig,
9+
messages::receive_messages,
10+
messages::RoverDevMessage,
11+
subtask::{Subtask, SubtaskRunStream, SubtaskRunUnit},
12+
watcher::router_config::RouterConfigMessage,
13+
watcher::{file::FileWatcher, router_config::RouterConfigWatcher},
14+
};
15+
16+
#[derive(Getters)]
17+
pub struct RoverDevConfig {
18+
router: RoverDevRouterConfig,
19+
}
20+
21+
#[derive(Getters)]
22+
pub struct RoverDevRouterConfig {
23+
config_path: Utf8PathBuf,
24+
tmp_config_path: Utf8PathBuf,
25+
}
26+
27+
pub async fn run(config: RoverDevConfig) -> Result<()> {
28+
let (router_config_messages, router_config_subtask) = Subtask::new(RouterConfigWatcher::new(
29+
FileWatcher::new(config.router.config_path().clone()),
30+
));
31+
let write_router_config = WriteRouterConfig::new(config.router.tmp_config_path().clone());
32+
router_config_subtask.run();
33+
let mut messages = receive_messages(router_config_messages.boxed());
34+
let join_handle = tokio::spawn(async move {
35+
while let Some(message) = messages.next().await {
36+
match &message {
37+
RoverDevMessage::Config(RouterConfigMessage::Changed(contents)) => {
38+
write_router_config.run(contents.as_str());
39+
}
40+
}
41+
}
42+
});
43+
join!(join_handle);
44+
Ok(())
45+
}

src/composition/watchers/subtask.rs

+58
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,58 @@
1+
use futures::stream::BoxStream;
2+
use tokio::{
3+
sync::mpsc::{unbounded_channel, UnboundedSender},
4+
task::AbortHandle,
5+
};
6+
use tokio_stream::wrappers::UnboundedReceiverStream;
7+
8+
pub trait SubtaskRunUnit {
9+
fn run(self) -> AbortHandle;
10+
}
11+
12+
pub trait SubtaskRunStream {
13+
type Input;
14+
fn run(self, input: BoxStream<'static, Self::Input>) -> AbortHandle;
15+
}
16+
17+
pub trait SubtaskHandleUnit {
18+
type Output;
19+
fn handle(self, sender: UnboundedSender<Self::Output>) -> AbortHandle;
20+
}
21+
22+
pub trait SubtaskHandleStream {
23+
type Input;
24+
type Output;
25+
fn handle(
26+
self,
27+
sender: UnboundedSender<Self::Output>,
28+
input: BoxStream<'static, Self::Input>,
29+
) -> AbortHandle;
30+
}
31+
32+
pub struct Subtask<T, Output> {
33+
inner: T,
34+
sender: UnboundedSender<Output>,
35+
}
36+
37+
impl<T, Output> Subtask<T, Output> {
38+
pub fn new(inner: T) -> (UnboundedReceiverStream<Output>, Subtask<T, Output>) {
39+
let (tx, rx) = unbounded_channel();
40+
(
41+
UnboundedReceiverStream::new(rx),
42+
Subtask { inner, sender: tx },
43+
)
44+
}
45+
}
46+
47+
impl<T: SubtaskHandleUnit<Output = Output>, Output> SubtaskRunUnit for Subtask<T, Output> {
48+
fn run(self) -> AbortHandle {
49+
self.inner.handle(self.sender)
50+
}
51+
}
52+
53+
impl<T: SubtaskHandleStream<Output = Output>, Output> SubtaskRunStream for Subtask<T, Output> {
54+
type Input = T::Input;
55+
fn run(self, input: BoxStream<'static, Self::Input>) -> AbortHandle {
56+
self.inner.handle(self.sender, input)
57+
}
58+
}
+43
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,43 @@
1+
use camino::Utf8PathBuf;
2+
use futures::{stream::BoxStream, StreamExt};
3+
use tap::TapFallible;
4+
use tokio::sync::mpsc::unbounded_channel;
5+
6+
use rover_std::Fs;
7+
use tokio_stream::wrappers::UnboundedReceiverStream;
8+
9+
#[derive(Clone)]
10+
pub struct FileWatcher {
11+
path: Utf8PathBuf,
12+
}
13+
14+
impl FileWatcher {
15+
pub fn new(path: Utf8PathBuf) -> FileWatcher {
16+
FileWatcher { path }
17+
}
18+
19+
pub fn watch(self) -> BoxStream<'static, String> {
20+
let path = self.path;
21+
let (file_tx, file_rx) = unbounded_channel();
22+
let output = UnboundedReceiverStream::new(file_rx);
23+
Fs::watch_file(path.clone(), file_tx);
24+
output
25+
.filter_map(move |result| {
26+
let path = path.clone();
27+
async move {
28+
result
29+
.and_then(|_| {
30+
Fs::read_file(path).tap_err(|err| {
31+
tracing::error!(
32+
"Could not read router configuration file: {:?}",
33+
err
34+
);
35+
eprintln!("Could not read router configuration file.");
36+
})
37+
})
38+
.ok()
39+
}
40+
})
41+
.boxed()
42+
}
43+
}
+3
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,3 @@
1+
pub mod file;
2+
pub mod router_config;
3+
pub mod supergraph_config;
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
use futures::StreamExt;
2+
3+
use super::super::subtask::SubtaskHandleUnit;
4+
5+
use super::file::FileWatcher;
6+
7+
#[derive(Clone, Debug)]
8+
pub enum RouterConfigMessage {
9+
Changed(String),
10+
}
11+
12+
pub struct RouterConfigWatcher {
13+
file_watcher: FileWatcher,
14+
}
15+
16+
impl RouterConfigWatcher {
17+
pub fn new(file_watcher: FileWatcher) -> RouterConfigWatcher {
18+
RouterConfigWatcher { file_watcher }
19+
}
20+
}
21+
22+
impl SubtaskHandleUnit for RouterConfigWatcher {
23+
type Output = RouterConfigMessage;
24+
25+
fn handle(
26+
self,
27+
sender: tokio::sync::mpsc::UnboundedSender<Self::Output>,
28+
) -> tokio::task::AbortHandle {
29+
tokio::spawn(async move {
30+
while let Some(contents) = self.file_watcher.clone().watch().next().await {
31+
sender.send(RouterConfigMessage::Changed(contents));
32+
}
33+
})
34+
.abort_handle()
35+
}
36+
}
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,99 @@
1+
use std::collections::HashSet;
2+
3+
use apollo_federation_types::{
4+
build::SubgraphDefinition,
5+
config::{ConfigError, SupergraphConfig},
6+
};
7+
use derive_getters::Getters;
8+
use futures::StreamExt;
9+
use tap::TapFallible;
10+
11+
use super::file::FileWatcher;
12+
use crate::composition::watchers::subtask::SubtaskHandleUnit;
13+
14+
pub struct SupergraphConfigWatcher {
15+
file_watcher: FileWatcher,
16+
supergraph_config: SupergraphConfig,
17+
}
18+
19+
impl SupergraphConfigWatcher {
20+
pub fn new(
21+
file_watcher: FileWatcher,
22+
supergraph_config: SupergraphConfig,
23+
) -> SupergraphConfigWatcher {
24+
SupergraphConfigWatcher {
25+
file_watcher,
26+
supergraph_config,
27+
}
28+
}
29+
}
30+
31+
impl SubtaskHandleUnit for SupergraphConfigWatcher {
32+
type Output = SupergraphConfigDiff;
33+
fn handle(
34+
self,
35+
sender: tokio::sync::mpsc::UnboundedSender<Self::Output>,
36+
) -> tokio::task::AbortHandle {
37+
tokio::spawn(async move {
38+
let mut latest_supergraph_config = self.supergraph_config.clone();
39+
while let Some(contents) = self.file_watcher.clone().watch().next().await {
40+
match SupergraphConfig::new_from_yaml(&contents) {
41+
Ok(supergraph_config) => {
42+
if let Ok(supergraph_config_diff) =
43+
SupergraphConfigDiff::new(&latest_supergraph_config, &supergraph_config)
44+
{
45+
let _ = sender
46+
.send(supergraph_config_diff)
47+
.tap_err(|err| tracing::error!("{:?}", err));
48+
}
49+
latest_supergraph_config = supergraph_config;
50+
}
51+
Err(err) => {
52+
tracing::error!("Could not parse supergraph config file. {:?}", err);
53+
eprintln!("Could not parse supergraph config file");
54+
}
55+
}
56+
}
57+
})
58+
.abort_handle()
59+
}
60+
}
61+
62+
#[derive(Getters)]
63+
pub struct SupergraphConfigDiff {
64+
added: Vec<SubgraphDefinition>,
65+
removed: Vec<String>,
66+
}
67+
68+
impl SupergraphConfigDiff {
69+
pub fn new(
70+
old: &SupergraphConfig,
71+
new: &SupergraphConfig,
72+
) -> Result<SupergraphConfigDiff, ConfigError> {
73+
let old_subgraph_defs = old.get_subgraph_definitions().tap_err(|err| {
74+
eprintln!(
75+
"Error getting subgraph definitions from the current supergraph config: {:?}",
76+
err
77+
)
78+
})?;
79+
let old_subgraph_names: HashSet<String> =
80+
HashSet::from_iter(old_subgraph_defs.iter().map(|def| def.name.to_string()));
81+
let new_subgraph_defs = new.get_subgraph_definitions().tap_err(|err| {
82+
eprintln!(
83+
"Error getting subgraph definitions from the modified supergraph config: {:?}",
84+
err
85+
)
86+
})?;
87+
let new_subgraph_names =
88+
HashSet::from_iter(new_subgraph_defs.iter().map(|def| def.name.to_string()));
89+
let added_names: HashSet<String> =
90+
HashSet::from_iter(new_subgraph_names.difference(&old_subgraph_names).cloned());
91+
let removed_names = old_subgraph_names.difference(&new_subgraph_names);
92+
let added = new_subgraph_defs
93+
.into_iter()
94+
.filter(|def| added_names.contains(&def.name))
95+
.collect::<Vec<_>>();
96+
let removed = removed_names.into_iter().cloned().collect::<Vec<_>>();
97+
Ok(SupergraphConfigDiff { added, removed })
98+
}
99+
}

0 commit comments

Comments
 (0)