Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to datafusion 16 #634

Closed
wants to merge 13 commits into from
Prev Previous commit
Next Next commit
Paste results back into test = passing test
  • Loading branch information
Brent Gardner committed Jan 26, 2023
commit 83ec9f13ae0c434bf717df6534ea746aa2e82858
8 changes: 4 additions & 4 deletions ballista/scheduler/src/state/execution_graph.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1686,13 +1686,13 @@ mod test {

// With the improvement of https://github.com/apache/arrow-datafusion/pull/4122,
// unnecessary RepartitionExec can be removed
assert_eq!(join_graph.stage_count(), 4);
assert_eq!(join_graph.stage_count(), 5);
assert_eq!(join_graph.available_tasks(), 0);

// Call revive to move the two leaf Resolved stages to Running
join_graph.revive();

assert_eq!(join_graph.stage_count(), 4);
assert_eq!(join_graph.stage_count(), 5);
assert_eq!(join_graph.available_tasks(), 2);

// Complete the first stage
Expand Down Expand Up @@ -1737,13 +1737,13 @@ mod test {
let executor2 = mock_executor("executor-id2".to_string());
let mut join_graph = test_join_plan(4).await;

assert_eq!(join_graph.stage_count(), 4);
assert_eq!(join_graph.stage_count(), 5);
assert_eq!(join_graph.available_tasks(), 0);

// Call revive to move the two leaf Resolved stages to Running
join_graph.revive();

assert_eq!(join_graph.stage_count(), 4);
assert_eq!(join_graph.stage_count(), 5);
assert_eq!(join_graph.available_tasks(), 2);

// Complete the first stage
Expand Down
20 changes: 16 additions & 4 deletions ballista/scheduler/src/state/execution_graph_dot.rs
Original file line number Diff line number Diff line change
Expand Up @@ -606,14 +606,26 @@ filter_expr="]
#[tokio::test]
async fn query_stage_optimized() -> Result<()> {
let graph = test_graph_optimized().await?;
let dot = ExecutionGraphDot::generate_for_query_stage(Arc::new(graph), 4)
let dot = ExecutionGraphDot::generate_for_query_stage(Arc::new(graph), 3)
.map_err(|e| BallistaError::Internal(format!("{e:?}")))?;

let expected = r#"
digraph G {
stage_4_0 [shape=box, label="ShuffleWriter [0 partitions]"]
stage_4_0_0 [shape=box, label="MemoryExec"]
stage_4_0_0 -> stage_4_0
stage_3_0 [shape=box, label="ShuffleWriter [48 partitions]"]
stage_3_0_0 [shape=box, label="CoalesceBatches [batchSize=8192]"]
stage_3_0_0_0 [shape=box, label="HashJoin
join_expr=a@0 = a@0
filter_expr="]
stage_3_0_0_0_0 [shape=box, label="CoalesceBatches [batchSize=8192]"]
stage_3_0_0_0_0_0 [shape=box, label="UnresolvedShuffleExec [stage_id=1]"]
stage_3_0_0_0_0_0 -> stage_3_0_0_0_0
stage_3_0_0_0_0 -> stage_3_0_0_0
stage_3_0_0_0_1 [shape=box, label="CoalesceBatches [batchSize=8192]"]
stage_3_0_0_0_1_0 [shape=box, label="UnresolvedShuffleExec [stage_id=2]"]
stage_3_0_0_0_1_0 -> stage_3_0_0_0_1
stage_3_0_0_0_1 -> stage_3_0_0_0
stage_3_0_0_0 -> stage_3_0_0
stage_3_0_0 -> stage_3_0
}
"#
.trim();
Expand Down