Skip to content

Commit 06b581b

Browse files
loshzaaronArinder
authored andcommitted
Aaron/subgraph watch introspection (#2142)
<!-- First, 🌠 thank you 🌠 for taking the time to consider a contribution to Apollo! Here are some important details to follow: * ⏰ Your time is important To save your precious time, if the contribution you are making will take more than an hour, please make sure it has been discussed in an issue first. This is especially true for feature requests! * 💡 Features Feature requests can be created and discussed within a GitHub Issue. Be sure to search for existing feature requests (and related issues!) prior to opening a new request. If an existing issue covers the need, please upvote that issue by using the 👍 emote, rather than opening a new issue. * 🕷 Bug fixes These can be created and discussed in this repository. When fixing a bug, please _try_ to add a test which verifies the fix. If you cannot, you should still submit the PR but we may still ask you (and help you!) to create a test. * 📖 Contribution guidelines Follow https://github.com/apollographql/rover/blob/HEAD/CONTRIBUTING.md when submitting a pull request. Make sure existing tests still pass, and add tests for all new behavior. * ✏️ Explain your pull request Describe the big picture of your changes here to communicate to what your pull request is meant to accomplish. Provide 🔗 links 🔗 to associated issues! We hope you will find this to be a positive experience! Open source contribution can be intimidating and we hope to alleviate that pain as much as possible. Without following these guidelines, you may be missing context that can help you succeed with your contribution, which is why we encourage discussion first. Ultimately, there is no guarantee that we will be able to merge your pull-request, but by following these guidelines we can try to avoid disappointment. --> --------- Co-authored-by: Aaron Arinder <[email protected]>
1 parent 1e7ee25 commit 06b581b

File tree

5 files changed

+142
-47
lines changed

5 files changed

+142
-47
lines changed

src/command/dev/runner.rs

+20-22
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,5 @@
11
use anyhow::anyhow;
2-
use apollo_federation_types::config::{SchemaSource, SupergraphConfig};
2+
use apollo_federation_types::config::SupergraphConfig;
33
use futures::stream::StreamExt;
44
use tokio::task::JoinHandle;
55

@@ -18,6 +18,7 @@ use crate::{
1818
RoverError, RoverResult,
1919
};
2020

21+
// TODO: handle retry flag for subgraphs (see rover dev help)
2122
pub struct Runner {
2223
client_config: StudioClientConfig,
2324
supergraph_opts: SupergraphOpts,
@@ -69,27 +70,24 @@ impl Runner {
6970

7071
// Create subgraph config watchers.
7172
for (subgraph, subgraph_config) in supergraph_config.into_iter() {
72-
match subgraph_config.schema {
73-
SchemaSource::File { ref file } => {
74-
// Create a new file watcher kind.
75-
let kind = SubgraphConfigWatcherKind::File(FileWatcher::new(file.clone()));
76-
// Construct a subgraph config watcher from the file watcher kind.
77-
let watcher = SubgraphConfigWatcher::new(kind, subgraph_config);
78-
// Create and run the file watcher in a sub task.
79-
let (mut stream, subtask) = Subtask::new(watcher);
80-
subtask.run();
81-
82-
let task = tokio::task::spawn(async move {
83-
while let Some(_) = stream.next().await {
84-
eprintln!("subgraph update: {subgraph}");
85-
}
86-
});
87-
futs.push(task);
73+
// FIXME: remove unwrap
74+
// Create a new file watcher kind.
75+
let watcher_kind: SubgraphConfigWatcherKind =
76+
subgraph_config.schema.try_into().unwrap();
77+
78+
// Construct a subgraph config watcher from the file watcher kind.
79+
let watcher = SubgraphConfigWatcher::new(watcher_kind, &subgraph);
80+
// Create and run the file watcher in a sub task.
81+
let (mut stream, subtask) = Subtask::new(watcher);
82+
subtask.run();
83+
84+
let task = tokio::task::spawn(async move {
85+
while let Some(_) = stream.next().await {
86+
eprintln!("subgraph update: {subgraph}");
8887
}
89-
SchemaSource::SubgraphIntrospection { .. } => todo!(),
90-
SchemaSource::Sdl { .. } => todo!(),
91-
SchemaSource::Subgraph { .. } => todo!(),
92-
};
88+
});
89+
90+
futs.push(task);
9391
}
9492

9593
futs
@@ -105,7 +103,7 @@ impl Runner {
105103
false,
106104
)
107105
.await
108-
.map_err(|_| RoverError::new(anyhow!("TODO: get actual error")))?
106+
.map_err(|err| RoverError::new(anyhow!("{err}")))?
109107
.ok_or_else(|| RoverError::new(anyhow!("Why is supergraph config None?")))
110108
}
111109
}

src/command/dev/watcher/subgraph_config.rs

+119-22
Original file line numberDiff line numberDiff line change
@@ -1,12 +1,19 @@
1+
use std::fs::OpenOptions;
12
use std::{marker::Send, pin::Pin};
23

3-
use apollo_federation_types::config::SubgraphConfig;
4+
use anyhow::{anyhow, Error};
5+
use apollo_federation_types::config::{SchemaSource, SubgraphConfig};
6+
use camino::Utf8PathBuf;
7+
use futures::stream::BoxStream;
48
use futures::{Stream, StreamExt};
59
use rover_std::errln;
610
use tap::TapFallible;
711
use tokio::{sync::mpsc::UnboundedSender, task::AbortHandle};
812

9-
use crate::command::dev::{introspect::IntrospectRunnerKind, subtask::SubtaskHandleUnit};
13+
use crate::cli::RoverOutputFormatKind;
14+
use crate::command::dev::{subtask::SubtaskHandleUnit, types::SubgraphUrl};
15+
use crate::command::subgraph::introspect::Introspect as SubgraphIntrospect;
16+
use crate::options::{IntrospectOpts, OutputOpts};
1017

1118
use super::file::FileWatcher;
1219

@@ -15,31 +22,117 @@ pub enum SubgraphConfigWatcherKind {
1522
/// Watch a file on disk.
1623
File(FileWatcher),
1724
/// Poll an endpoint via introspection.
18-
_Introspect(IntrospectRunnerKind, u64),
25+
Introspect(SubgraphIntrospection),
1926
/// Don't ever update, schema is only pulled once.
2027
_Once(String),
2128
}
2229

30+
#[derive(Debug, Clone)]
31+
pub struct SubgraphIntrospection {
32+
endpoint: SubgraphUrl,
33+
// TODO: ticket using a hashmap, not a tuple, in introspect opts as eventual cleanup
34+
headers: Option<Vec<(String, String)>>,
35+
}
36+
37+
//TODO: impl retry (needed at least for dev)
38+
impl SubgraphIntrospection {
39+
fn new(endpoint: SubgraphUrl, headers: Option<Vec<(String, String)>>) -> Self {
40+
Self { endpoint, headers }
41+
}
42+
43+
async fn watch(&self, subgraph_name: &str) -> FileWatcher {
44+
let client = reqwest::Client::new();
45+
46+
//FIXME: unwrap removed
47+
// TODO: does this re-use tmp dirs? or, what? don't want errors second time we run
48+
// TODO: clean up after?
49+
let tmp_dir = tempfile::Builder::new().tempdir().unwrap();
50+
let tmp_config_dir_path = Utf8PathBuf::try_from(tmp_dir.into_path()).unwrap();
51+
52+
// NOTE: this assumes subgraph names are unique; are they?
53+
let tmp_introspection_file = tmp_config_dir_path.join(subgraph_name);
54+
55+
let _ = OpenOptions::new()
56+
.write(true)
57+
.create(true)
58+
.truncate(true)
59+
.open(tmp_introspection_file.clone())
60+
// FIXME: unwrap
61+
.unwrap();
62+
63+
let output_opts = OutputOpts {
64+
format_kind: RoverOutputFormatKind::default(),
65+
output_file: Some(tmp_introspection_file.clone()),
66+
};
67+
68+
let endpoint = self.endpoint.clone();
69+
let headers = self.headers.clone();
70+
71+
tokio::spawn(async move {
72+
let _ = SubgraphIntrospect {
73+
opts: IntrospectOpts {
74+
endpoint,
75+
headers,
76+
// TODO impl retries (at least for dev from cli flag)
77+
watch: true,
78+
},
79+
}
80+
.run(client, &output_opts, None)
81+
.await
82+
.map_err(|err| anyhow!(err));
83+
});
84+
85+
FileWatcher::new(tmp_introspection_file)
86+
}
87+
}
88+
2389
impl SubgraphConfigWatcherKind {
24-
async fn watch(&self) -> Pin<Box<dyn Stream<Item = String> + Send>> {
90+
async fn watch(&self, subgraph_name: &str) -> Pin<Box<dyn Stream<Item = String> + Send>> {
2591
match self {
2692
Self::File(file_watcher) => file_watcher.clone().watch(),
27-
Self::_Introspect(_, _) => todo!(),
93+
Self::Introspect(introspection) => {
94+
let watcher = introspection.watch(subgraph_name).await;
95+
println!("watcher: {watcher:?}");
96+
97+
watcher.watch()
98+
}
2899
Self::_Once(_) => todo!(),
29100
}
30101
}
31102
}
32103

104+
impl TryFrom<SchemaSource> for SubgraphConfigWatcherKind {
105+
// FIXME: anyhow error -> bespoke error with impl From to rovererror or whatever
106+
type Error = anyhow::Error;
107+
fn try_from(schema_source: SchemaSource) -> Result<Self, Self::Error> {
108+
match schema_source {
109+
SchemaSource::File { file } => Ok(Self::File(FileWatcher::new(file))),
110+
SchemaSource::SubgraphIntrospection {
111+
subgraph_url,
112+
introspection_headers,
113+
} => Ok(Self::Introspect(SubgraphIntrospection {
114+
endpoint: subgraph_url,
115+
headers: introspection_headers.map(|header_map| header_map.into_iter().collect()),
116+
})),
117+
// SDL (stdin? not sure) / Subgraph (ie, from graph-ref)
118+
unsupported_source => Err(anyhow!(
119+
"unsupported subgraph introspection source: {unsupported_source:?}"
120+
)),
121+
}
122+
}
123+
}
124+
33125
pub struct SubgraphConfigWatcher {
126+
subgraph_name: String,
34127
watcher: SubgraphConfigWatcherKind,
35-
//subgraph_config: SubgraphConfig,
36128
}
37129

38130
impl SubgraphConfigWatcher {
39-
pub fn new(watcher: SubgraphConfigWatcherKind, _subgraph_config: SubgraphConfig) -> Self {
131+
// not sure we need the subgraph config?
132+
pub fn new(watcher: SubgraphConfigWatcherKind, subgraph_name: &str) -> Self {
40133
Self {
41134
watcher,
42-
//subgraph_config,
135+
subgraph_name: subgraph_name.to_string(),
43136
}
44137
}
45138
}
@@ -52,23 +145,27 @@ impl SubtaskHandleUnit for SubgraphConfigWatcher {
52145

53146
fn handle(self, sender: UnboundedSender<Self::Output>) -> AbortHandle {
54147
tokio::spawn(async move {
55-
while let Some(content) = self.watcher.watch().await.next().await {
56-
let parsed_config: Result<SubgraphConfig, serde_yaml::Error> =
57-
serde_yaml::from_str(&content);
148+
while let Some(content) = self.watcher.watch(&self.subgraph_name).await.next().await {
149+
// TODO: fix parsing; see wtf is up
150+
//let parsed_config: Result<SubgraphConfig, serde_yaml::Error> =
151+
// serde_yaml::from_str(&content);
152+
let _ = sender
153+
.send(SubgraphChanged)
154+
.tap_err(|err| tracing::error!("{:?}", err));
58155

59156
// We're only looking at whether a subgraph has changed, but we won't emit events
60157
// if the subgraph config can't be parsed to fail early for composition
61-
match parsed_config {
62-
Ok(_subgraph_config) => {
63-
let _ = sender
64-
.send(SubgraphChanged)
65-
.tap_err(|err| tracing::error!("{:?}", err));
66-
}
67-
Err(err) => {
68-
tracing::error!("Could not parse subgraph config file: {:?}", err);
69-
errln!("could not parse subgraph config file");
70-
}
71-
}
158+
//match parsed_config {
159+
// Ok(_subgraph_config) => {
160+
// let _ = sender
161+
// .send(SubgraphChanged)
162+
// .tap_err(|err| tracing::error!("{:?}", err));
163+
// }
164+
// Err(err) => {
165+
// tracing::error!("Could not parse subgraph config file: {:?}", err);
166+
// errln!("could not parse subgraph config file");
167+
// }
168+
//}
72169
}
73170
})
74171
.abort_handle()

src/command/subgraph/introspect.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -56,7 +56,7 @@ impl Introspect {
5656
)
5757
}
5858

59-
pub async fn exec_and_watch(
59+
async fn exec_and_watch(
6060
&self,
6161
client: &Client,
6262
output_opts: &OutputOpts,

src/command/subgraph/mod.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
mod check;
22
mod delete;
33
mod fetch;
4-
mod introspect;
4+
pub mod introspect;
55
mod lint;
66
mod list;
77
mod publish;

src/options/output.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -81,7 +81,7 @@ impl RoverPrinter for RoverError {
8181
pub struct OutputOpts {
8282
/// Specify Rover's format type
8383
#[arg(long = "format", global = true, default_value_t)]
84-
format_kind: RoverOutputFormatKind,
84+
pub format_kind: RoverOutputFormatKind,
8585

8686
/// Specify a file to write Rover's output to
8787
#[arg(long = "output", short = 'o', global = true, value_parser = Self::parse_absolute_path)]

0 commit comments

Comments
 (0)