Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Basic GraphQL support #46

Merged
merged 18 commits into from
Apr 24, 2019
2 changes: 2 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,12 +30,14 @@ serde_json = { version = "1", optional = true }
csv = { version = "1", optional = true }
chrono = { version = "0.4", optional = true }
uuid = { version = "0.7", features = ["serde"], optional = true }
graphql-parser = { version = "0.2.2", optional = true }

[features]
real-time = []
set-semantics = []
csv-source = ["csv", "chrono"]
json-source = ["serde_json", "chrono"]
graphql = ["graphql-parser", "serde_json"]

[profile.release]
opt-level = 3
Expand Down
229 changes: 229 additions & 0 deletions src/plan/graphql.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,229 @@
//! GraphQL expression plan.

use timely::dataflow::scopes::child::Iterative;
use timely::dataflow::Scope;
use timely::progress::Timestamp;

use differential_dataflow::lattice::Lattice;

use graphql_parser::parse_query;
use graphql_parser::query::{Definition, Document, OperationDefinition, Selection, SelectionSet};

use crate::plan::{Dependencies, ImplContext, Implementable};
use crate::plan::{Plan, Pull, PullLevel};
use crate::{Implemented, ShutdownHandle, VariableMap};

/// A plan for GraphQL queries, e.g. `{ Heroes { name age weight } }`
#[derive(Hash, PartialEq, Eq, PartialOrd, Ord, Clone, Debug, Serialize, Deserialize)]
pub struct GraphQl {
/// String representation of GraphQL query
pub query: String,
/// Cached paths
pub paths: Vec<PullLevel<Plan>>,
}

impl GraphQl {
/// Creates a new GraphQL instance by parsing the ast obtained from the provided query
pub fn new(query: String) -> Self {
let q = query.clone();
GraphQl {
query,
paths: ast_to_paths(parse_query(&q).expect("graphQL ast parsing failed")),
}
}
}

fn selection_set_to_paths(
selection_set: &SelectionSet,
parent_path: &Vec<String>,
at_root: bool,
) -> Vec<PullLevel<Plan>> {
let mut result = vec![];
let mut pull_attributes = vec![];
let variables = vec![];

for item in &selection_set.items {
match item {
Selection::Field(field) => {
if field.selection_set.items.is_empty() {
pull_attributes.push(field.name.to_string());
}

let mut new_parent_path = parent_path.to_vec();
new_parent_path.push(field.name.to_string());

result.extend(selection_set_to_paths(
&field.selection_set,
&new_parent_path,
parent_path.is_empty(),
));
}
_ => unimplemented!(),
}
}

// parent_path handles root path case
if !pull_attributes.is_empty() && !parent_path.is_empty() {
// for root, we expect a NameExpr that puts the pulled IDs in the v position
let plan = if at_root {
Plan::NameExpr(vec![0, 1], parent_path.last().unwrap().to_string())
} else {
Plan::MatchA(0, parent_path.last().unwrap().to_string(), 1)
};

let pull_level = PullLevel {
pull_attributes,
path_attributes: parent_path.to_vec(),
pull_variable: 1,
variables,
plan: Box::new(plan),
};
result.push(pull_level);
}

result
}

/// converts an ast to paths
/// The structure of a typical parsed ast looks like this:
/// ```
/// Document {
/// definitions: [
/// Operation(SelectionSet(SelectionSet {
/// items: [
/// Field(Field {
/// name: ...,
/// selection_set: SelectionSet(...}
/// }),
/// ...
/// ]
/// }))
/// ]
/// }
/// ```
fn ast_to_paths(ast: Document) -> Vec<PullLevel<Plan>> {
let mut result = vec![];
for definition in &ast.definitions {
match definition {
Definition::Operation(operation_definition) => match operation_definition {
OperationDefinition::Query(_) => unimplemented!(),
OperationDefinition::SelectionSet(selection_set) => {
result.extend(selection_set_to_paths(selection_set, &vec![], true))
}
_ => unimplemented!(),
},
Definition::Fragment(_) => unimplemented!(),
};
}

result
}

/// Converts a vector of paths to a GraphQL-like nested value.
pub fn paths_to_nested(paths: Vec<Vec<crate::Value>>) -> serde_json::Value {
use crate::Value::{Aid, Eid};
use serde_json::map::Map;

let mut acc = Map::new();
for mut path in paths {
let mut current_map = &mut acc;
let last_val = path.pop().unwrap();

if let Aid(last_key) = path.pop().unwrap() {
for attribute in path {
let attr = match attribute {
Aid(x) => x,
Eid(x) => x.to_string(),
_ => unreachable!(),
};

let entry = current_map
.entry(attr)
.or_insert_with(|| serde_json::Value::Object(Map::new()));

*entry = match entry {
serde_json::Value::Object(m) => {
serde_json::Value::Object(std::mem::replace(m, Map::new()))
}
serde_json::Value::Array(_) => unreachable!(),
_ => serde_json::Value::Object(Map::new()),
};

match entry {
serde_json::Value::Object(m) => current_map = m,
_ => unreachable!(),
};
}

match current_map.get(&last_key) {
Some(serde_json::Value::Object(_)) => (),
_ => {
current_map.insert(last_key, serde_json::json!(last_val));
}
};
} else {
unreachable!();
}
}

serde_json::Value::Object(acc)
}

impl Implementable for GraphQl {
fn dependencies(&self) -> Dependencies {
let mut dependencies = Dependencies::none();

for path in self.paths.iter() {
dependencies = Dependencies::merge(dependencies, path.dependencies());
}

dependencies
}

fn implement<'b, T, I, S>(
&self,
nested: &mut Iterative<'b, S, u64>,
local_arrangements: &VariableMap<Iterative<'b, S, u64>>,
context: &mut I,
) -> (Implemented<'b, S>, ShutdownHandle)
where
T: Timestamp + Lattice,
I: ImplContext<T>,
S: Scope<Timestamp = T>,
{
let ast = parse_query(&self.query).expect("graphQL ast parsing failed");
let parsed = Pull {
variables: vec![],
paths: ast_to_paths(ast),
};

parsed.implement(nested, local_arrangements, context)
}
}

// relation
// .inner
// .map(|x| ((), x))
// .inspect(|x| { println!("{:?}", x); })
// .aggregate::<_,Vec<_>,_,_,_>(
// |_key, (path, _time, _diff), acc| { acc.push(path); },
// |_key, paths| {
// paths_to_nested(paths)
// // squash_nested(nested)
// },
// |_key| 1)

// /// Register a GraphQL query
// pub fn register_graph_ql(&mut self, query: String, name: &str) {
// use crate::plan::{GraphQl, Plan};

// let req = Register {
// rules: vec![Rule {
// name: name.to_string(),
// plan: Plan::GraphQl(GraphQl::new(query)),
// }],
// publish: vec![name.to_string()],
// };

// self.register(req).unwrap();
// }
17 changes: 17 additions & 0 deletions src/plan/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,8 @@ pub mod aggregate;
pub mod aggregate_neu;
pub mod antijoin;
pub mod filter;
#[cfg(feature = "graphql")]
pub mod graphql;
pub mod hector;
pub mod join;
pub mod project;
Expand All @@ -39,6 +41,8 @@ pub use self::aggregate::{Aggregate, AggregationFn};
pub use self::aggregate_neu::{Aggregate, AggregationFn};
pub use self::antijoin::Antijoin;
pub use self::filter::{Filter, Predicate};
#[cfg(feature = "graphql")]
pub use self::graphql::GraphQl;
pub use self::hector::Hector;
pub use self::join::Join;
pub use self::project::Project;
Expand Down Expand Up @@ -205,6 +209,9 @@ pub enum Plan {
Pull(Pull<Plan>),
/// Single-level pull expression
PullLevel(PullLevel<Plan>),
/// GraphQl pull expression
#[cfg(feature = "graphql")]
GraphQl(GraphQl),
}

impl Plan {
Expand All @@ -226,6 +233,8 @@ impl Plan {
Plan::NameExpr(ref variables, ref _name) => variables.clone(),
Plan::Pull(ref pull) => pull.variables.clone(),
Plan::PullLevel(ref path) => path.variables.clone(),
#[cfg(feature = "graphql")]
Plan::GraphQl(_) => unimplemented!(),
}
}
}
Expand All @@ -249,6 +258,8 @@ impl Implementable for Plan {
Plan::NameExpr(_, ref name) => Dependencies::name(name),
Plan::Pull(ref pull) => pull.dependencies(),
Plan::PullLevel(ref path) => path.dependencies(),
#[cfg(feature = "graphql")]
Plan::GraphQl(ref q) => q.dependencies(),
}
}

Expand Down Expand Up @@ -282,6 +293,8 @@ impl Implementable for Plan {
Plan::NameExpr(_, ref _name) => unimplemented!(), // @TODO hmm...
Plan::Pull(ref pull) => pull.into_bindings(),
Plan::PullLevel(ref path) => path.into_bindings(),
#[cfg(feature = "graphql")]
Plan::GraphQl(ref q) => q.into_bindings(),
}
}

Expand Down Expand Up @@ -321,6 +334,8 @@ impl Implementable for Plan {
Plan::NameExpr(_, ref _name) => Vec::new(),
Plan::Pull(ref pull) => pull.datafy(),
Plan::PullLevel(ref path) => path.datafy(),
#[cfg(feature = "graphql")]
Plan::GraphQl(ref q) => q.datafy(),
}
}

Expand Down Expand Up @@ -487,6 +502,8 @@ impl Implementable for Plan {
}
Plan::Pull(ref pull) => pull.implement(nested, local_arrangements, context),
Plan::PullLevel(ref path) => path.implement(nested, local_arrangements, context),
#[cfg(feature = "graphql")]
Plan::GraphQl(ref query) => query.implement(nested, local_arrangements, context),
}
}
}
16 changes: 14 additions & 2 deletions src/plan/pull.rs
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,14 @@ fn interleave(values: &[Value], constants: &[Aid]) -> Vec<Value> {

impl<P: Implementable> Implementable for PullLevel<P> {
fn dependencies(&self) -> Dependencies {
Dependencies::none()
let mut dependencies = self.plan.dependencies();

for attribute in &self.pull_attributes {
let attribute_dependencies = Dependencies::attribute(&attribute);
dependencies = Dependencies::merge(dependencies, attribute_dependencies);
}

dependencies
}

fn implement<'b, T, I, S>(
Expand Down Expand Up @@ -186,7 +193,12 @@ impl<P: Implementable> Implementable for PullLevel<P> {

impl<P: Implementable> Implementable for Pull<P> {
fn dependencies(&self) -> Dependencies {
Dependencies::none()
let mut dependencies = Dependencies::none();
for path in self.paths.iter() {
dependencies = Dependencies::merge(dependencies, path.dependencies());
}

dependencies
}

fn implement<'b, T, I, S>(
Expand Down
Loading