From cd68ffc8c6b59e65234c386b9077feda273d533f Mon Sep 17 00:00:00 2001 From: Brian George Date: Wed, 18 Dec 2024 18:08:06 -0500 Subject: [PATCH 1/5] Defer subgraph fetch until execution --- .../src/operations/subgraph/fetch/mod.rs | 2 + .../src/operations/subgraph/fetch/runner.rs | 171 +------- .../src/operations/subgraph/fetch/service.rs | 238 +++++++++++ .../src/operations/subgraph/fetch/types.rs | 16 +- .../src/operations/subgraph/fetch_all/mod.rs | 2 + .../operations/subgraph/fetch_all/runner.rs | 243 +---------- .../operations/subgraph/fetch_all/service.rs | 387 ++++++++++++++++++ .../operations/subgraph/fetch_all/types.rs | 97 +---- src/command/dev/next/mod.rs | 26 +- src/command/supergraph/compose/do_compose.rs | 23 +- src/composition/pipeline.rs | 39 +- .../supergraph/config/full/subgraph.rs | 37 +- .../supergraph/config/full/supergraph.rs | 24 +- .../config/resolver/fetch_remote_subgraph.rs | 153 +++++++ .../config/resolver/fetch_remote_subgraphs.rs | 128 ++++++ .../supergraph/config/resolver/mod.rs | 51 ++- src/utils/effect/fetch_remote_subgraph.rs | 171 -------- src/utils/effect/fetch_remote_subgraphs.rs | 51 --- src/utils/effect/mod.rs | 2 - 19 files changed, 1093 insertions(+), 768 deletions(-) create mode 100644 crates/rover-client/src/operations/subgraph/fetch/service.rs create mode 100644 crates/rover-client/src/operations/subgraph/fetch_all/service.rs create mode 100644 src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs create mode 100644 src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs delete mode 100644 src/utils/effect/fetch_remote_subgraph.rs delete mode 100644 src/utils/effect/fetch_remote_subgraphs.rs diff --git a/crates/rover-client/src/operations/subgraph/fetch/mod.rs b/crates/rover-client/src/operations/subgraph/fetch/mod.rs index 65ea4d534..4fd69a3e7 100644 --- a/crates/rover-client/src/operations/subgraph/fetch/mod.rs +++ b/crates/rover-client/src/operations/subgraph/fetch/mod.rs @@ -1,5 +1,7 @@ mod runner; +mod service; mod types; pub use runner::run; +pub use service::{SubgraphFetch, SubgraphFetchRequest}; pub use types::SubgraphFetchInput; diff --git a/crates/rover-client/src/operations/subgraph/fetch/runner.rs b/crates/rover-client/src/operations/subgraph/fetch/runner.rs index e9beaac21..7f2ed329a 100644 --- a/crates/rover-client/src/operations/subgraph/fetch/runner.rs +++ b/crates/rover-client/src/operations/subgraph/fetch/runner.rs @@ -1,168 +1,23 @@ -use super::types::*; +use tower::{Service, ServiceExt}; + use crate::blocking::StudioClient; -use crate::shared::{FetchResponse, Sdl, SdlType}; +use crate::shared::FetchResponse; use crate::RoverClientError; -use graphql_client::*; - -#[derive(GraphQLQuery)] -// The paths are relative to the directory where your `Cargo.toml` is located. -// Both json and the GraphQL schema language are supported as sources for the schema -#[graphql( - query_path = "src/operations/subgraph/fetch/fetch_query.graphql", - schema_path = ".schema/schema.graphql", - response_derives = "Eq, PartialEq, Debug, Serialize, Deserialize", - deprecated = "warn" -)] -/// This struct is used to generate the module containing `Variables` and -/// `ResponseData` structs. -/// Snake case of this name is the mod name. i.e. subgraph_fetch_query -pub(crate) struct SubgraphFetchQuery; +use super::service::{SubgraphFetch, SubgraphFetchRequest}; +use super::types::*; /// Fetches a schema from apollo studio and returns its SDL (String) pub async fn run( input: SubgraphFetchInput, client: &StudioClient, ) -> Result { - let variables = input.clone().into(); - let response_data = client.post::(variables).await?; - get_sdl_from_response_data(input, response_data) -} - -fn get_sdl_from_response_data( - input: SubgraphFetchInput, - response_data: SubgraphFetchResponseData, -) -> Result { - let subgraph = get_subgraph_from_response_data(input, response_data)?; - Ok(FetchResponse { - sdl: Sdl { - contents: subgraph.sdl, - r#type: SdlType::Subgraph { - routing_url: subgraph.url, - }, - }, - }) -} - -#[derive(Debug, PartialEq)] -struct Subgraph { - url: Option, - sdl: String, -} - -fn get_subgraph_from_response_data( - input: SubgraphFetchInput, - response_data: SubgraphFetchResponseData, -) -> Result { - if let Some(maybe_variant) = response_data.variant { - match maybe_variant { - SubgraphFetchGraphVariant::GraphVariant(variant) => { - if let Some(subgraph) = variant.subgraph { - Ok(Subgraph { - url: subgraph.url.clone(), - sdl: subgraph.active_partial_schema.sdl, - }) - } else if let Some(subgraphs) = variant.subgraphs { - let valid_subgraphs = subgraphs - .iter() - .map(|subgraph| subgraph.name.clone()) - .collect(); - Err(RoverClientError::NoSubgraphInGraph { - invalid_subgraph: input.subgraph_name, - valid_subgraphs, - }) - } else { - Err(RoverClientError::ExpectedFederatedGraph { - graph_ref: input.graph_ref, - can_operation_convert: true, - }) - } - } - _ => Err(RoverClientError::InvalidGraphRef), - } - } else { - Err(RoverClientError::GraphNotFound { - graph_ref: input.graph_ref, - }) - } -} - -#[cfg(test)] -mod tests { - use super::*; - use crate::shared::GraphRef; - use serde_json::json; - - #[test] - fn get_services_from_response_data_works() { - let sdl = "extend type User @key(fields: \"id\") {\n id: ID! @external\n age: Int\n}\n" - .to_string(); - let url = "http://my.subgraph.com".to_string(); - let input = mock_input(); - let json_response = json!({ - "variant": { - "__typename": "GraphVariant", - "subgraphs": [ - { "name": "accounts" }, - { "name": &input.subgraph_name } - ], - "subgraph": { - "url": &url, - "activePartialSchema": { - "sdl": &sdl - } - } - } - }); - let data: SubgraphFetchResponseData = serde_json::from_value(json_response).unwrap(); - let expected_subgraph = Subgraph { - url: Some(url), - sdl, - }; - let output = get_subgraph_from_response_data(input, data); - - assert!(output.is_ok()); - assert_eq!(output.unwrap(), expected_subgraph); - } - - #[test] - fn get_services_from_response_data_errs_with_no_variant() { - let json_response = json!({ "variant": null }); - let data: SubgraphFetchResponseData = serde_json::from_value(json_response).unwrap(); - let output = get_subgraph_from_response_data(mock_input(), data); - assert!(output.is_err()); - } - - #[test] - fn get_sdl_for_service_errs_on_invalid_name() { - let input = mock_input(); - let json_response = json!({ - "variant": { - "__typename": "GraphVariant", - "subgraphs": [ - { "name": "accounts" }, - { "name": &input.subgraph_name } - ], - "subgraph": null - } - }); - let data: SubgraphFetchResponseData = serde_json::from_value(json_response).unwrap(); - let output = get_subgraph_from_response_data(input, data); - - assert!(output.is_err()); - } - - fn mock_input() -> SubgraphFetchInput { - let graph_ref = GraphRef { - name: "mygraph".to_string(), - variant: "current".to_string(), - }; - - let subgraph_name = "products".to_string(); - - SubgraphFetchInput { - graph_ref, - subgraph_name, - } - } + let mut service = SubgraphFetch::new( + client + .service() + .map_err(|err| RoverClientError::ServiceError(Box::new(err)))?, + ); + let service = service.ready().await?; + let fetch_response = service.call(SubgraphFetchRequest::from(input)).await?; + Ok(fetch_response) } diff --git a/crates/rover-client/src/operations/subgraph/fetch/service.rs b/crates/rover-client/src/operations/subgraph/fetch/service.rs new file mode 100644 index 000000000..eb8e23542 --- /dev/null +++ b/crates/rover-client/src/operations/subgraph/fetch/service.rs @@ -0,0 +1,238 @@ +use std::{fmt, future::Future, pin::Pin}; + +use buildstructor::Builder; +use graphql_client::GraphQLQuery; +use rover_graphql::{GraphQLRequest, GraphQLServiceError}; +use tower::Service; + +use crate::{ + shared::{FetchResponse, GraphRef, Sdl, SdlType}, + RoverClientError, +}; + +#[derive(GraphQLQuery)] +// The paths are relative to the directory where your `Cargo.toml` is located. +// Both json and the GraphQL schema language are supported as sources for the schema +#[graphql( + query_path = "src/operations/subgraph/fetch/fetch_query.graphql", + schema_path = ".schema/schema.graphql", + response_derives = "Eq, PartialEq, Debug, Serialize, Deserialize", + deprecated = "warn" +)] +/// This struct is used to generate the module containing `Variables` and +/// `ResponseData` structs. +/// Snake case of this name is the mod name. i.e. subgraph_fetch_query +pub(crate) struct SubgraphFetchQuery; + +impl fmt::Debug for subgraph_fetch_query::Variables { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + f.debug_struct("Variables") + .field("graph_ref", &self.graph_ref) + .field("subgraph_name", &self.subgraph_name) + .finish() + } +} + +impl PartialEq for subgraph_fetch_query::Variables { + fn eq(&self, other: &Self) -> bool { + self.graph_ref == other.graph_ref && self.subgraph_name == other.subgraph_name + } +} + +#[derive(Builder)] +pub struct SubgraphFetchRequest { + graph_ref: GraphRef, + subgraph_name: String, +} + +#[derive(Clone)] +pub struct SubgraphFetch { + inner: S, +} + +impl SubgraphFetch { + pub fn new(inner: S) -> SubgraphFetch { + SubgraphFetch { inner } + } +} + +impl Service for SubgraphFetch +where + S: Service< + GraphQLRequest, + Response = subgraph_fetch_query::ResponseData, + Error = GraphQLServiceError, + Future = Fut, + > + Clone + + Send + + 'static, + Fut: Future> + Send, +{ + type Response = FetchResponse; + type Error = RoverClientError; + type Future = Pin> + Send>>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + tower::Service::>::poll_ready(&mut self.inner, cx) + .map_err(|err| RoverClientError::ServiceError(Box::new(err))) + } + + fn call(&mut self, req: SubgraphFetchRequest) -> Self::Future { + let cloned = self.inner.clone(); + let mut inner = std::mem::replace(&mut self.inner, cloned); + let fut = async move { + let variables = subgraph_fetch_query::Variables { + graph_ref: req.graph_ref.to_string(), + subgraph_name: req.subgraph_name.to_string(), + }; + let response_data = inner.call(GraphQLRequest::new(variables)).await?; + get_sdl_from_response_data(req.graph_ref, req.subgraph_name, response_data) + }; + Box::pin(fut) + } +} + +fn get_sdl_from_response_data( + graph_ref: GraphRef, + subgraph_name: String, + response_data: subgraph_fetch_query::ResponseData, +) -> Result { + let subgraph = get_subgraph_from_response_data(graph_ref, subgraph_name, response_data)?; + Ok(FetchResponse { + sdl: Sdl { + contents: subgraph.sdl, + r#type: SdlType::Subgraph { + routing_url: subgraph.url, + }, + }, + }) +} + +#[derive(Debug, PartialEq)] +struct Subgraph { + url: Option, + sdl: String, +} + +fn get_subgraph_from_response_data( + graph_ref: GraphRef, + subgraph_name: String, + response_data: subgraph_fetch_query::ResponseData, +) -> Result { + if let Some(maybe_variant) = response_data.variant { + match maybe_variant { + subgraph_fetch_query::SubgraphFetchQueryVariant::GraphVariant(variant) => { + if let Some(subgraph) = variant.subgraph { + Ok(Subgraph { + url: subgraph.url.clone(), + sdl: subgraph.active_partial_schema.sdl, + }) + } else if let Some(subgraphs) = variant.subgraphs { + let valid_subgraphs = subgraphs + .iter() + .map(|subgraph| subgraph.name.clone()) + .collect(); + Err(RoverClientError::NoSubgraphInGraph { + invalid_subgraph: subgraph_name, + valid_subgraphs, + }) + } else { + Err(RoverClientError::ExpectedFederatedGraph { + graph_ref, + can_operation_convert: true, + }) + } + } + _ => Err(RoverClientError::InvalidGraphRef), + } + } else { + Err(RoverClientError::GraphNotFound { graph_ref }) + } +} + +#[cfg(test)] +mod tests { + use super::*; + use crate::shared::GraphRef; + use rstest::{fixture, rstest}; + use serde_json::json; + + #[rstest] + fn get_services_from_response_data_works(subgraph_name: String, graph_ref: GraphRef) { + let sdl = "extend type User @key(fields: \"id\") {\n id: ID! @external\n age: Int\n}\n" + .to_string(); + let url = "http://my.subgraph.com".to_string(); + let json_response = json!({ + "variant": { + "__typename": "GraphVariant", + "subgraphs": [ + { "name": "accounts" }, + { "name": &subgraph_name } + ], + "subgraph": { + "url": &url, + "activePartialSchema": { + "sdl": &sdl + } + } + } + }); + let data: subgraph_fetch_query::ResponseData = + serde_json::from_value(json_response).unwrap(); + let expected_subgraph = Subgraph { + url: Some(url), + sdl, + }; + let output = get_subgraph_from_response_data(graph_ref, subgraph_name, data); + + assert!(output.is_ok()); + assert_eq!(output.unwrap(), expected_subgraph); + } + + #[rstest] + fn get_services_from_response_data_errs_with_no_variant( + subgraph_name: String, + graph_ref: GraphRef, + ) { + let json_response = json!({ "variant": null }); + let data: subgraph_fetch_query::ResponseData = + serde_json::from_value(json_response).unwrap(); + let output = get_subgraph_from_response_data(graph_ref, subgraph_name, data); + assert!(output.is_err()); + } + + #[rstest] + fn get_sdl_for_service_errs_on_invalid_name(subgraph_name: String, graph_ref: GraphRef) { + let json_response = json!({ + "variant": { + "__typename": "GraphVariant", + "subgraphs": [ + { "name": "accounts" }, + { "name": &subgraph_name } + ], + "subgraph": null + } + }); + let data: subgraph_fetch_query::ResponseData = + serde_json::from_value(json_response).unwrap(); + let output = get_subgraph_from_response_data(graph_ref, subgraph_name, data); + + assert!(output.is_err()); + } + + #[fixture] + fn graph_ref() -> GraphRef { + GraphRef { + name: "mygraph".to_string(), + variant: "current".to_string(), + } + } + + #[fixture] + fn subgraph_name() -> String { + "products".to_string() + } +} diff --git a/crates/rover-client/src/operations/subgraph/fetch/types.rs b/crates/rover-client/src/operations/subgraph/fetch/types.rs index 3c5d78dfa..9bcad81d0 100644 --- a/crates/rover-client/src/operations/subgraph/fetch/types.rs +++ b/crates/rover-client/src/operations/subgraph/fetch/types.rs @@ -1,10 +1,6 @@ use crate::shared::GraphRef; -use super::runner::subgraph_fetch_query; - -pub(crate) type SubgraphFetchResponseData = subgraph_fetch_query::ResponseData; -pub(crate) type SubgraphFetchGraphVariant = subgraph_fetch_query::SubgraphFetchQueryVariant; -pub(crate) type QueryVariables = subgraph_fetch_query::Variables; +use super::service::SubgraphFetchRequest; #[derive(Debug, Clone, Eq, PartialEq)] pub struct SubgraphFetchInput { @@ -12,11 +8,11 @@ pub struct SubgraphFetchInput { pub subgraph_name: String, } -impl From for QueryVariables { +impl From for SubgraphFetchRequest { fn from(input: SubgraphFetchInput) -> Self { - Self { - graph_ref: input.graph_ref.to_string(), - subgraph_name: input.subgraph_name, - } + Self::builder() + .graph_ref(input.graph_ref) + .subgraph_name(input.subgraph_name) + .build() } } diff --git a/crates/rover-client/src/operations/subgraph/fetch_all/mod.rs b/crates/rover-client/src/operations/subgraph/fetch_all/mod.rs index 293e68530..db4cc4ae9 100644 --- a/crates/rover-client/src/operations/subgraph/fetch_all/mod.rs +++ b/crates/rover-client/src/operations/subgraph/fetch_all/mod.rs @@ -1,5 +1,7 @@ mod runner; +mod service; mod types; pub use runner::run; +pub use service::{SubgraphFetchAll, SubgraphFetchAllRequest}; pub use types::{SubgraphFetchAllInput, SubgraphFetchAllResponse}; diff --git a/crates/rover-client/src/operations/subgraph/fetch_all/runner.rs b/crates/rover-client/src/operations/subgraph/fetch_all/runner.rs index e47dfe57d..ebf2bd2e1 100644 --- a/crates/rover-client/src/operations/subgraph/fetch_all/runner.rs +++ b/crates/rover-client/src/operations/subgraph/fetch_all/runner.rs @@ -1,243 +1,24 @@ -use graphql_client::*; +use tower::{Service, ServiceExt}; use crate::blocking::StudioClient; -use crate::shared::GraphRef; use crate::RoverClientError; +use super::service::{SubgraphFetchAll, SubgraphFetchAllRequest}; use super::types::*; -#[derive(GraphQLQuery)] -// The paths are relative to the directory where your `Cargo.toml` is located. -// Both json and the GraphQL schema language are supported as sources for the schema -#[graphql( - query_path = "src/operations/subgraph/fetch_all/fetch_all_query.graphql", - schema_path = ".schema/schema.graphql", - response_derives = "Eq, PartialEq, Debug, Serialize, Deserialize", - deprecated = "warn" -)] -/// This struct is used to generate the module containing `Variables` and -/// `ResponseData` structs. -/// Snake case of this name is the mod name. i.e. subgraph_fetch_all_query -pub(crate) struct SubgraphFetchAllQuery; - /// For a given graph return all of its subgraphs as a list pub async fn run( input: SubgraphFetchAllInput, client: &StudioClient, ) -> Result { - let variables = input.clone().into(); - let response_data = client.post::(variables).await?; - get_subgraphs_from_response_data(input, response_data) -} - -fn get_subgraphs_from_response_data( - input: SubgraphFetchAllInput, - response_data: SubgraphFetchAllResponseData, -) -> Result { - match response_data.variant { - None => Err(RoverClientError::GraphNotFound { - graph_ref: input.graph_ref, - }), - Some(SubgraphFetchAllGraphVariant::GraphVariant(variant)) => { - extract_subgraphs_from_response(variant, input.graph_ref) - } - _ => Err(RoverClientError::InvalidGraphRef), - } -} -fn extract_subgraphs_from_response( - value: SubgraphFetchAllQueryVariantOnGraphVariant, - graph_ref: GraphRef, -) -> Result { - match (value.subgraphs, value.source_variant) { - // If we get null back in both branches or the query, or we get a structure in the - // sourceVariant half but there are no subgraphs in it. Then we return an error - // because this isn't a FederatedSubgraph **as far as we can tell**. - (None, None) - | ( - None, - Some(SubgraphFetchAllQueryVariantOnGraphVariantSourceVariant { - subgraphs: None, .. - }), - ) => Err(RoverClientError::ExpectedFederatedGraph { - graph_ref, - can_operation_convert: true, - }), - // If we get nothing from querying the subgraphs directly, but we do get some subgraphs - // on the sourceVariant side of the query, we just return those. - ( - None, - Some(SubgraphFetchAllQueryVariantOnGraphVariantSourceVariant { - subgraphs: Some(subgraphs), - latest_launch, - }), - ) => Ok(SubgraphFetchAllResponse { - subgraphs: subgraphs - .into_iter() - .map(|subgraph| subgraph.into()) - .collect(), - federation_version: latest_launch.and_then(|it| it.into()), - }), - // Here there are three cases where we might want to return the subgraphs we got from - // directly querying the graphVariant: - // 1. If we get subgraphs back from the graphVariant directly and nothing from the sourceVariant - // 2. If we get subgraphs back from the graphVariant directly and a structure from the - // sourceVariant, but it contains no subgraphs - // 3. If we get subgraphs back from both 'sides' of the query, we take the results from - // querying the **graphVariant**, as this is closest to the original behaviour, before - // we introduced the querying of the sourceVariant. - (Some(subgraphs), _) => Ok(SubgraphFetchAllResponse { - subgraphs: subgraphs - .into_iter() - .map(|subgraph| subgraph.into()) - .collect(), - federation_version: value.latest_launch.and_then(|it| it.into()), - }), - } -} - -#[cfg(test)] -mod tests { - use apollo_federation_types::config::FederationVersion; - use rstest::{fixture, rstest}; - use semver::Version; - use serde_json::{json, Value}; - - use crate::shared::GraphRef; - - use super::*; - - const SDL: &'static str = - "extend type User @key(fields: \"id\") {\n id: ID! @external\n age: Int\n}\n"; - const URL: &'static str = "http://my.subgraph.com"; - const SUBGRAPH_NAME: &'static str = "accounts"; - - #[rstest] - #[case::subgraphs_returned_direct_from_variant(json!( - { - "variant": { - "__typename": "GraphVariant", - "subgraphs": [ - { - "name": SUBGRAPH_NAME, - "url": URL, - "activePartialSchema": { - "sdl": SDL - } - }, - ], - "latestLaunch": { - "buildInput": { - "__typename": "CompositionBuildInput", - "version": "2.3.4" - } - }, - "sourceVariant": null - } - }), Some(SubgraphFetchAllResponse { - subgraphs: vec![Subgraph::builder().url(URL).sdl(SDL).name(SUBGRAPH_NAME).build()], - federation_version: Some(FederationVersion::ExactFedTwo(Version::new(2, 3, 4))), - }))] - #[case::subgraphs_returned_via_source_variant(json!( - { - "variant": { - "__typename": "GraphVariant", - "subgraphs": null, - "sourceVariant": { - "subgraphs": [ - { - "name": SUBGRAPH_NAME, - "url": URL, - "activePartialSchema": { - "sdl": SDL - } - } - ], - "latestLaunch": { - "buildInput": { - "__typename": "CompositionBuildInput", - "version": "2.3.4" - } - } - } - } - }), Some(SubgraphFetchAllResponse { - subgraphs: vec![Subgraph::builder().url(URL).sdl(SDL).name(SUBGRAPH_NAME).build()], - federation_version: Some(FederationVersion::ExactFedTwo(Version::new(2, 3, 4))), - }))] - #[case::no_subgraphs_returned_in_either_case(json!( - { - "variant": { - "__typename": "GraphVariant", - "subgraphs": null, - "sourceVariant": { - "subgraphs": null - } - } - }), None)] - #[case::subgraphs_returned_from_both_sides_of_the_query_means_we_get_the_variants_subgraphs(json!( - { - "variant": { - "__typename": "GraphVariant", - "subgraphs": [ - { - "name": SUBGRAPH_NAME, - "url": URL, - "activePartialSchema": { - "sdl": SDL - } - } - ], - "latestLaunch": { - "buildInput": { - "__typename": "CompositionBuildInput", - "version": "2.3.4" - } - }, - "sourceVariant": { - "subgraphs": [ - { - "name": "banana", - "url": URL, - "activePartialSchema": { - "sdl": SDL - } - } - ], - "latestLaunch": { - "buildInput": { - "__typename": "CompositionBuildInput", - "version": "2.9.9" - } - } - } - } - }), Some(SubgraphFetchAllResponse { - subgraphs: vec![Subgraph::builder().url(URL).sdl(SDL).name(SUBGRAPH_NAME).build()], - federation_version: Some(FederationVersion::ExactFedTwo(Version::new(2, 3, 4))), - }))] - fn get_services_from_response_data_works( - #[from(mock_input)] input: SubgraphFetchAllInput, - #[case] json_response: Value, - #[case] expected_subgraphs: Option, - ) { - let data: SubgraphFetchAllResponseData = serde_json::from_value(json_response).unwrap(); - let output = get_subgraphs_from_response_data(input, data); - - if expected_subgraphs.is_some() { - assert!(output.is_ok()); - assert_eq!(output.unwrap(), expected_subgraphs.unwrap()); - } else { - assert!(output.is_err()); - }; - } - - #[fixture] - fn mock_input() -> SubgraphFetchAllInput { - let graph_ref = GraphRef { - name: "mygraph".to_string(), - variant: "current".to_string(), - }; - - SubgraphFetchAllInput { graph_ref } - } + let mut service = SubgraphFetchAll::new( + client + .service() + .map_err(|err| RoverClientError::ServiceError(Box::new(err)))?, + ); + let service = service.ready().await?; + let subgraphs = service + .call(SubgraphFetchAllRequest::new(input.graph_ref.clone())) + .await?; + Ok(subgraphs) } diff --git a/crates/rover-client/src/operations/subgraph/fetch_all/service.rs b/crates/rover-client/src/operations/subgraph/fetch_all/service.rs new file mode 100644 index 000000000..0c302fc3d --- /dev/null +++ b/crates/rover-client/src/operations/subgraph/fetch_all/service.rs @@ -0,0 +1,387 @@ +use std::{fmt, future::Future, pin::Pin, str::FromStr}; + +use apollo_federation_types::config::{FederationVersion, SchemaSource, SubgraphConfig}; +use graphql_client::GraphQLQuery; +use rover_graphql::{GraphQLRequest, GraphQLServiceError}; +use tower::Service; + +use crate::{shared::GraphRef, RoverClientError}; + +use super::{types::Subgraph, SubgraphFetchAllResponse}; + +#[derive(GraphQLQuery)] +// The paths are relative to the directory where your `Cargo.toml` is located. +// Both json and the GraphQL schema language are supported as sources for the schema +#[graphql( + query_path = "src/operations/subgraph/fetch_all/fetch_all_query.graphql", + schema_path = ".schema/schema.graphql", + response_derives = "Eq, PartialEq, Debug, Serialize, Deserialize", + deprecated = "warn" +)] +/// This struct is used to generate the module containing `Variables` and +/// `ResponseData` structs. +/// Snake case of this name is the mod name. i.e. subgraph_fetch_all_query +pub struct SubgraphFetchAllQuery; + +impl fmt::Debug for subgraph_fetch_all_query::Variables { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> Result<(), fmt::Error> { + f.debug_struct("Variables") + .field("graph_ref", &self.graph_ref) + .finish() + } +} + +impl PartialEq for subgraph_fetch_all_query::Variables { + fn eq(&self, other: &Self) -> bool { + self.graph_ref == other.graph_ref + } +} + +pub struct SubgraphFetchAllRequest { + graph_ref: GraphRef, +} + +impl SubgraphFetchAllRequest { + pub fn new(graph_ref: GraphRef) -> SubgraphFetchAllRequest { + SubgraphFetchAllRequest { graph_ref } + } +} + +#[derive(Clone)] +pub struct SubgraphFetchAll { + inner: S, +} + +impl SubgraphFetchAll { + pub fn new(inner: S) -> SubgraphFetchAll { + SubgraphFetchAll { inner } + } +} + +impl Service for SubgraphFetchAll +where + S: Service< + GraphQLRequest, + Response = subgraph_fetch_all_query::ResponseData, + Error = GraphQLServiceError, + Future = Fut, + > + Clone + + Send + + 'static, + Fut: Future> + Send, +{ + type Response = SubgraphFetchAllResponse; + type Error = RoverClientError; + type Future = Pin> + Send>>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + tower::Service::>::poll_ready(&mut self.inner, cx) + .map_err(|err| RoverClientError::ServiceError(Box::new(err))) + } + + fn call(&mut self, req: SubgraphFetchAllRequest) -> Self::Future { + let cloned = self.inner.clone(); + let mut inner = std::mem::replace(&mut self.inner, cloned); + let fut = async move { + let variables = subgraph_fetch_all_query::Variables { + graph_ref: req.graph_ref.to_string(), + }; + inner + .call(GraphQLRequest::::new(variables)) + .await + .map_err(|err| RoverClientError::ServiceError(Box::new(err))) + .and_then(|response_data| { + get_subgraphs_from_response_data(req.graph_ref, response_data) + }) + }; + Box::pin(fut) + } +} + +fn get_subgraphs_from_response_data( + graph_ref: GraphRef, + response_data: subgraph_fetch_all_query::ResponseData, +) -> Result { + match response_data.variant { + None => Err(RoverClientError::GraphNotFound { graph_ref }), + Some(subgraph_fetch_all_query::SubgraphFetchAllQueryVariant::GraphVariant(variant)) => { + extract_subgraphs_from_response(variant, graph_ref) + } + _ => Err(RoverClientError::InvalidGraphRef), + } +} +fn extract_subgraphs_from_response( + value: subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariant, + graph_ref: GraphRef, +) -> Result { + match (value.subgraphs, value.source_variant) { + // If we get null back in both branches or the query, or we get a structure in the + // sourceVariant half but there are no subgraphs in it. Then we return an error + // because this isn't a FederatedSubgraph **as far as we can tell**. + (None, None) + | ( + None, + Some( + subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantSourceVariant { + subgraphs: None, + .. + }, + ), + ) => Err(RoverClientError::ExpectedFederatedGraph { + graph_ref, + can_operation_convert: true, + }), + // If we get nothing from querying the subgraphs directly, but we do get some subgraphs + // on the sourceVariant side of the query, we just return those. + ( + None, + Some( + subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantSourceVariant { + subgraphs: Some(subgraphs), + latest_launch, + }, + ), + ) => Ok(SubgraphFetchAllResponse { + subgraphs: subgraphs + .into_iter() + .map(|subgraph| subgraph.into()) + .collect(), + federation_version: latest_launch.and_then(|it| it.into()), + }), + // Here there are three cases where we might want to return the subgraphs we got from + // directly querying the graphVariant: + // 1. If we get subgraphs back from the graphVariant directly and nothing from the sourceVariant + // 2. If we get subgraphs back from the graphVariant directly and a structure from the + // sourceVariant, but it contains no subgraphs + // 3. If we get subgraphs back from both 'sides' of the query, we take the results from + // querying the **graphVariant**, as this is closest to the original behaviour, before + // we introduced the querying of the sourceVariant. + (Some(subgraphs), _) => Ok(SubgraphFetchAllResponse { + subgraphs: subgraphs + .into_iter() + .map(|subgraph| subgraph.into()) + .collect(), + federation_version: value.latest_launch.and_then(|it| it.into()), + }), + } +} + +impl From for SubgraphConfig { + fn from(value: Subgraph) -> Self { + Self { + routing_url: value.url().clone(), + schema: SchemaSource::Sdl { + sdl: value.sdl().clone(), + }, + } + } +} + +impl From + for Subgraph +{ + fn from( + value: subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantSubgraphs, + ) -> Self { + Subgraph::builder() + .name(value.name) + .and_url(value.url) + .sdl(value.active_partial_schema.sdl) + .build() + } +} + +impl + From + for Subgraph +{ + fn from( + value: subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantSourceVariantSubgraphs, + ) -> Self { + Subgraph::builder() + .name(value.name) + .and_url(value.url) + .sdl(value.active_partial_schema.sdl) + .build() + } +} + +impl From + for Option +{ + fn from( + value: subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantLatestLaunch, + ) -> Self { + if let subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantLatestLaunchBuildInput::CompositionBuildInput(composition_build_input) = value.build_input { + composition_build_input + .version + .as_ref() + .and_then(|v| FederationVersion::from_str(&("=".to_owned() + v)).ok()) + } else { + None + } + } +} + +impl From + for Option +{ + fn from( + value: subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantSourceVariantLatestLaunch, + ) -> Self { + if let subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantSourceVariantLatestLaunchBuildInput::CompositionBuildInput(composition_build_input) = value.build_input { + composition_build_input.version.as_ref().and_then(|v| FederationVersion::from_str(&("=".to_owned() + v)).ok()) + } else { + None + } + } +} + +#[cfg(test)] +mod tests { + use apollo_federation_types::config::FederationVersion; + use rstest::{fixture, rstest}; + use semver::Version; + use serde_json::{json, Value}; + + use crate::shared::GraphRef; + + use super::*; + + const SDL: &str = + "extend type User @key(fields: \"id\") {\n id: ID! @external\n age: Int\n}\n"; + const URL: &str = "http://my.subgraph.com"; + const SUBGRAPH_NAME: &str = "accounts"; + + #[rstest] + #[case::subgraphs_returned_direct_from_variant(json!( + { + "variant": { + "__typename": "GraphVariant", + "subgraphs": [ + { + "name": SUBGRAPH_NAME, + "url": URL, + "activePartialSchema": { + "sdl": SDL + } + }, + ], + "latestLaunch": { + "buildInput": { + "__typename": "CompositionBuildInput", + "version": "2.3.4" + } + }, + "sourceVariant": null + } + }), Some(SubgraphFetchAllResponse { + subgraphs: vec![Subgraph::builder().url(URL).sdl(SDL).name(SUBGRAPH_NAME).build()], + federation_version: Some(FederationVersion::ExactFedTwo(Version::new(2, 3, 4))), + }))] + #[case::subgraphs_returned_via_source_variant(json!( + { + "variant": { + "__typename": "GraphVariant", + "subgraphs": null, + "sourceVariant": { + "subgraphs": [ + { + "name": SUBGRAPH_NAME, + "url": URL, + "activePartialSchema": { + "sdl": SDL + } + } + ], + "latestLaunch": { + "buildInput": { + "__typename": "CompositionBuildInput", + "version": "2.3.4" + } + } + } + } + }), Some(SubgraphFetchAllResponse { + subgraphs: vec![Subgraph::builder().url(URL).sdl(SDL).name(SUBGRAPH_NAME).build()], + federation_version: Some(FederationVersion::ExactFedTwo(Version::new(2, 3, 4))), + }))] + #[case::no_subgraphs_returned_in_either_case(json!( + { + "variant": { + "__typename": "GraphVariant", + "subgraphs": null, + "sourceVariant": { + "subgraphs": null + } + } + }), None)] + #[case::subgraphs_returned_from_both_sides_of_the_query_means_we_get_the_variants_subgraphs(json!( + { + "variant": { + "__typename": "GraphVariant", + "subgraphs": [ + { + "name": SUBGRAPH_NAME, + "url": URL, + "activePartialSchema": { + "sdl": SDL + } + } + ], + "latestLaunch": { + "buildInput": { + "__typename": "CompositionBuildInput", + "version": "2.3.4" + } + }, + "sourceVariant": { + "subgraphs": [ + { + "name": "banana", + "url": URL, + "activePartialSchema": { + "sdl": SDL + } + } + ], + "latestLaunch": { + "buildInput": { + "__typename": "CompositionBuildInput", + "version": "2.9.9" + } + } + } + } + }), Some(SubgraphFetchAllResponse { + subgraphs: vec![Subgraph::builder().url(URL).sdl(SDL).name(SUBGRAPH_NAME).build()], + federation_version: Some(FederationVersion::ExactFedTwo(Version::new(2, 3, 4))), + }))] + fn get_services_from_response_data_works( + graph_ref: GraphRef, + #[case] json_response: Value, + #[case] expected_subgraphs: Option, + ) { + let data: subgraph_fetch_all_query::ResponseData = + serde_json::from_value(json_response).unwrap(); + let output = get_subgraphs_from_response_data(graph_ref, data); + + if expected_subgraphs.is_some() { + assert!(output.is_ok()); + assert_eq!(output.unwrap(), expected_subgraphs.unwrap()); + } else { + assert!(output.is_err()); + }; + } + + #[fixture] + fn graph_ref() -> GraphRef { + GraphRef { + name: "mygraph".to_string(), + variant: "current".to_string(), + } + } +} diff --git a/crates/rover-client/src/operations/subgraph/fetch_all/types.rs b/crates/rover-client/src/operations/subgraph/fetch_all/types.rs index 1f64101a7..1995d5467 100644 --- a/crates/rover-client/src/operations/subgraph/fetch_all/types.rs +++ b/crates/rover-client/src/operations/subgraph/fetch_all/types.rs @@ -1,40 +1,14 @@ -use std::str::FromStr; - -use apollo_federation_types::config::{FederationVersion, SchemaSource, SubgraphConfig}; +use apollo_federation_types::config::FederationVersion; use buildstructor::Builder; use derive_getters::Getters; -pub(crate) use subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariant; -pub(crate) use subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantSourceVariant; - use crate::shared::GraphRef; -use super::runner::subgraph_fetch_all_query; - -use subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantLatestLaunchBuildInput::CompositionBuildInput - as OuterCompositionBuildInput; -use subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantSourceVariantLatestLaunchBuildInput::CompositionBuildInput - as InnerCompositionBuildInput; - -pub(crate) type SubgraphFetchAllResponseData = subgraph_fetch_all_query::ResponseData; -pub(crate) type SubgraphFetchAllGraphVariant = - subgraph_fetch_all_query::SubgraphFetchAllQueryVariant; - -pub(crate) type QueryVariables = subgraph_fetch_all_query::Variables; - #[derive(Debug, Clone, Eq, PartialEq)] pub struct SubgraphFetchAllInput { pub graph_ref: GraphRef, } -impl From for QueryVariables { - fn from(input: SubgraphFetchAllInput) -> Self { - Self { - graph_ref: input.graph_ref.to_string(), - } - } -} - #[derive(Debug, PartialEq)] pub struct SubgraphFetchAllResponse { pub subgraphs: Vec, @@ -47,72 +21,3 @@ pub struct Subgraph { url: Option, sdl: String, } - -impl From for SubgraphConfig { - fn from(value: Subgraph) -> Self { - Self { - routing_url: value.url, - schema: SchemaSource::Sdl { sdl: value.sdl }, - } - } -} - -impl From - for Subgraph -{ - fn from( - value: subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantSubgraphs, - ) -> Self { - Subgraph::builder() - .name(value.name) - .and_url(value.url) - .sdl(value.active_partial_schema.sdl) - .build() - } -} - -impl - From - for Subgraph -{ - fn from( - value: subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantSourceVariantSubgraphs, - ) -> Self { - Subgraph::builder() - .name(value.name) - .and_url(value.url) - .sdl(value.active_partial_schema.sdl) - .build() - } -} - -impl From - for Option -{ - fn from( - value: subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantLatestLaunch, - ) -> Self { - if let OuterCompositionBuildInput(composition_build_input) = value.build_input { - composition_build_input - .version - .as_ref() - .and_then(|v| FederationVersion::from_str(&("=".to_owned() + v)).ok()) - } else { - None - } - } -} - -impl From - for Option -{ - fn from( - value: subgraph_fetch_all_query::SubgraphFetchAllQueryVariantOnGraphVariantSourceVariantLatestLaunch, - ) -> Self { - if let InnerCompositionBuildInput(composition_build_input) = value.build_input { - composition_build_input.version.as_ref().and_then(|v| FederationVersion::from_str(&("=".to_owned() + v)).ok()) - } else { - None - } - } -} diff --git a/src/command/dev/next/mod.rs b/src/command/dev/next/mod.rs index 050083aeb..f48666081 100644 --- a/src/command/dev/next/mod.rs +++ b/src/command/dev/next/mod.rs @@ -12,7 +12,13 @@ use rover_client::operations::config::who_am_i::WhoAmI; use crate::{ command::Dev, - composition::pipeline::CompositionPipeline, + composition::{ + pipeline::CompositionPipeline, + supergraph::config::resolver::{ + fetch_remote_subgraph::MakeFetchRemoteSubgraph, + fetch_remote_subgraphs::MakeFetchRemoteSubgraphs, + }, + }, utils::{ client::StudioClientConfig, effect::{ @@ -24,10 +30,7 @@ use crate::{ RoverError, RoverOutput, RoverResult, }; -use self::router::{ - binary::RouterLog, - config::{RouterAddress, RunRouterConfig}, -}; +use self::router::config::{RouterAddress, RunRouterConfig}; mod router; @@ -66,17 +69,26 @@ impl Dev { let service = client_config.get_authenticated_client(profile)?.service()?; let service = WhoAmI::new(service); + let make_fetch_remote_subgraphs = MakeFetchRemoteSubgraphs::builder() + .studio_client_config(client_config.clone()) + .profile(profile.clone()) + .build(); + let make_fetch_remote_subgraph = MakeFetchRemoteSubgraph::builder() + .studio_client_config(client_config.clone()) + .profile(profile.clone()) + .build(); + let composition_pipeline = CompositionPipeline::default() .init( &mut stdin(), - &client_config.get_authenticated_client(profile)?, + make_fetch_remote_subgraphs, supergraph_config_path.clone(), graph_ref.clone(), ) .await? .resolve_federation_version( &client_config, - &client_config.get_authenticated_client(profile)?, + make_fetch_remote_subgraph, self.opts.supergraph_opts.federation_version.clone(), ) .await? diff --git a/src/command/supergraph/compose/do_compose.rs b/src/command/supergraph/compose/do_compose.rs index 5d1fe547d..8202d9dbb 100644 --- a/src/command/supergraph/compose/do_compose.rs +++ b/src/command/supergraph/compose/do_compose.rs @@ -57,7 +57,6 @@ use crate::{ client::StudioClientConfig, effect::{ exec::TokioCommand, - fetch_remote_subgraph::RemoteSubgraph, install::InstallBinary, read_file::FsReadFile, write_file::{FsWriteFile, WriteFile}, @@ -163,7 +162,13 @@ impl Compose { client_config: StudioClientConfig, output_file: Option, ) -> RoverResult { - use crate::composition::pipeline::CompositionPipeline; + use crate::composition::{ + pipeline::CompositionPipeline, + supergraph::config::resolver::{ + fetch_remote_subgraph::MakeFetchRemoteSubgraph, + fetch_remote_subgraphs::MakeFetchRemoteSubgraphs, + }, + }; let read_file_impl = FsReadFile::default(); let write_file_impl = FsWriteFile::default(); @@ -178,17 +183,27 @@ impl Compose { let profile = self.opts.plugin_opts.profile.clone(); let graph_ref = self.opts.supergraph_config_source.graph_ref.clone(); + let make_fetch_remote_subgraphs = MakeFetchRemoteSubgraphs::builder() + .studio_client_config(client_config.clone()) + .profile(profile.clone()) + .build(); + + let make_fetch_remote_subgraph = MakeFetchRemoteSubgraph::builder() + .studio_client_config(client_config.clone()) + .profile(profile.clone()) + .build(); + let composition_pipeline = CompositionPipeline::default() .init( &mut stdin(), - &client_config.get_authenticated_client(&profile)?, + make_fetch_remote_subgraphs, supergraph_yaml, graph_ref.clone(), ) .await? .resolve_federation_version( &client_config, - &client_config.get_authenticated_client(&profile)?, + make_fetch_remote_subgraph, self.opts.federation_version.clone(), ) .await? diff --git a/src/composition/pipeline.rs b/src/composition/pipeline.rs index bf0327727..c956879df 100644 --- a/src/composition/pipeline.rs +++ b/src/composition/pipeline.rs @@ -1,19 +1,18 @@ -use std::{env::current_dir, fmt::Debug, fs::canonicalize}; +use std::{collections::BTreeMap, env::current_dir, fmt::Debug, fs::canonicalize}; -use apollo_federation_types::config::{FederationVersion, SupergraphConfig}; +use apollo_federation_types::config::{FederationVersion, SubgraphConfig, SupergraphConfig}; use camino::Utf8PathBuf; use rover_client::shared::GraphRef; use tempfile::tempdir; +use tower::MakeService; use crate::{ options::{LicenseAccepter, ProfileOpt}, utils::{ client::StudioClientConfig, effect::{ - exec::ExecCommand, fetch_remote_subgraph::FetchRemoteSubgraph, - fetch_remote_subgraphs::FetchRemoteSubgraphs, install::InstallBinary, - introspect::IntrospectSubgraph, read_file::ReadFile, read_stdin::ReadStdin, - write_file::WriteFile, + exec::ExecCommand, install::InstallBinary, introspect::IntrospectSubgraph, + read_file::ReadFile, read_stdin::ReadStdin, write_file::WriteFile, }, parsers::FileDescriptorType, }, @@ -24,6 +23,8 @@ use super::{ supergraph::{ binary::OutputTarget, config::resolver::{ + fetch_remote_subgraph::{FetchRemoteSubgraphRequest, RemoteSubgraph}, + fetch_remote_subgraphs::FetchRemoteSubgraphsRequest, LoadRemoteSubgraphsError, LoadSupergraphConfigError, ResolveSupergraphConfigError, SupergraphConfigResolver, }, @@ -64,13 +65,21 @@ impl Default for CompositionPipeline { } impl CompositionPipeline { - pub async fn init( + pub async fn init( self, read_stdin_impl: &mut impl ReadStdin, - fetch_remote_subgraphs_impl: &impl FetchRemoteSubgraphs, + fetch_remote_subgraphs_factory: S, supergraph_yaml: Option, graph_ref: Option, ) -> Result, CompositionPipelineError> + where + S: MakeService< + (), + FetchRemoteSubgraphsRequest, + Response = BTreeMap, + >, + S::MakeError: std::error::Error + Send + Sync + 'static, + S::Error: std::error::Error + Send + Sync + 'static, { let supergraph_yaml = supergraph_yaml.and_then(|supergraph_yaml| match supergraph_yaml { FileDescriptorType::File(file) => canonicalize(file) @@ -90,7 +99,7 @@ impl CompositionPipeline { FileDescriptorType::Stdin => None, }); let resolver = SupergraphConfigResolver::default() - .load_remote_subgraphs(fetch_remote_subgraphs_impl, graph_ref.as_ref()) + .load_remote_subgraphs(fetch_remote_subgraphs_factory, graph_ref.as_ref()) .await? .load_from_file_descriptor(read_stdin_impl, supergraph_yaml.as_ref())?; Ok(CompositionPipeline { @@ -103,12 +112,18 @@ impl CompositionPipeline { } impl CompositionPipeline { - pub async fn resolve_federation_version( + pub async fn resolve_federation_version( self, introspect_subgraph_impl: &impl IntrospectSubgraph, - fetch_remote_subgraph_impl: &impl FetchRemoteSubgraph, + fetch_remote_subgraph_impl: MakeFetchSubgraph, federation_version: Option, - ) -> Result, CompositionPipelineError> { + ) -> Result, CompositionPipelineError> + where + MakeFetchSubgraph: + MakeService<(), FetchRemoteSubgraphRequest, Response = RemoteSubgraph> + Clone, + MakeFetchSubgraph::MakeError: std::error::Error + Send + Sync + 'static, + MakeFetchSubgraph::Error: std::error::Error + Send + Sync + 'static, + { let fully_resolved_supergraph_config = self .state .resolver diff --git a/src/composition/supergraph/config/full/subgraph.rs b/src/composition/supergraph/config/full/subgraph.rs index 298b72648..4891a1605 100644 --- a/src/composition/supergraph/config/full/subgraph.rs +++ b/src/composition/supergraph/config/full/subgraph.rs @@ -7,12 +7,15 @@ use camino::Utf8PathBuf; use derive_getters::Getters; use rover_client::shared::GraphRef; use rover_std::Fs; +use tower::{MakeService, Service, ServiceExt}; use crate::{ composition::supergraph::config::{ - error::ResolveSubgraphError, unresolved::UnresolvedSubgraph, + error::ResolveSubgraphError, + resolver::fetch_remote_subgraph::{FetchRemoteSubgraphRequest, RemoteSubgraph}, + unresolved::UnresolvedSubgraph, }, - utils::effect::{fetch_remote_subgraph::FetchRemoteSubgraph, introspect::IntrospectSubgraph}, + utils::effect::introspect::IntrospectSubgraph, }; /// Represents a [`SubgraphConfig`] that has been resolved down to an SDL @@ -37,12 +40,17 @@ impl FullyResolvedSubgraph { } } /// Resolves a [`UnresolvedSubgraph`] to a [`FullyResolvedSubgraph`] - pub async fn resolve( + pub async fn resolve( introspect_subgraph_impl: &impl IntrospectSubgraph, - fetch_remote_subgraph_impl: &impl FetchRemoteSubgraph, + mut fetch_remote_subgraph_impl: MakeFetchSubgraph, supergraph_config_root: Option<&Utf8PathBuf>, unresolved_subgraph: UnresolvedSubgraph, - ) -> Result { + ) -> Result + where + MakeFetchSubgraph: MakeService<(), FetchRemoteSubgraphRequest, Response = RemoteSubgraph>, + MakeFetchSubgraph::MakeError: std::error::Error + Send + Sync + 'static, + MakeFetchSubgraph::Error: std::error::Error + Send + Sync + 'static, + { match unresolved_subgraph.schema() { SchemaSource::File { file } => { let supergraph_config_root = @@ -89,7 +97,24 @@ impl FullyResolvedSubgraph { } })?; let remote_subgraph = fetch_remote_subgraph_impl - .fetch_remote_subgraph(graph_ref, subgraph.to_string()) + .make_service(()) + .await + .map_err(|err| ResolveSubgraphError::FetchRemoteSdlError { + subgraph_name: subgraph.to_string(), + source: Box::new(err), + })? + .ready() + .await + .map_err(|err| ResolveSubgraphError::FetchRemoteSdlError { + subgraph_name: subgraph.to_string(), + source: Box::new(err), + })? + .call( + FetchRemoteSubgraphRequest::builder() + .graph_ref(graph_ref) + .subgraph_name(subgraph.to_string()) + .build(), + ) .await .map_err(|err| ResolveSubgraphError::FetchRemoteSdlError { subgraph_name: subgraph.to_string(), diff --git a/src/composition/supergraph/config/full/supergraph.rs b/src/composition/supergraph/config/full/supergraph.rs index 209cd84d3..d3d449303 100644 --- a/src/composition/supergraph/config/full/supergraph.rs +++ b/src/composition/supergraph/config/full/supergraph.rs @@ -5,13 +5,18 @@ use camino::Utf8PathBuf; use derive_getters::Getters; use futures::{stream, StreamExt, TryFutureExt}; use itertools::Itertools; +use tower::MakeService; use crate::{ composition::supergraph::config::{ - error::ResolveSubgraphError, resolver::ResolveSupergraphConfigError, + error::ResolveSubgraphError, + resolver::{ + fetch_remote_subgraph::{FetchRemoteSubgraphRequest, RemoteSubgraph}, + ResolveSupergraphConfigError, + }, unresolved::UnresolvedSupergraphConfig, }, - utils::effect::{fetch_remote_subgraph::FetchRemoteSubgraph, introspect::IntrospectSubgraph}, + utils::effect::introspect::IntrospectSubgraph, }; use super::FullyResolvedSubgraph; @@ -40,14 +45,21 @@ impl From for SupergraphConfig { impl FullyResolvedSupergraphConfig { /// Resolves an [`UnresolvedSupergraphConfig`] into a [`FullyResolvedSupergraphConfig`] /// by resolving the individual subgraphs concurrently and calculating the [`FederationVersion`] - pub async fn resolve( + pub async fn resolve( introspect_subgraph_impl: &impl IntrospectSubgraph, - fetch_remote_subgraph_impl: &impl FetchRemoteSubgraph, + fetch_remote_subgraph_impl: MakeFetchSubgraph, supergraph_config_root: Option<&Utf8PathBuf>, unresolved_supergraph_config: UnresolvedSupergraphConfig, - ) -> Result { + ) -> Result + where + MakeFetchSubgraph: + MakeService<(), FetchRemoteSubgraphRequest, Response = RemoteSubgraph> + Clone, + MakeFetchSubgraph::MakeError: std::error::Error + Send + Sync + 'static, + MakeFetchSubgraph::Error: std::error::Error + Send + Sync + 'static, + { let subgraphs = stream::iter(unresolved_supergraph_config.subgraphs().iter().map( - |(name, unresolved_subgraph)| { + move |(name, unresolved_subgraph)| { + let fetch_remote_subgraph_impl = fetch_remote_subgraph_impl.clone(); FullyResolvedSubgraph::resolve( introspect_subgraph_impl, fetch_remote_subgraph_impl, diff --git a/src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs b/src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs new file mode 100644 index 000000000..6ea935b3b --- /dev/null +++ b/src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs @@ -0,0 +1,153 @@ +use std::{convert::Infallible, pin::Pin}; + +use buildstructor::Builder; +use derive_getters::Getters; +use futures::Future; +use rover_client::{ + operations::subgraph::fetch::{SubgraphFetch, SubgraphFetchRequest}, + shared::{FetchResponse, GraphRef, SdlType}, + RoverClientError, +}; +use rover_graphql::{GraphQLLayer, GraphQLService}; +use rover_http::HttpService; +use tower::{Service, ServiceBuilder}; + +use crate::{options::ProfileOpt, utils::client::StudioClientConfig}; + +#[derive(thiserror::Error, Debug)] +pub enum MakeFetchRemoteSubgraphError { + #[error("Service failed to reach a ready state.\n{}", .0)] + ReadyFailed(Box), + #[error("Failed to create the FetchRemoteSubgraphService.\n{}", .0)] + StudioClient(anyhow::Error), +} + +#[derive(Builder, Clone)] +pub struct MakeFetchRemoteSubgraph { + studio_client_config: StudioClientConfig, + profile: ProfileOpt, +} + +impl Service<()> for MakeFetchRemoteSubgraph { + type Response = FetchRemoteSubgraph>>; + type Error = MakeFetchRemoteSubgraphError; + type Future = Pin> + Send>>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok::<_, Infallible>(())) + .map_err(|err| MakeFetchRemoteSubgraphError::ReadyFailed(Box::new(err))) + } + + fn call(&mut self, _req: ()) -> Self::Future { + let studio_client_config = self.studio_client_config.clone(); + let profile = self.profile.clone(); + let fut = async move { + let http_service = studio_client_config + .authenticated_service(&profile) + .map_err(MakeFetchRemoteSubgraphError::StudioClient)?; + let graphql_service = ServiceBuilder::new() + .layer(GraphQLLayer::default()) + .service(http_service); + let subgraph_fetch_all = SubgraphFetch::new(graphql_service); + Ok::<_, MakeFetchRemoteSubgraphError>(FetchRemoteSubgraph::new(subgraph_fetch_all)) + }; + Box::pin(fut) + } +} + +#[derive(Clone, Debug, Eq, PartialEq, Builder, Getters)] +pub struct RemoteSubgraph { + name: String, + routing_url: String, + schema: String, +} + +#[derive(thiserror::Error, Debug)] +pub enum FetchRemoteSubgraphError { + #[error(transparent)] + RoverClient(#[from] RoverClientError), + #[error("Response contained an invalid SDL type: {:?}", .0)] + InvalidSdlType(SdlType), + #[error("Inner service failed to become ready.\n{}", .0)] + Service(Box), +} + +#[derive(Builder)] +pub struct FetchRemoteSubgraphRequest { + subgraph_name: String, + graph_ref: GraphRef, +} + +impl From for SubgraphFetchRequest { + fn from(value: FetchRemoteSubgraphRequest) -> Self { + SubgraphFetchRequest::builder() + .graph_ref(value.graph_ref) + .subgraph_name(value.subgraph_name) + .build() + } +} + +pub struct FetchRemoteSubgraph { + inner: S, +} + +impl FetchRemoteSubgraph { + pub fn new(inner: S) -> FetchRemoteSubgraph { + FetchRemoteSubgraph { inner } + } +} + +impl Service for FetchRemoteSubgraph +where + S: Service< + SubgraphFetchRequest, + Response = FetchResponse, + Error = RoverClientError, + Future = Fut, + > + Clone + + Send + + 'static, + Fut: Future> + Send, +{ + type Response = RemoteSubgraph; + type Error = FetchRemoteSubgraphError; + type Future = Pin> + Send>>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner + .poll_ready(cx) + .map_err(|err| FetchRemoteSubgraphError::Service(Box::new(err))) + } + + fn call(&mut self, req: FetchRemoteSubgraphRequest) -> Self::Future { + let cloned = self.inner.clone(); + let mut inner = std::mem::replace(&mut self.inner, cloned); + let fut = { + let subgraph_name = req.subgraph_name.to_string(); + async move { + let fetch_response = inner.call(SubgraphFetchRequest::from(req)).await?; + if let rover_client::shared::SdlType::Subgraph { + routing_url: Some(graph_registry_routing_url), + } = fetch_response.sdl.r#type + { + Ok(RemoteSubgraph { + name: subgraph_name, + routing_url: graph_registry_routing_url, + schema: fetch_response.sdl.contents, + }) + } else { + Err(FetchRemoteSubgraphError::InvalidSdlType( + fetch_response.sdl.r#type, + )) + } + } + }; + Box::pin(fut) + } +} diff --git a/src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs b/src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs new file mode 100644 index 000000000..1f94a0e5f --- /dev/null +++ b/src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs @@ -0,0 +1,128 @@ +//! Provides services and utilities to fetch subgraphs from Studio + +use std::{collections::BTreeMap, convert::Infallible, pin::Pin}; + +use apollo_federation_types::config::SubgraphConfig; +use buildstructor::Builder; +use futures::Future; +use rover_client::{ + operations::subgraph::{ + fetch::SubgraphFetch, + fetch_all::{SubgraphFetchAll, SubgraphFetchAllRequest, SubgraphFetchAllResponse}, + }, + shared::GraphRef, + RoverClientError, +}; +use rover_graphql::{GraphQLLayer, GraphQLService}; +use rover_http::HttpService; +use tower::{service_fn, Service, ServiceBuilder}; + +use crate::{options::ProfileOpt, utils::client::StudioClientConfig}; + +#[derive(thiserror::Error, Debug)] +pub enum MakeFetchRemoteSubgraphsError { + #[error("Service failed to reach a ready state.\n{}", .0)] + ReadyFailed(Box), + #[error("Failed to create the FetchRemoteSubgraphsService.\n{}", .0)] + StudioClient(anyhow::Error), +} + +#[derive(Builder, Clone)] +pub struct MakeFetchRemoteSubgraphs { + studio_client_config: StudioClientConfig, + profile: ProfileOpt, +} + +impl Service<()> for MakeFetchRemoteSubgraphs { + type Response = FetchRemoteSubgraphs>>; + type Error = MakeFetchRemoteSubgraphsError; + type Future = Pin> + Send>>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok::<_, Infallible>(())) + .map_err(|err| MakeFetchRemoteSubgraphsError::ReadyFailed(Box::new(err))) + } + + fn call(&mut self, _req: ()) -> Self::Future { + let studio_client_config = self.studio_client_config.clone(); + let profile = self.profile.clone(); + let fut = async move { + let http_service = studio_client_config + .authenticated_service(&profile) + .map_err(MakeFetchRemoteSubgraphsError::StudioClient)?; + let graphql_service = ServiceBuilder::new() + .layer(GraphQLLayer::default()) + .service(http_service); + let subgraph_fetch_all = SubgraphFetchAll::new(graphql_service); + Ok::<_, MakeFetchRemoteSubgraphsError>(FetchRemoteSubgraphs::new(subgraph_fetch_all)) + }; + Box::pin(fut) + } +} + +/// Request to fetch subgraphs from Studio +#[derive(Clone, Debug)] +pub struct FetchRemoteSubgraphsRequest { + graph_ref: GraphRef, +} + +impl FetchRemoteSubgraphsRequest { + /// Creates a new [`FetchRemoteSubgraphrequest`] from a [`GraphRef`] + pub fn new(graph_ref: GraphRef) -> FetchRemoteSubgraphsRequest { + FetchRemoteSubgraphsRequest { graph_ref } + } +} + +/// Service that fetches subgraphs from Studio +pub struct FetchRemoteSubgraphs { + inner: S, +} + +impl FetchRemoteSubgraphs { + pub fn new(inner: S) -> FetchRemoteSubgraphs { + FetchRemoteSubgraphs { inner } + } +} + +impl Service for FetchRemoteSubgraphs +where + S: Service< + SubgraphFetchAllRequest, + Response = SubgraphFetchAllResponse, + Error = RoverClientError, + Future = Fut, + > + Clone + + Send + + 'static, + Fut: Future> + Send, +{ + type Response = BTreeMap; + type Error = RoverClientError; + type Future = Pin> + Send>>; + + fn poll_ready( + &mut self, + cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + self.inner.poll_ready(cx) + } + + fn call(&mut self, req: FetchRemoteSubgraphsRequest) -> Self::Future { + let cloned = self.inner.clone(); + let mut inner = std::mem::replace(&mut self.inner, cloned); + let fut = async move { + let SubgraphFetchAllResponse { subgraphs, .. } = inner + .call(SubgraphFetchAllRequest::new(req.graph_ref)) + .await?; + let subgraphs = subgraphs + .into_iter() + .map(|subgraph| (subgraph.name().clone(), subgraph.into())) + .collect(); + Ok(subgraphs) + }; + Box::pin(fut) + } +} diff --git a/src/composition/supergraph/config/resolver/mod.rs b/src/composition/supergraph/config/resolver/mod.rs index 42bcacb1a..48563808d 100644 --- a/src/composition/supergraph/config/resolver/mod.rs +++ b/src/composition/supergraph/config/resolver/mod.rs @@ -19,21 +19,16 @@ use apollo_federation_types::config::{ }; use camino::Utf8PathBuf; use rover_client::shared::GraphRef; +use tower::{MakeService, Service, ServiceExt}; use crate::{ utils::{ - effect::{ - fetch_remote_subgraph::FetchRemoteSubgraph, - fetch_remote_subgraphs::FetchRemoteSubgraphs, introspect::IntrospectSubgraph, - read_stdin::ReadStdin, - }, + effect::{introspect::IntrospectSubgraph, read_stdin::ReadStdin}, parsers::FileDescriptorType, }, RoverError, }; -use self::state::ResolveSubgraphs; - use super::{ error::ResolveSubgraphError, federation::{ @@ -45,6 +40,14 @@ use super::{ unresolved::UnresolvedSupergraphConfig, }; +use self::{ + fetch_remote_subgraph::{FetchRemoteSubgraphRequest, RemoteSubgraph}, + fetch_remote_subgraphs::FetchRemoteSubgraphsRequest, + state::ResolveSubgraphs, +}; + +pub mod fetch_remote_subgraph; +pub mod fetch_remote_subgraphs; mod state; /// This is a state-based resolver for the different stages of resolving a supergraph config @@ -88,15 +91,29 @@ pub enum LoadRemoteSubgraphsError { impl SupergraphConfigResolver { /// Optionally loads subgraphs from the Studio API using the contents of the `--graph-ref` flag /// and an implementation of [`FetchRemoteSubgraphs`] - pub async fn load_remote_subgraphs( + pub async fn load_remote_subgraphs( self, - fetch_remote_subgraphs_impl: &impl FetchRemoteSubgraphs, + mut fetch_remote_subgraphs_factory: S, graph_ref: Option<&GraphRef>, ) -> Result, LoadRemoteSubgraphsError> + where + S: MakeService< + (), + FetchRemoteSubgraphsRequest, + Response = BTreeMap, + >, + S::MakeError: std::error::Error + Send + Sync + 'static, + S::Error: std::error::Error + Send + Sync + 'static, { if let Some(graph_ref) = graph_ref { - let remote_subgraphs = fetch_remote_subgraphs_impl - .fetch_remote_subgraphs(graph_ref) + let remote_subgraphs = fetch_remote_subgraphs_factory + .make_service(()) + .await + .map_err(|err| LoadRemoteSubgraphsError::FetchRemoteSubgraphsError(Box::new(err)))? + .ready() + .await + .map_err(|err| LoadRemoteSubgraphsError::FetchRemoteSubgraphsError(Box::new(err)))? + .call(FetchRemoteSubgraphsRequest::new(graph_ref.clone())) .await .map_err(|err| { LoadRemoteSubgraphsError::FetchRemoteSubgraphsError(Box::new(err)) @@ -213,12 +230,18 @@ pub type InitializedSupergraphConfigResolver = SupergraphConfigResolver { /// Fully resolves the subgraph configurations in the supergraph config file to their SDLs - pub async fn fully_resolve_subgraphs( + pub async fn fully_resolve_subgraphs( &self, introspect_subgraph_impl: &impl IntrospectSubgraph, - fetch_remote_subgraph_impl: &impl FetchRemoteSubgraph, + fetch_remote_subgraph_impl: MakeFetchSubgraph, supergraph_config_root: Option<&Utf8PathBuf>, - ) -> Result { + ) -> Result + where + MakeFetchSubgraph: + MakeService<(), FetchRemoteSubgraphRequest, Response = RemoteSubgraph> + Clone, + MakeFetchSubgraph::MakeError: std::error::Error + Send + Sync + 'static, + MakeFetchSubgraph::Error: std::error::Error + Send + Sync + 'static, + { if !self.state.subgraphs.is_empty() { let unresolved_supergraph_config = UnresolvedSupergraphConfig::builder() .subgraphs(self.state.subgraphs.clone()) diff --git a/src/utils/effect/fetch_remote_subgraph.rs b/src/utils/effect/fetch_remote_subgraph.rs deleted file mode 100644 index 42c9a7326..000000000 --- a/src/utils/effect/fetch_remote_subgraph.rs +++ /dev/null @@ -1,171 +0,0 @@ -use async_trait::async_trait; -use buildstructor::Builder; -use derive_getters::Getters; -use rover_client::{ - blocking::StudioClient, - operations::subgraph::fetch::{self, SubgraphFetchInput}, - shared::{GraphRef, SdlType}, - RoverClientError, -}; - -use crate::RoverError; - -#[derive(Clone, Debug, Eq, PartialEq, Builder, Getters)] -pub struct RemoteSubgraph { - name: String, - routing_url: String, - schema: String, -} - -#[cfg_attr(test, derive(thiserror::Error, Debug))] -#[cfg_attr(test, error("{}", .0))] -#[cfg(test)] -pub struct MockFetchRemoteSubgraphError(String); - -#[cfg_attr(test, mockall::automock(type Error = MockFetchRemoteSubgraphError;))] -#[async_trait] -pub trait FetchRemoteSubgraph { - type Error: std::error::Error + Send + Sync + 'static; - async fn fetch_remote_subgraph( - &self, - graph_ref: GraphRef, - subgraph_name: String, - ) -> Result; -} - -#[derive(thiserror::Error, Debug)] -pub enum StudioFetchRemoteSdlError { - #[error("Failed to build the client.\n{}", .0)] - Reqwest(RoverError), - #[error("Failed to fetch the subgraph from remote.\n{}", .0)] - FetchSubgraph(#[from] RoverClientError), - #[error("Got an invalid SDL type: {:?}", .0)] - InvalidSdlType(SdlType), -} - -#[async_trait] -impl FetchRemoteSubgraph for StudioClient { - type Error = StudioFetchRemoteSdlError; - async fn fetch_remote_subgraph( - &self, - graph_ref: GraphRef, - subgraph_name: String, - ) -> Result { - fetch::run( - SubgraphFetchInput { - graph_ref, - subgraph_name: subgraph_name.clone(), - }, - self, - ) - .await - .map_err(StudioFetchRemoteSdlError::from) - .and_then(|result| { - // We don't require a routing_url in config for this variant of a schema, - // if one isn't provided, just use the routing URL from the graph registry (if it exists). - if let rover_client::shared::SdlType::Subgraph { - routing_url: Some(graph_registry_routing_url), - } = result.sdl.r#type - { - Ok(RemoteSubgraph { - name: subgraph_name, - routing_url: graph_registry_routing_url, - schema: result.sdl.contents, - }) - } else { - Err(StudioFetchRemoteSdlError::InvalidSdlType(result.sdl.r#type)) - } - }) - } -} - -#[cfg(test)] -mod test { - - use std::{str::FromStr, time::Duration}; - - use anyhow::Result; - use houston::Credential; - use httpmock::MockServer; - use rover_client::{blocking::StudioClient, shared::GraphRef}; - use rstest::{fixture, rstest}; - use serde_json::json; - use speculoos::prelude::*; - - use crate::utils::effect::test::SUBGRAPH_FETCH_QUERY; - - use super::{FetchRemoteSubgraph, RemoteSubgraph}; - - #[fixture] - #[once] - fn query() -> &'static str { - SUBGRAPH_FETCH_QUERY - } - - #[rstest] - #[timeout(Duration::from_secs(1))] - #[tokio::test] - async fn test_studio_fetch_remote_subgraph_success(query: &str) -> Result<()> { - let version = "test-version"; - let is_sudo = false; - let server = MockServer::start(); - let reqwest_client = reqwest::Client::new(); - let server_address = server.address(); - let endpoint = format!( - "http://{}:{}/graphql", - server_address.ip(), - server_address.port() - ); - let studio_client = StudioClient::new( - Credential { - api_key: "test-api-key".to_string(), - origin: houston::CredentialOrigin::EnvVar, - }, - &endpoint, - version, - is_sudo, - reqwest_client, - None, - ); - let _mock = server.mock(|when, then| { - let expected_body = json!({ - "query": query, - "variables": { - "graph_ref": "graph@variant", - "subgraph_name": "subgraph_name" - }, - "operationName": "SubgraphFetchQuery" - }); - when.path("/graphql") - .method(httpmock::Method::POST) - .json_body_obj(&expected_body); - let result_body = json!({ - "data": { - "variant": { - "__typename": "GraphVariant", - "subgraph": { - "url": "http://example.com/graphql", - "activePartialSchema": { - "sdl": "def", - }, - "subgraphs": [{ - "name": "ghi" - }] - } - } - } - }); - then.json_body_obj(&result_body); - }); - let graph_ref = GraphRef::from_str("graph@variant")?; - let result = studio_client - .fetch_remote_subgraph(graph_ref, "subgraph_name".to_string()) - .await; - assert_that!(result).is_ok().is_equal_to(RemoteSubgraph { - name: "subgraph_name".to_string(), - routing_url: "http://example.com/graphql".to_string(), - schema: "def".to_string(), - }); - Ok(()) - } -} diff --git a/src/utils/effect/fetch_remote_subgraphs.rs b/src/utils/effect/fetch_remote_subgraphs.rs deleted file mode 100644 index 781e2572a..000000000 --- a/src/utils/effect/fetch_remote_subgraphs.rs +++ /dev/null @@ -1,51 +0,0 @@ -use std::collections::BTreeMap; - -use apollo_federation_types::config::SubgraphConfig; -use async_trait::async_trait; -use rover_client::{ - blocking::StudioClient, - operations::subgraph::{ - self, - fetch_all::{SubgraphFetchAllInput, SubgraphFetchAllResponse}, - }, - shared::GraphRef, - RoverClientError, -}; - -#[cfg_attr(test, derive(thiserror::Error, Debug))] -#[cfg(test)] -#[cfg_attr(test, error("MockFetchRemoteSubgraphsError"))] -pub struct MockFetchRemoteSubgraphsError {} - -#[cfg_attr(test, mockall::automock(type Error = MockFetchRemoteSubgraphsError;))] -#[async_trait] -pub trait FetchRemoteSubgraphs { - type Error: std::error::Error + Send + Sync + 'static; - async fn fetch_remote_subgraphs( - &self, - graph_ref: &GraphRef, - ) -> Result, Self::Error>; -} - -#[async_trait] -impl FetchRemoteSubgraphs for StudioClient { - type Error = RoverClientError; - /// Fetches [`SubgraphConfig`]s with `SchemaSource::Subgraph`s from Studio - async fn fetch_remote_subgraphs( - &self, - graph_ref: &GraphRef, - ) -> Result, Self::Error> { - let SubgraphFetchAllResponse { subgraphs, .. } = subgraph::fetch_all::run( - SubgraphFetchAllInput { - graph_ref: graph_ref.clone(), - }, - self, - ) - .await?; - let subgraphs = subgraphs - .into_iter() - .map(|subgraph| (subgraph.name().clone(), subgraph.into())) - .collect(); - Ok(subgraphs) - } -} diff --git a/src/utils/effect/mod.rs b/src/utils/effect/mod.rs index 7a6f05569..d1f8d3346 100644 --- a/src/utils/effect/mod.rs +++ b/src/utils/effect/mod.rs @@ -1,6 +1,4 @@ pub mod exec; -pub mod fetch_remote_subgraph; -pub mod fetch_remote_subgraphs; pub mod install; pub mod introspect; pub mod read_file; From 9f9eb753dca9cf7c0f326f030d1a59220fade43f Mon Sep 17 00:00:00 2001 From: Brian George Date: Wed, 18 Dec 2024 18:24:44 -0500 Subject: [PATCH 2/5] Fix initial composition routine --- src/composition/supergraph/config/full/subgraph.rs | 4 ++-- src/composition/watchers/subgraphs.rs | 2 +- src/composition/watchers/watcher/introspection.rs | 3 +++ src/composition/watchers/watcher/subgraph.rs | 4 ++-- 4 files changed, 8 insertions(+), 5 deletions(-) diff --git a/src/composition/supergraph/config/full/subgraph.rs b/src/composition/supergraph/config/full/subgraph.rs index 4891a1605..ee949ad4c 100644 --- a/src/composition/supergraph/config/full/subgraph.rs +++ b/src/composition/supergraph/config/full/subgraph.rs @@ -80,9 +80,9 @@ impl FullyResolvedSubgraph { let routing_url = unresolved_subgraph .routing_url() .clone() - .or_else(|| Some(subgraph_url.to_string())); + .unwrap_or_else(|| subgraph_url.to_string()); Ok(FullyResolvedSubgraph::builder() - .and_routing_url(routing_url) + .routing_url(routing_url) .schema(schema) .build()) } diff --git a/src/composition/watchers/subgraphs.rs b/src/composition/watchers/subgraphs.rs index 64f76c8a7..7a30d4d64 100644 --- a/src/composition/watchers/subgraphs.rs +++ b/src/composition/watchers/subgraphs.rs @@ -78,7 +78,7 @@ pub enum SubgraphEvent { } /// An event denoting that the subgraph has changed, emitting its name and the SDL reflecting that /// change -#[derive(derive_getters::Getters, Eq, PartialEq)] +#[derive(derive_getters::Getters, Eq, PartialEq, Debug)] pub struct SubgraphSchemaChanged { /// Subgraph name name: String, diff --git a/src/composition/watchers/watcher/introspection.rs b/src/composition/watchers/watcher/introspection.rs index d0935f4b1..527f779db 100644 --- a/src/composition/watchers/watcher/introspection.rs +++ b/src/composition/watchers/watcher/introspection.rs @@ -92,7 +92,10 @@ impl SubgraphIntrospection { // Stream any subgraph changes, filtering out empty responses (None) while passing along // the sdl changes + // this skips the first event, since the inner function always produces a result when it's + // initialized rx_stream + .skip(1) .filter_map(|change| async move { match change { OutputChannelKind::Sdl(sdl) => Some(sdl), diff --git a/src/composition/watchers/watcher/subgraph.rs b/src/composition/watchers/watcher/subgraph.rs index 85fd95631..e3a555d73 100644 --- a/src/composition/watchers/watcher/subgraph.rs +++ b/src/composition/watchers/watcher/subgraph.rs @@ -77,12 +77,12 @@ impl SubgraphWatcher { introspection_headers, } => Ok(Self { watcher: SubgraphWatcherKind::Introspect(SubgraphIntrospection::new( - subgraph_url, + subgraph_url.clone(), introspection_headers.map(|header_map| header_map.into_iter().collect()), client_config, introspection_polling_interval, )), - routing_url, + routing_url: Some(subgraph_url.to_string()), }), SchemaSource::Subgraph { graphref, subgraph } => Ok(Self { watcher: SubgraphWatcherKind::Once(NonRepeatingFetch::RemoteSchema( From 3b1341070c9e1a76c29be27003c0f3df6eb2c760 Mon Sep 17 00:00:00 2001 From: Brian George Date: Wed, 18 Dec 2024 18:37:16 -0500 Subject: [PATCH 3/5] Add docs --- src/command/dev/next/router/binary.rs | 1 + src/command/dev/next/router/config/mod.rs | 1 + src/command/dev/next/router/hot_reload.rs | 4 ++-- src/command/dev/next/router/run.rs | 5 +++++ .../config/resolver/fetch_remote_subgraph.rs | 14 ++++++++++++++ .../config/resolver/fetch_remote_subgraphs.rs | 12 ++++++++---- 6 files changed, 31 insertions(+), 6 deletions(-) diff --git a/src/command/dev/next/router/binary.rs b/src/command/dev/next/router/binary.rs index 048f84e0d..151942128 100644 --- a/src/command/dev/next/router/binary.rs +++ b/src/command/dev/next/router/binary.rs @@ -91,6 +91,7 @@ pub enum RunRouterBinaryError { #[cfg_attr(test, derive(derive_getters::Getters))] pub struct RouterBinary { exe: Utf8PathBuf, + #[allow(unused)] version: Version, } diff --git a/src/command/dev/next/router/config/mod.rs b/src/command/dev/next/router/config/mod.rs index 70d367b39..6db3cff74 100644 --- a/src/command/dev/next/router/config/mod.rs +++ b/src/command/dev/next/router/config/mod.rs @@ -178,6 +178,7 @@ impl RunRouterConfig { &self.state.health_check_endpoint } + #[allow(unused)] pub fn router_config(&self) -> RouterConfig { RouterConfig(self.state.raw_config.to_string()) } diff --git a/src/command/dev/next/router/hot_reload.rs b/src/command/dev/next/router/hot_reload.rs index 327c80cf2..3a0f3a0f5 100644 --- a/src/command/dev/next/router/hot_reload.rs +++ b/src/command/dev/next/router/hot_reload.rs @@ -14,8 +14,8 @@ pub enum RouterUpdateEvent { #[derive(Debug)] pub enum HotReloadEvent { - ConfigWritten(Result<(), Box>), - SchemaWritten(Result<(), Box>), + ConfigWritten(#[allow(unused)] Result<(), Box>), + SchemaWritten(#[allow(unused)] Result<(), Box>), } #[derive(Builder)] diff --git a/src/command/dev/next/router/run.rs b/src/command/dev/next/router/run.rs index 749c95b31..d88561f84 100644 --- a/src/command/dev/next/router/run.rs +++ b/src/command/dev/next/router/run.rs @@ -374,10 +374,15 @@ mod state { } pub struct Abort { pub router_logs: UnboundedReceiverStream>, + #[allow(unused)] pub hot_reload_events: UnboundedReceiverStream, + #[allow(unused)] pub abort_router: AbortHandle, + #[allow(unused)] pub abort_config_watcher: Option, + #[allow(unused)] pub abort_hot_reload: AbortHandle, + #[allow(unused)] pub hot_reload_schema_path: Utf8PathBuf, } } diff --git a/src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs b/src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs index 6ea935b3b..0c0e80b10 100644 --- a/src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs +++ b/src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs @@ -1,3 +1,5 @@ +//! Service and objects related to fetching a subgraph from Studio + use std::{convert::Infallible, pin::Pin}; use buildstructor::Builder; @@ -14,14 +16,18 @@ use tower::{Service, ServiceBuilder}; use crate::{options::ProfileOpt, utils::client::StudioClientConfig}; +/// Errors that occur when constructing a [`FetchRemoteSubgraph`] service #[derive(thiserror::Error, Debug)] pub enum MakeFetchRemoteSubgraphError { + /// Occurs when the factory service fails to be ready #[error("Service failed to reach a ready state.\n{}", .0)] ReadyFailed(Box), + /// Occurs when the [`FetchRemoteSubgraph`] service cannot be created #[error("Failed to create the FetchRemoteSubgraphService.\n{}", .0)] StudioClient(anyhow::Error), } +/// Factory that creates a [`FetchRemoteSubgraph`] service #[derive(Builder, Clone)] pub struct MakeFetchRemoteSubgraph { studio_client_config: StudioClientConfig, @@ -58,6 +64,7 @@ impl Service<()> for MakeFetchRemoteSubgraph { } } +/// Represents a subgraph fetched from Studio #[derive(Clone, Debug, Eq, PartialEq, Builder, Getters)] pub struct RemoteSubgraph { name: String, @@ -65,16 +72,21 @@ pub struct RemoteSubgraph { schema: String, } +/// Errors that occur when fetching a subgraph from Studio #[derive(thiserror::Error, Debug)] pub enum FetchRemoteSubgraphError { + /// Errors originating from [`rover_client`] #[error(transparent)] RoverClient(#[from] RoverClientError), + /// Occurs when rover gets an SDL type that it doesn't recognize #[error("Response contained an invalid SDL type: {:?}", .0)] InvalidSdlType(SdlType), + /// Error that occurs when the inner service fails to become ready #[error("Inner service failed to become ready.\n{}", .0)] Service(Box), } +/// Request that fetches a subgraph from Studio by graph ref and name #[derive(Builder)] pub struct FetchRemoteSubgraphRequest { subgraph_name: String, @@ -90,11 +102,13 @@ impl From for SubgraphFetchRequest { } } +/// Service that is able to fetch a subgraph from Studio pub struct FetchRemoteSubgraph { inner: S, } impl FetchRemoteSubgraph { + /// Creates a new [`FetchRemoteSubgraph`] pub fn new(inner: S) -> FetchRemoteSubgraph { FetchRemoteSubgraph { inner } } diff --git a/src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs b/src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs index 1f94a0e5f..e11f4f470 100644 --- a/src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs +++ b/src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs @@ -6,27 +6,30 @@ use apollo_federation_types::config::SubgraphConfig; use buildstructor::Builder; use futures::Future; use rover_client::{ - operations::subgraph::{ - fetch::SubgraphFetch, - fetch_all::{SubgraphFetchAll, SubgraphFetchAllRequest, SubgraphFetchAllResponse}, + operations::subgraph::fetch_all::{ + SubgraphFetchAll, SubgraphFetchAllRequest, SubgraphFetchAllResponse, }, shared::GraphRef, RoverClientError, }; use rover_graphql::{GraphQLLayer, GraphQLService}; use rover_http::HttpService; -use tower::{service_fn, Service, ServiceBuilder}; +use tower::{Service, ServiceBuilder}; use crate::{options::ProfileOpt, utils::client::StudioClientConfig}; +/// Errors that occur when constructing a [`FetchRemoteSubgraphs`] service #[derive(thiserror::Error, Debug)] pub enum MakeFetchRemoteSubgraphsError { + /// Occurs when the factory service fails to be ready #[error("Service failed to reach a ready state.\n{}", .0)] ReadyFailed(Box), + /// Occurs when the [`FetchRemoteSubgraphs`] service cannot be created #[error("Failed to create the FetchRemoteSubgraphsService.\n{}", .0)] StudioClient(anyhow::Error), } +/// Factory that creates a [`FetchRemoteSubgraphs`] service #[derive(Builder, Clone)] pub struct MakeFetchRemoteSubgraphs { studio_client_config: StudioClientConfig, @@ -82,6 +85,7 @@ pub struct FetchRemoteSubgraphs { } impl FetchRemoteSubgraphs { + /// Creates a new [`FetchRemoteSubgraphs`] pub fn new(inner: S) -> FetchRemoteSubgraphs { FetchRemoteSubgraphs { inner } } From bf13fb400037aa3e9ff387a166287dadafd06d83 Mon Sep 17 00:00:00 2001 From: Brian George Date: Thu, 19 Dec 2024 02:08:17 -0500 Subject: [PATCH 4/5] Fix tests --- Cargo.lock | 1 + Cargo.toml | 1 + .../config/resolver/fetch_remote_subgraph.rs | 2 +- .../config/resolver/fetch_remote_subgraphs.rs | 2 +- .../supergraph/config/resolver/mod.rs | 183 +++++++++++------- .../config/unresolved/supergraph.rs | 105 ++++++---- src/utils/effect/test/mod.rs | 1 - .../effect/test/subgraph_fetch_query.graphql | 16 -- src/utils/mod.rs | 1 + src/utils/service/mod.rs | 2 + src/utils/service/test.rs | 47 +++++ 11 files changed, 231 insertions(+), 130 deletions(-) delete mode 100644 src/utils/effect/test/subgraph_fetch_query.graphql create mode 100644 src/utils/service/mod.rs create mode 100644 src/utils/service/test.rs diff --git a/Cargo.lock b/Cargo.lock index b3e8cfe5f..12df30aaa 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4888,6 +4888,7 @@ dependencies = [ "tokio-stream", "toml", "tower 0.5.1", + "tower-test", "tracing", "tracing-test", "url", diff --git a/Cargo.toml b/Cargo.toml index 32db967f2..fd84d4860 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -247,5 +247,6 @@ reqwest = { workspace = true, features = ["native-tls-vendored"] } rstest = { workspace = true } serial_test = { workspace = true } speculoos = { workspace = true } +tower-test = { workspace = true } tracing-test = { workspace = true } temp-env = { version = "0.3.6", features = ["async_closure"] } diff --git a/src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs b/src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs index 0c0e80b10..aacbfb0ab 100644 --- a/src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs +++ b/src/composition/supergraph/config/resolver/fetch_remote_subgraph.rs @@ -87,7 +87,7 @@ pub enum FetchRemoteSubgraphError { } /// Request that fetches a subgraph from Studio by graph ref and name -#[derive(Builder)] +#[derive(Clone, Debug, Builder, PartialEq, Eq)] pub struct FetchRemoteSubgraphRequest { subgraph_name: String, graph_ref: GraphRef, diff --git a/src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs b/src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs index e11f4f470..6d850fd5f 100644 --- a/src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs +++ b/src/composition/supergraph/config/resolver/fetch_remote_subgraphs.rs @@ -67,7 +67,7 @@ impl Service<()> for MakeFetchRemoteSubgraphs { } /// Request to fetch subgraphs from Studio -#[derive(Clone, Debug)] +#[derive(Clone, Debug, PartialEq, Eq)] pub struct FetchRemoteSubgraphsRequest { graph_ref: GraphRef, } diff --git a/src/composition/supergraph/config/resolver/mod.rs b/src/composition/supergraph/config/resolver/mod.rs index 48563808d..8a210d65c 100644 --- a/src/composition/supergraph/config/resolver/mod.rs +++ b/src/composition/supergraph/config/resolver/mod.rs @@ -309,21 +309,23 @@ mod tests { use rstest::rstest; use semver::Version; use speculoos::prelude::*; + use tower::ServiceBuilder; + use tower_test::mock::Handle; use crate::{ composition::supergraph::config::scenario::*, utils::{ - effect::{ - fetch_remote_subgraph::{MockFetchRemoteSubgraph, RemoteSubgraph}, - fetch_remote_subgraphs::MockFetchRemoteSubgraphs, - introspect::MockIntrospectSubgraph, - read_stdin::MockReadStdin, - }, + effect::{introspect::MockIntrospectSubgraph, read_stdin::MockReadStdin}, parsers::FileDescriptorType, + service::test::{FakeError, FakeMakeService}, }, }; - use super::SupergraphConfigResolver; + use super::{ + fetch_remote_subgraph::{FetchRemoteSubgraphRequest, RemoteSubgraph}, + fetch_remote_subgraphs::FetchRemoteSubgraphsRequest, + SupergraphConfigResolver, + }; /// Test showing that federation version is selected from the user-specified fed version /// over local supergraph config, remote composition version, or version inferred from @@ -411,15 +413,18 @@ mod tests { FederationVersion::ExactFedTwo(Version::from_str("2.7.1").unwrap()); let mut subgraphs = BTreeMap::new(); - let mut mock_fetch_remote_subgraphs = MockFetchRemoteSubgraphs::new(); - let mut mock_fetch_remote_subgraph = MockFetchRemoteSubgraph::new(); + let (fetch_remote_subgraphs_service, fetch_remote_subgraphs_handle) = + tower_test::mock::spawn::>( + ); + let (fetch_remote_subgraph_service, fetch_remote_subgraph_handle) = + tower_test::mock::spawn::(); setup_remote_subgraph_scenario( fetch_remote_subgraph_from_config, remote_subgraph_scenario.as_ref(), &mut subgraphs, - &mut mock_fetch_remote_subgraphs, - &mut mock_fetch_remote_subgraph, + fetch_remote_subgraphs_handle, + fetch_remote_subgraph_handle, ); setup_sdl_subgraph_scenario(sdl_subgraph_scenario.as_ref(), &mut subgraphs); @@ -460,30 +465,38 @@ mod tests { } }); + let make_fetch_remote_subgraphs_service = FakeMakeService::new( + ServiceBuilder::new() + .map_err(FakeError::from) + .service(fetch_remote_subgraphs_service.into_inner()), + ); + // load remote subgraphs let resolver = resolver - .load_remote_subgraphs(&mock_fetch_remote_subgraphs, graph_ref.as_ref()) + .load_remote_subgraphs(make_fetch_remote_subgraphs_service, graph_ref.as_ref()) .await?; // load from the file descriptor let resolver = resolver .load_from_file_descriptor(&mut mock_read_stdin, Some(&file_descriptor_type))?; - // validate that the correct effect has been invoked - mock_fetch_remote_subgraphs.checkpoint(); + let make_fetch_remote_subgraph_service = FakeMakeService::new( + ServiceBuilder::new() + .map_err(FakeError::from) + .service(fetch_remote_subgraph_service.into_inner()), + ); // fully resolve subgraphs into their SDLs let fully_resolved_supergraph_config = resolver .fully_resolve_subgraphs( &mock_introspect_subgraph, - &mock_fetch_remote_subgraph, + make_fetch_remote_subgraph_service, Some(&local_supergraph_config_path), ) .await?; // validate that the correct effects have been invoked mock_introspect_subgraph.checkpoint(); - mock_fetch_remote_subgraph.checkpoint(); // validate that the federation version is correct assert_that!(fully_resolved_supergraph_config.federation_version()) @@ -576,15 +589,18 @@ mod tests { let mut subgraphs = BTreeMap::new(); - let mut mock_fetch_remote_subgraphs = MockFetchRemoteSubgraphs::new(); - let mut mock_fetch_remote_subgraph = MockFetchRemoteSubgraph::new(); + let (fetch_remote_subgraphs_service, fetch_remote_subgraphs_handle) = + tower_test::mock::spawn::>( + ); + let (fetch_remote_subgraph_service, fetch_remote_subgraph_handle) = + tower_test::mock::spawn::(); setup_remote_subgraph_scenario( fetch_remote_subgraph_from_config, remote_subgraph_scenario.as_ref(), &mut subgraphs, - &mut mock_fetch_remote_subgraphs, - &mut mock_fetch_remote_subgraph, + fetch_remote_subgraphs_handle, + fetch_remote_subgraph_handle, ); setup_sdl_subgraph_scenario(sdl_subgraph_scenario.as_ref(), &mut subgraphs); @@ -625,30 +641,38 @@ mod tests { } }); + let make_fetch_remote_subgraphs_service = FakeMakeService::new( + ServiceBuilder::new() + .map_err(FakeError::from) + .service(fetch_remote_subgraphs_service.into_inner()), + ); + // load remote subgraphs let resolver = resolver - .load_remote_subgraphs(&mock_fetch_remote_subgraphs, graph_ref.as_ref()) + .load_remote_subgraphs(make_fetch_remote_subgraphs_service, graph_ref.as_ref()) .await?; // load from the file descriptor let resolver = resolver .load_from_file_descriptor(&mut mock_read_stdin, Some(&file_descriptor_type))?; - // validate that the correct effect has been invoked - mock_fetch_remote_subgraphs.checkpoint(); + let make_fetch_remote_subgraph_service = FakeMakeService::new( + ServiceBuilder::new() + .map_err(FakeError::from) + .service(fetch_remote_subgraph_service.into_inner()), + ); // fully resolve subgraphs into their SDLs let fully_resolved_supergraph_config = resolver .fully_resolve_subgraphs( &mock_introspect_subgraph, - &mock_fetch_remote_subgraph, + make_fetch_remote_subgraph_service, Some(&local_supergraph_config_path), ) .await?; // validate that the correct effects have been invoked mock_introspect_subgraph.checkpoint(); - mock_fetch_remote_subgraph.checkpoint(); // validate that the federation version is correct assert_that!(fully_resolved_supergraph_config.federation_version()) @@ -737,15 +761,18 @@ mod tests { ) -> Result<()> { let mut subgraphs = BTreeMap::new(); - let mut mock_fetch_remote_subgraphs = MockFetchRemoteSubgraphs::new(); - let mut mock_fetch_remote_subgraph = MockFetchRemoteSubgraph::new(); + let (fetch_remote_subgraphs_service, fetch_remote_subgraphs_handle) = + tower_test::mock::spawn::>( + ); + let (fetch_remote_subgraph_service, fetch_remote_subgraph_handle) = + tower_test::mock::spawn::(); setup_remote_subgraph_scenario( fetch_remote_subgraph_from_config, remote_subgraph_scenario.as_ref(), &mut subgraphs, - &mut mock_fetch_remote_subgraphs, - &mut mock_fetch_remote_subgraph, + fetch_remote_subgraphs_handle, + fetch_remote_subgraph_handle, ); setup_sdl_subgraph_scenario(sdl_subgraph_scenario.as_ref(), &mut subgraphs); @@ -785,31 +812,36 @@ mod tests { } }); + let make_fetch_remote_subgraphs_service = FakeMakeService::new( + ServiceBuilder::new() + .map_err(FakeError::from) + .service(fetch_remote_subgraphs_service.into_inner()), + ); + // load remote subgraphs let resolver = resolver - .load_remote_subgraphs(&mock_fetch_remote_subgraphs, graph_ref.as_ref()) + .load_remote_subgraphs(make_fetch_remote_subgraphs_service, graph_ref.as_ref()) .await?; // load from the file descriptor let resolver = resolver .load_from_file_descriptor(&mut mock_read_stdin, Some(&file_descriptor_type))?; - // validate that the correct effect has been invoked - mock_fetch_remote_subgraphs.checkpoint(); + let make_fetch_remote_subgraph_service = FakeMakeService::new( + ServiceBuilder::new() + .map_err(FakeError::from) + .service(fetch_remote_subgraph_service.into_inner()), + ); // fully resolve subgraphs into their SDLs let fully_resolved_supergraph_config = resolver .fully_resolve_subgraphs( &mock_introspect_subgraph, - &mock_fetch_remote_subgraph, + make_fetch_remote_subgraph_service, Some(&local_supergraph_config_path), ) .await?; - // validate that the correct effects have been invoked - mock_introspect_subgraph.checkpoint(); - mock_fetch_remote_subgraph.checkpoint(); - // validate that the federation version is correct assert_that!(fully_resolved_supergraph_config.federation_version()) .is_equal_to(&FederationVersion::LatestFedTwo); @@ -838,8 +870,11 @@ mod tests { fetch_remote_subgraph_from_config: bool, remote_subgraph_scenario: Option<&RemoteSubgraphScenario>, local_subgraphs: &mut BTreeMap, - mock_fetch_remote_subgraphs: &mut MockFetchRemoteSubgraphs, - mock_fetch_remote_subgraph: &mut MockFetchRemoteSubgraph, + mut fetch_remote_subgraphs_handle: Handle< + FetchRemoteSubgraphsRequest, + BTreeMap, + >, + mut fetch_remote_subgraph_handle: Handle, ) { if let Some(remote_subgraph_scenario) = remote_subgraph_scenario { let schema_source = SchemaSource::Subgraph { @@ -853,55 +888,57 @@ mod tests { // If the remote subgraph scenario exists, add a SubgraphConfig for it to the supergraph config if fetch_remote_subgraph_from_config { local_subgraphs.insert("remote-subgraph".to_string(), subgraph_config); - mock_fetch_remote_subgraphs - .expect_fetch_remote_subgraphs() - .times(0); + fetch_remote_subgraphs_handle.allow(0); } // Otherwise, fetch it by --graph_ref else { - mock_fetch_remote_subgraphs - .expect_fetch_remote_subgraphs() - .times(1) - .with(predicate::eq(remote_subgraph_scenario.graph_ref.clone())) - .returning({ + fetch_remote_subgraphs_handle.allow(1); + tokio::spawn({ + let remote_subgraph_scenario = remote_subgraph_scenario.clone(); + async move { + let (req, send_response) = + fetch_remote_subgraphs_handle.next_request().await.unwrap(); + assert_that!(req).is_equal_to(FetchRemoteSubgraphsRequest::new( + remote_subgraph_scenario.graph_ref.clone(), + )); let subgraph_name = remote_subgraph_scenario.subgraph_name.to_string(); - move |_| { - Ok(BTreeMap::from_iter([( - subgraph_name.to_string(), - subgraph_config.clone(), - )])) - } - }); + send_response.send_response(BTreeMap::from_iter([( + subgraph_name.to_string(), + subgraph_config.clone(), + )])); + } + }); } // we always fetch the SDLs from remote - mock_fetch_remote_subgraph - .expect_fetch_remote_subgraph() - .times(1) - .with( - predicate::eq(remote_subgraph_scenario.graph_ref.clone()), - predicate::eq(remote_subgraph_scenario.subgraph_name.clone()), - ) - .returning({ + fetch_remote_subgraph_handle.allow(1); + tokio::spawn({ + let remote_subgraph_scenario = remote_subgraph_scenario.clone(); + async move { + let (req, send_response) = + fetch_remote_subgraph_handle.next_request().await.unwrap(); + assert_that!(req).is_equal_to( + FetchRemoteSubgraphRequest::builder() + .graph_ref(remote_subgraph_scenario.graph_ref.clone()) + .subgraph_name(remote_subgraph_scenario.subgraph_name.clone()) + .build(), + ); let subgraph_name = remote_subgraph_scenario.subgraph_name.to_string(); let routing_url = remote_subgraph_scenario.routing_url.to_string(); let sdl = remote_subgraph_scenario.sdl.to_string(); - move |_, _| { - Ok(RemoteSubgraph::builder() + send_response.send_response( + RemoteSubgraph::builder() .name(subgraph_name.to_string()) .routing_url(routing_url.to_string()) .schema(sdl.to_string()) - .build()) - } - }); + .build(), + ) + } + }); } else { // if no remote subgraph schemas exist, don't expect them to fetched - mock_fetch_remote_subgraphs - .expect_fetch_remote_subgraphs() - .times(0); - mock_fetch_remote_subgraph - .expect_fetch_remote_subgraph() - .times(0); + fetch_remote_subgraphs_handle.allow(0); + fetch_remote_subgraph_handle.allow(0); } } diff --git a/src/composition/supergraph/config/unresolved/supergraph.rs b/src/composition/supergraph/config/unresolved/supergraph.rs index f8cabac0d..1c4f77650 100644 --- a/src/composition/supergraph/config/unresolved/supergraph.rs +++ b/src/composition/supergraph/config/unresolved/supergraph.rs @@ -62,19 +62,23 @@ mod tests { use mockall::predicate; use rstest::{fixture, rstest}; use speculoos::prelude::*; + use tower::ServiceBuilder; use crate::{ composition::supergraph::config::{ federation::FederationVersionResolverFromSubgraphs, full::{FullyResolvedSubgraph, FullyResolvedSupergraphConfig}, lazy::{LazilyResolvedSubgraph, LazilyResolvedSupergraphConfig}, - resolver::ResolveSupergraphConfigError, + resolver::{ + fetch_remote_subgraph::{FetchRemoteSubgraphRequest, RemoteSubgraph}, + ResolveSupergraphConfigError, + }, scenario::*, unresolved::UnresolvedSupergraphConfig, }, - utils::effect::{ - fetch_remote_subgraph::{MockFetchRemoteSubgraph, RemoteSubgraph}, - introspect::MockIntrospectSubgraph, + utils::{ + effect::introspect::MockIntrospectSubgraph, + service::test::{FakeError, FakeMakeService}, }, }; @@ -302,25 +306,33 @@ mod tests { .. } = remote_subgraph_scenario; - let mut mock_fetch_remote_subgraph = MockFetchRemoteSubgraph::new(); - mock_fetch_remote_subgraph - .expect_fetch_remote_subgraph() - .times(1) - .with( - predicate::eq(remote_subgraph_graph_ref.clone()), - predicate::eq(remote_subgraph_subgraph_name.to_string()), - ) - .returning({ + let (fetch_remote_subgraph_service, mut fetch_remote_subgraph_handle) = + tower_test::mock::spawn::(); + fetch_remote_subgraph_handle.allow(1); + tokio::spawn({ + let remote_subgraph_sdl = remote_subgraph_sdl.clone(); + let remote_subgraph_routing_url = remote_subgraph_routing_url.clone(); + async move { + let (req, send_response) = + fetch_remote_subgraph_handle.next_request().await.unwrap(); + let subgraph_name = remote_subgraph_subgraph_name.to_string(); + assert_that!(req).is_equal_to( + FetchRemoteSubgraphRequest::builder() + .graph_ref(remote_subgraph_graph_ref.clone()) + .subgraph_name(subgraph_name.to_string()) + .build(), + ); let remote_subgraph_sdl = remote_subgraph_sdl.to_string(); let remote_subgraph_routing_url = remote_subgraph_routing_url.to_string(); - move |_, name| { - Ok(RemoteSubgraph::builder() - .name(name.to_string()) + send_response.send_response( + RemoteSubgraph::builder() + .name(subgraph_name) .routing_url(remote_subgraph_routing_url.to_string()) .schema(remote_subgraph_sdl.to_string()) - .build()) - } - }); + .build(), + ) + } + }); let IntrospectSubgraphScenario { sdl: ref introspect_subgraph_sdl, @@ -342,9 +354,15 @@ mod tests { move |_, _| Ok(introspect_subgraph_sdl.to_string()) }); + let make_fetch_remote_subgraph_service = FakeMakeService::new( + ServiceBuilder::new() + .map_err(FakeError::from) + .service(fetch_remote_subgraph_service.into_inner()), + ); + let result = FullyResolvedSupergraphConfig::resolve( &mock_introspect_subgraph, - &mock_fetch_remote_subgraph, + make_fetch_remote_subgraph_service, Some( &Utf8PathBuf::from_path_buf(supergraph_config_root_dir.path().to_path_buf()) .unwrap(), @@ -353,7 +371,6 @@ mod tests { ) .await; - mock_fetch_remote_subgraph.checkpoint(); mock_introspect_subgraph.checkpoint(); let resolved_supergraph_config = assert_that!(result).is_ok().subject; @@ -489,25 +506,38 @@ mod tests { .. } = remote_subgraph_scenario; - let mut mock_fetch_remote_subgraph = MockFetchRemoteSubgraph::new(); - mock_fetch_remote_subgraph - .expect_fetch_remote_subgraph() - .times(1) - .with( - predicate::eq(remote_subgraph_graph_ref.clone()), - predicate::eq(remote_subgraph_subgraph_name.to_string()), - ) - .returning({ + let (fetch_remote_subgraph_service, mut fetch_remote_subgraph_handle) = + tower_test::mock::spawn::(); + + fetch_remote_subgraph_handle.allow(1); + tokio::spawn({ + let remote_subgraph_sdl = remote_subgraph_sdl.clone(); + async move { + let (req, send_response) = + fetch_remote_subgraph_handle.next_request().await.unwrap(); + assert_that!(req).is_equal_to( + FetchRemoteSubgraphRequest::builder() + .graph_ref(remote_subgraph_graph_ref.clone()) + .subgraph_name(remote_subgraph_subgraph_name.to_string()) + .build(), + ); let remote_subgraph_sdl = remote_subgraph_sdl.to_string(); let remote_subgraph_routing_url = remote_subgraph_routing_url.to_string(); - move |_, name| { - Ok(RemoteSubgraph::builder() - .name(name.to_string()) + send_response.send_response( + RemoteSubgraph::builder() + .name(remote_subgraph_subgraph_name.to_string().to_string()) .routing_url(remote_subgraph_routing_url.to_string()) .schema(remote_subgraph_sdl.to_string()) - .build()) - } - }); + .build(), + ); + } + }); + + let make_fetch_remote_subgraph_service = FakeMakeService::new( + ServiceBuilder::new() + .map_err(FakeError::from) + .service(fetch_remote_subgraph_service.into_inner()), + ); let IntrospectSubgraphScenario { sdl: ref introspect_subgraph_sdl, @@ -531,7 +561,7 @@ mod tests { let result = FullyResolvedSupergraphConfig::resolve( &mock_introspect_subgraph, - &mock_fetch_remote_subgraph, + make_fetch_remote_subgraph_service, Some( &Utf8PathBuf::from_path_buf(supergraph_config_root_dir.path().to_path_buf()) .unwrap(), @@ -540,7 +570,6 @@ mod tests { ) .await; - mock_fetch_remote_subgraph.checkpoint(); mock_introspect_subgraph.checkpoint(); let mut fed_two_subgraph_names = HashSet::new(); diff --git a/src/utils/effect/test/mod.rs b/src/utils/effect/test/mod.rs index c0720c1e0..38638209d 100644 --- a/src/utils/effect/test/mod.rs +++ b/src/utils/effect/test/mod.rs @@ -1,3 +1,2 @@ pub const SUBGRAPH_INTROSPECTION_QUERY: &str = include_str!("./subgraph_introspection_query.graphql"); -pub const SUBGRAPH_FETCH_QUERY: &str = include_str!("./subgraph_fetch_query.graphql"); diff --git a/src/utils/effect/test/subgraph_fetch_query.graphql b/src/utils/effect/test/subgraph_fetch_query.graphql deleted file mode 100644 index 98fbfa5b7..000000000 --- a/src/utils/effect/test/subgraph_fetch_query.graphql +++ /dev/null @@ -1,16 +0,0 @@ -query SubgraphFetchQuery($graph_ref: ID!, $subgraph_name: ID!) { - variant(ref: $graph_ref) { - __typename - ... on GraphVariant { - subgraph(name: $subgraph_name) { - url, - activePartialSchema { - sdl - } - } - subgraphs { - name - } - } - } -} diff --git a/src/utils/mod.rs b/src/utils/mod.rs index e4cdd8609..e35b222d4 100644 --- a/src/utils/mod.rs +++ b/src/utils/mod.rs @@ -3,6 +3,7 @@ pub mod effect; pub mod env; pub mod parsers; pub mod pkg; +pub mod service; pub mod stringify; #[cfg(feature = "composition-js")] pub mod supergraph_config; diff --git a/src/utils/service/mod.rs b/src/utils/service/mod.rs new file mode 100644 index 000000000..82e03f9a4 --- /dev/null +++ b/src/utils/service/mod.rs @@ -0,0 +1,2 @@ +#[cfg(test)] +pub mod test; diff --git a/src/utils/service/test.rs b/src/utils/service/test.rs new file mode 100644 index 000000000..e2b43f338 --- /dev/null +++ b/src/utils/service/test.rs @@ -0,0 +1,47 @@ +use std::{convert::Infallible, pin::Pin}; + +use futures::Future; +use tower::Service; + +#[derive(thiserror::Error, Debug)] +#[error(transparent)] +pub struct FakeError(Box); + +impl From> for FakeError { + fn from(value: Box) -> Self { + FakeError(value) + } +} + +#[derive(Clone)] +pub struct FakeMakeService { + service: S, +} + +impl FakeMakeService { + pub fn new(service: S) -> FakeMakeService { + FakeMakeService { service } + } +} + +impl Service<()> for FakeMakeService +where + S: Clone + Send + 'static, +{ + type Response = S; + type Error = Infallible; + type Future = Pin> + Send>>; + + fn poll_ready( + &mut self, + _cx: &mut std::task::Context<'_>, + ) -> std::task::Poll> { + std::task::Poll::Ready(Ok(())) + } + + fn call(&mut self, _req: ()) -> Self::Future { + let service = self.service.clone(); + let fut = async move { Ok(service) }; + Box::pin(fut) + } +} From e1379559295cec2b14ae969f26c5cb7ef186d51f Mon Sep 17 00:00:00 2001 From: Brian George Date: Thu, 19 Dec 2024 10:14:55 -0500 Subject: [PATCH 5/5] Update `routing_url` calculation for introspection watchers --- src/composition/watchers/watcher/subgraph.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/composition/watchers/watcher/subgraph.rs b/src/composition/watchers/watcher/subgraph.rs index e3a555d73..7388650b4 100644 --- a/src/composition/watchers/watcher/subgraph.rs +++ b/src/composition/watchers/watcher/subgraph.rs @@ -82,7 +82,7 @@ impl SubgraphWatcher { client_config, introspection_polling_interval, )), - routing_url: Some(subgraph_url.to_string()), + routing_url: routing_url.or_else(|| Some(subgraph_url.to_string())), }), SchemaSource::Subgraph { graphref, subgraph } => Ok(Self { watcher: SubgraphWatcherKind::Once(NonRepeatingFetch::RemoteSchema(