Skip to content

Commit

Permalink
moving dev_2 over to composition-integration (#2130)
Browse files Browse the repository at this point in the history
Co-authored-by: Brian George <[email protected]>
  • Loading branch information
aaronArinder and dotdat committed Sep 17, 2024
1 parent c72e9e3 commit a692e31
Show file tree
Hide file tree
Showing 13 changed files with 353 additions and 2 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -170,6 +170,7 @@ apollo-federation-types = { workspace = true }
apollo-parser = { workspace = true }
billboard = { workspace = true }
binstall = { workspace = true }
buildstructor = { workspace = true }
calm_io = { workspace = true }
camino = { workspace = true }
clap = { workspace = true, features = ["color", "derive", "env"] }
Expand Down Expand Up @@ -204,12 +205,12 @@ sputnik = { workspace = true }
strsim = { workspace = true }
strum = { workspace = true }
strum_macros = { workspace = true }
tap = { workspace = true }
tar = { workspace = true }
tempfile = { workspace = true }
timber = { workspace = true }
termimad = { workspace = true }
thiserror = { workspace = true }
tap = { workspace = true }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros", "process", "sync"] }
tokio-stream = { workspace = true }
toml = { workspace = true }
Expand Down Expand Up @@ -240,5 +241,4 @@ reqwest = { workspace = true, features = ["native-tls-vendored"] }
rstest = { workspace = true }
serial_test = { workspace = true }
speculoos = { workspace = true }
tokio = { workspace = true }
tracing-test = "0.2.5"
3 changes: 3 additions & 0 deletions src/composition/mod.rs
Original file line number Diff line number Diff line change
@@ -1 +1,4 @@
pub mod supergraph;

#[cfg(feature = "composition-js")]
mod watchers;
1 change: 1 addition & 0 deletions src/composition/watchers/handler/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
pub mod router_config;
22 changes: 22 additions & 0 deletions src/composition/watchers/handler/router_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
use camino::Utf8PathBuf;
use rover_std::{Fs, RoverStdError};

#[derive(thiserror::Error, Debug)]
pub enum RouterConfigError {
#[error("Unable to write router config file")]
FailedToWriteFile(RoverStdError),
}

pub struct WriteRouterConfig {
path: Utf8PathBuf,
}

impl WriteRouterConfig {
pub fn new(path: Utf8PathBuf) -> WriteRouterConfig {
WriteRouterConfig { path }
}
pub fn run(&self, contents: &str) -> Result<(), RouterConfigError> {
Fs::write_file(self.path.clone(), contents.to_string())
.map_err(|err| RouterConfigError::FailedToWriteFile(err))
}
}
31 changes: 31 additions & 0 deletions src/composition/watchers/messages.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
use futures::{stream::BoxStream, StreamExt};
use tap::TapFallible;
use tokio::sync::mpsc::unbounded_channel;
use tokio_stream::wrappers::UnboundedReceiverStream;

use super::watcher::router_config::RouterConfigMessage;

pub fn receive_messages(
mut router_config_messages: BoxStream<'static, RouterConfigMessage>,
) -> BoxStream<'static, RoverDevMessage> {
let (tx, rx) = unbounded_channel();
tokio::spawn(async move {
while let Some(message) = router_config_messages.next().await {
let message = RoverDevMessage::from(message);
tx.send(message)
.tap_err(|err| tracing::error!("{:?}", err))
.unwrap();
}
});
UnboundedReceiverStream::new(rx).boxed()
}

pub enum RoverDevMessage {
Config(RouterConfigMessage),
}

impl From<RouterConfigMessage> for RoverDevMessage {
fn from(value: RouterConfigMessage) -> Self {
Self::Config(value)
}
}
9 changes: 9 additions & 0 deletions src/composition/watchers/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
mod handler;
mod messages;
mod run;
mod subtask;
pub mod watcher;

// NB: I removed the dev-related stuff here; that should go on the rover-dev-integration branch,
// but I think Dan already has much if not all of this sorted and we only need to make the
// watchers/etc available to start/listen to them
45 changes: 45 additions & 0 deletions src/composition/watchers/run.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,45 @@
use anyhow::Result;
use camino::Utf8PathBuf;
use derive_getters::Getters;
use futures::StreamExt;
use tokio::join;

use super::{
handler::router_config::WriteRouterConfig,
messages::receive_messages,
messages::RoverDevMessage,
subtask::{Subtask, SubtaskRunStream, SubtaskRunUnit},
watcher::router_config::RouterConfigMessage,
watcher::{file::FileWatcher, router_config::RouterConfigWatcher},
};

#[derive(Getters)]
pub struct RoverDevConfig {
router: RoverDevRouterConfig,
}

#[derive(Getters)]
pub struct RoverDevRouterConfig {
config_path: Utf8PathBuf,
tmp_config_path: Utf8PathBuf,
}

pub async fn run(config: RoverDevConfig) -> Result<()> {
let (router_config_messages, router_config_subtask) = Subtask::new(RouterConfigWatcher::new(
FileWatcher::new(config.router.config_path().clone()),
));
let write_router_config = WriteRouterConfig::new(config.router.tmp_config_path().clone());
router_config_subtask.run();
let mut messages = receive_messages(router_config_messages.boxed());
let join_handle = tokio::spawn(async move {
while let Some(message) = messages.next().await {
match &message {
RoverDevMessage::Config(RouterConfigMessage::Changed(contents)) => {
write_router_config.run(contents.as_str());
}
}
}
});
join!(join_handle);
Ok(())
}
58 changes: 58 additions & 0 deletions src/composition/watchers/subtask.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
use futures::stream::BoxStream;
use tokio::{
sync::mpsc::{unbounded_channel, UnboundedSender},
task::AbortHandle,
};
use tokio_stream::wrappers::UnboundedReceiverStream;

pub trait SubtaskRunUnit {
fn run(self) -> AbortHandle;
}

pub trait SubtaskRunStream {
type Input;
fn run(self, input: BoxStream<'static, Self::Input>) -> AbortHandle;
}

pub trait SubtaskHandleUnit {
type Output;
fn handle(self, sender: UnboundedSender<Self::Output>) -> AbortHandle;
}

pub trait SubtaskHandleStream {
type Input;
type Output;
fn handle(
self,
sender: UnboundedSender<Self::Output>,
input: BoxStream<'static, Self::Input>,
) -> AbortHandle;
}

pub struct Subtask<T, Output> {
inner: T,
sender: UnboundedSender<Output>,
}

impl<T, Output> Subtask<T, Output> {
pub fn new(inner: T) -> (UnboundedReceiverStream<Output>, Subtask<T, Output>) {
let (tx, rx) = unbounded_channel();
(
UnboundedReceiverStream::new(rx),
Subtask { inner, sender: tx },
)
}
}

impl<T: SubtaskHandleUnit<Output = Output>, Output> SubtaskRunUnit for Subtask<T, Output> {
fn run(self) -> AbortHandle {
self.inner.handle(self.sender)
}
}

impl<T: SubtaskHandleStream<Output = Output>, Output> SubtaskRunStream for Subtask<T, Output> {
type Input = T::Input;
fn run(self, input: BoxStream<'static, Self::Input>) -> AbortHandle {
self.inner.handle(self.sender, input)
}
}
43 changes: 43 additions & 0 deletions src/composition/watchers/watcher/file.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
use camino::Utf8PathBuf;
use futures::{stream::BoxStream, StreamExt};
use tap::TapFallible;
use tokio::sync::mpsc::unbounded_channel;

use rover_std::Fs;
use tokio_stream::wrappers::UnboundedReceiverStream;

#[derive(Clone)]
pub struct FileWatcher {
path: Utf8PathBuf,
}

impl FileWatcher {
pub fn new(path: Utf8PathBuf) -> FileWatcher {
FileWatcher { path }
}

pub fn watch(self) -> BoxStream<'static, String> {
let path = self.path;
let (file_tx, file_rx) = unbounded_channel();
let output = UnboundedReceiverStream::new(file_rx);
Fs::watch_file(path.clone(), file_tx);
output
.filter_map(move |result| {
let path = path.clone();
async move {
result
.and_then(|_| {
Fs::read_file(path).tap_err(|err| {
tracing::error!(
"Could not read router configuration file: {:?}",
err
);
eprintln!("Could not read router configuration file.");
})
})
.ok()
}
})
.boxed()
}
}
3 changes: 3 additions & 0 deletions src/composition/watchers/watcher/mod.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
pub mod file;
pub mod router_config;
pub mod supergraph_config;
36 changes: 36 additions & 0 deletions src/composition/watchers/watcher/router_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,36 @@
use futures::StreamExt;

use super::super::subtask::SubtaskHandleUnit;

use super::file::FileWatcher;

#[derive(Clone, Debug)]
pub enum RouterConfigMessage {
Changed(String),
}

pub struct RouterConfigWatcher {
file_watcher: FileWatcher,
}

impl RouterConfigWatcher {
pub fn new(file_watcher: FileWatcher) -> RouterConfigWatcher {
RouterConfigWatcher { file_watcher }
}
}

impl SubtaskHandleUnit for RouterConfigWatcher {
type Output = RouterConfigMessage;

fn handle(
self,
sender: tokio::sync::mpsc::UnboundedSender<Self::Output>,
) -> tokio::task::AbortHandle {
tokio::spawn(async move {
while let Some(contents) = self.file_watcher.clone().watch().next().await {
sender.send(RouterConfigMessage::Changed(contents));
}
})
.abort_handle()
}
}
99 changes: 99 additions & 0 deletions src/composition/watchers/watcher/supergraph_config.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,99 @@
use std::collections::HashSet;

use apollo_federation_types::{
build::SubgraphDefinition,
config::{ConfigError, SupergraphConfig},
};
use derive_getters::Getters;
use futures::StreamExt;
use tap::TapFallible;

use super::file::FileWatcher;
use crate::composition::watchers::subtask::SubtaskHandleUnit;

pub struct SupergraphConfigWatcher {
file_watcher: FileWatcher,
supergraph_config: SupergraphConfig,
}

impl SupergraphConfigWatcher {
pub fn new(
file_watcher: FileWatcher,
supergraph_config: SupergraphConfig,
) -> SupergraphConfigWatcher {
SupergraphConfigWatcher {
file_watcher,
supergraph_config,
}
}
}

impl SubtaskHandleUnit for SupergraphConfigWatcher {
type Output = SupergraphConfigDiff;
fn handle(
self,
sender: tokio::sync::mpsc::UnboundedSender<Self::Output>,
) -> tokio::task::AbortHandle {
tokio::spawn(async move {
let mut latest_supergraph_config = self.supergraph_config.clone();
while let Some(contents) = self.file_watcher.clone().watch().next().await {
match SupergraphConfig::new_from_yaml(&contents) {
Ok(supergraph_config) => {
if let Ok(supergraph_config_diff) =
SupergraphConfigDiff::new(&latest_supergraph_config, &supergraph_config)
{
let _ = sender
.send(supergraph_config_diff)
.tap_err(|err| tracing::error!("{:?}", err));
}
latest_supergraph_config = supergraph_config;
}
Err(err) => {
tracing::error!("Could not parse supergraph config file. {:?}", err);
eprintln!("Could not parse supergraph config file");
}
}
}
})
.abort_handle()
}
}

#[derive(Getters)]
pub struct SupergraphConfigDiff {
added: Vec<SubgraphDefinition>,
removed: Vec<String>,
}

impl SupergraphConfigDiff {
pub fn new(
old: &SupergraphConfig,
new: &SupergraphConfig,
) -> Result<SupergraphConfigDiff, ConfigError> {
let old_subgraph_defs = old.get_subgraph_definitions().tap_err(|err| {
eprintln!(
"Error getting subgraph definitions from the current supergraph config: {:?}",
err
)
})?;
let old_subgraph_names: HashSet<String> =
HashSet::from_iter(old_subgraph_defs.iter().map(|def| def.name.to_string()));
let new_subgraph_defs = new.get_subgraph_definitions().tap_err(|err| {
eprintln!(
"Error getting subgraph definitions from the modified supergraph config: {:?}",
err
)
})?;
let new_subgraph_names =
HashSet::from_iter(new_subgraph_defs.iter().map(|def| def.name.to_string()));
let added_names: HashSet<String> =
HashSet::from_iter(new_subgraph_names.difference(&old_subgraph_names).cloned());
let removed_names = old_subgraph_names.difference(&new_subgraph_names);
let added = new_subgraph_defs
.into_iter()
.filter(|def| added_names.contains(&def.name))
.collect::<Vec<_>>();
let removed = removed_names.into_iter().cloned().collect::<Vec<_>>();
Ok(SupergraphConfigDiff { added, removed })
}
}

0 comments on commit a692e31

Please sign in to comment.