Skip to content

Commit 2ae8cd8

Browse files
committed
Make ExecutionPlan sync (#2307)
1 parent b7bb2cf commit 2ae8cd8

Some content is hidden

Large commits have some content hidden by default. Use the search box below to find content that may be hidden.

45 files changed

+160
-259
lines changed

ballista/rust/core/src/execution_plans/distributed_query.rs

+1-3
Original file line number | Diff line number | Diff line change
@@ -41,7 +41,6 @@ use datafusion::physical_plan::{
4141

4242
use crate::serde::protobuf::execute_query_params::OptionalSessionId;
4343
use crate::serde::{AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec};
44-
use async_trait::async_trait;
4544
use datafusion::arrow::error::{ArrowError, Result as ArrowResult};
4645
use datafusion::arrow::record_batch::RecordBatch;
4746
use datafusion::execution::context::TaskContext;
@@ -122,7 +121,6 @@ impl<T: 'static + AsLogicalPlan> DistributedQueryExec<T> {
122121
}
123122
}
124123

125-
#[async_trait]
126124
impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
127125
fn as_any(&self) -> &dyn Any {
128126
self
@@ -162,7 +160,7 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
162160
}))
163161
}
164162

165-
async fn execute(
163+
fn execute(
166164
&self,
167165
partition: usize,
168166
_context: Arc<TaskContext>,

ballista/rust/core/src/execution_plans/shuffle_reader.rs

+1-3
Original file line number | Diff line number | Diff line change
@@ -21,7 +21,6 @@ use std::sync::Arc;
2121
use crate::client::BallistaClient;
2222
use crate::serde::scheduler::{PartitionLocation, PartitionStats};
2323

24-
use async_trait::async_trait;
2524
use datafusion::arrow::datatypes::SchemaRef;
2625

2726
use datafusion::error::{DataFusionError, Result};
@@ -64,7 +63,6 @@ impl ShuffleReaderExec {
6463
}
6564
}
6665

67-
#[async_trait]
6866
impl ExecutionPlan for ShuffleReaderExec {
6967
fn as_any(&self) -> &dyn Any {
7068
self
@@ -101,7 +99,7 @@ impl ExecutionPlan for ShuffleReaderExec {
10199
))
102100
}
103101

104-
async fn execute(
102+
fn execute(
105103
&self,
106104
partition: usize,
107105
_context: Arc<TaskContext>,

ballista/rust/core/src/execution_plans/shuffle_writer.rs

+4-6
Original file line numberDiff line numberDiff line change
@@ -33,7 +33,6 @@ use crate::utils;
3333

3434
use crate::serde::protobuf::ShuffleWritePartition;
3535
use crate::serde::scheduler::PartitionStats;
36-
use async_trait::async_trait;
3736
use datafusion::arrow::array::{
3837
ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder, UInt64Builder,
3938
};
@@ -155,7 +154,7 @@ impl ShuffleWriterExec {
155154

156155
async move {
157156
let now = Instant::now();
158-
let mut stream = plan.execute(input_partition, context).await?;
157+
let mut stream = plan.execute(input_partition, context)?;
159158

160159
match output_partitioning {
161160
None => {
@@ -293,7 +292,6 @@ impl ShuffleWriterExec {
293292
}
294293
}
295294

296-
#[async_trait]
297295
impl ExecutionPlan for ShuffleWriterExec {
298296
fn as_any(&self) -> &dyn Any {
299297
self
@@ -336,7 +334,7 @@ impl ExecutionPlan for ShuffleWriterExec {
336334
)?))
337335
}
338336

339-
async fn execute(
337+
fn execute(
340338
&self,
341339
partition: usize,
342340
context: Arc<TaskContext>,
@@ -459,7 +457,7 @@ mod tests {
459457
work_dir.into_path().to_str().unwrap().to_owned(),
460458
Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
461459
)?;
462-
let mut stream = query_stage.execute(0, task_ctx).await?;
460+
let mut stream = query_stage.execute(0, task_ctx)?;
463461
let batches = utils::collect_stream(&mut stream)
464462
.await
465463
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
@@ -516,7 +514,7 @@ mod tests {
516514
work_dir.into_path().to_str().unwrap().to_owned(),
517515
Some(Partitioning::Hash(vec![Arc::new(Column::new("a", 0))], 2)),
518516
)?;
519-
let mut stream = query_stage.execute(0, task_ctx).await?;
517+
let mut stream = query_stage.execute(0, task_ctx)?;
520518
let batches = utils::collect_stream(&mut stream)
521519
.await
522520
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

ballista/rust/core/src/execution_plans/unresolved_shuffle.rs

+1-3
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818
use std::any::Any;
1919
use std::sync::Arc;
2020

21-
use async_trait::async_trait;
2221
use datafusion::arrow::datatypes::SchemaRef;
2322
use datafusion::error::{DataFusionError, Result};
2423
use datafusion::execution::context::TaskContext;
@@ -63,7 +62,6 @@ impl UnresolvedShuffleExec {
6362
}
6463
}
6564

66-
#[async_trait]
6765
impl ExecutionPlan for UnresolvedShuffleExec {
6866
fn as_any(&self) -> &dyn Any {
6967
self
@@ -101,7 +99,7 @@ impl ExecutionPlan for UnresolvedShuffleExec {
10199
))
102100
}
103101

104-
async fn execute(
102+
fn execute(
105103
&self,
106104
_partition: usize,
107105
_context: Arc<TaskContext>,

ballista/rust/core/src/serde/mod.rs

+1-2
Original file line numberDiff line numberDiff line change
@@ -477,7 +477,6 @@ mod tests {
477477
}
478478
}
479479

480-
#[async_trait]
481480
impl ExecutionPlan for TopKExec {
482481
/// Return a reference to Any that can be used for downcasting
483482
fn as_any(&self) -> &dyn Any {
@@ -515,7 +514,7 @@ mod tests {
515514
}
516515

517516
/// Execute one partition and return an iterator over RecordBatch
518-
async fn execute(
517+
fn execute(
519518
&self,
520519
_partition: usize,
521520
_context: Arc<TaskContext>,

ballista/rust/executor/src/collect.rs

+3-7
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,6 @@ use std::sync::Arc;
2222
use std::task::{Context, Poll};
2323
use std::{any::Any, pin::Pin};
2424

25-
use async_trait::async_trait;
2625
use datafusion::arrow::{
2726
datatypes::SchemaRef, error::Result as ArrowResult, record_batch::RecordBatch,
2827
};
@@ -49,7 +48,6 @@ impl CollectExec {
4948
}
5049
}
5150

52-
#[async_trait]
5351
impl ExecutionPlan for CollectExec {
5452
fn as_any(&self) -> &dyn Any {
5553
self
@@ -78,18 +76,16 @@ impl ExecutionPlan for CollectExec {
7876
unimplemented!()
7977
}
8078

81-
async fn execute(
79+
fn execute(
8280
&self,
8381
partition: usize,
8482
context: Arc<TaskContext>,
8583
) -> Result<SendableRecordBatchStream> {
8684
assert_eq!(0, partition);
8785
let num_partitions = self.plan.output_partitioning().partition_count();
8886

89-
let futures = (0..num_partitions).map(|i| self.plan.execute(i, context.clone()));
90-
let streams = futures::future::join_all(futures)
91-
.await
92-
.into_iter()
87+
let streams = (0..num_partitions)
88+
.map(|i| self.plan.execute(i, context.clone()))
9389
.collect::<Result<Vec<_>>>()
9490
.map_err(|e| DataFusionError::Execution(format!("BallistaError: {:?}", e)))?;
9591

datafusion-examples/examples/custom_datasource.rs

+3-4
Original file line numberDiff line numberDiff line change
@@ -196,7 +196,6 @@ impl CustomExec {
196196
}
197197
}
198198

199-
#[async_trait]
200199
impl ExecutionPlan for CustomExec {
201200
fn as_any(&self) -> &dyn Any {
202201
self
@@ -225,7 +224,7 @@ impl ExecutionPlan for CustomExec {
225224
Ok(self)
226225
}
227226

228-
async fn execute(
227+
fn execute(
229228
&self,
230229
_partition: usize,
231230
_context: Arc<TaskContext>,
@@ -243,7 +242,7 @@ impl ExecutionPlan for CustomExec {
243242
account_array.append_value(user.bank_account)?;
244243
}
245244

246-
return Ok(Box::pin(MemoryStream::try_new(
245+
Ok(Box::pin(MemoryStream::try_new(
247246
vec![RecordBatch::try_new(
248247
self.projected_schema.clone(),
249248
vec![
@@ -253,7 +252,7 @@ impl ExecutionPlan for CustomExec {
253252
)?],
254253
self.schema(),
255254
None,
256-
)?));
255+
)?))
257256
}
258257

259258
fn statistics(&self) -> Statistics {

datafusion/core/src/datasource/file_format/csv.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -161,7 +161,7 @@ mod tests {
161161
let projection = Some(vec![0, 1, 2, 3, 4, 5, 6, 7, 8, 10, 11, 12]);
162162
let exec = get_exec("aggregate_test_100.csv", &projection, None).await?;
163163
let task_ctx = ctx.task_ctx();
164-
let stream = exec.execute(0, task_ctx).await?;
164+
let stream = exec.execute(0, task_ctx)?;
165165

166166
let tt_batches: i32 = stream
167167
.map(|batch| {

datafusion/core/src/datasource/file_format/json.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,7 @@ mod tests {
131131
let projection = None;
132132
let exec = get_exec(&projection, None).await?;
133133
let task_ctx = ctx.task_ctx();
134-
let stream = exec.execute(0, task_ctx).await?;
134+
let stream = exec.execute(0, task_ctx)?;
135135

136136
let tt_batches: i32 = stream
137137
.map(|batch| {

datafusion/core/src/datasource/file_format/parquet.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -512,7 +512,7 @@ mod tests {
512512
let projection = None;
513513
let exec = get_exec("alltypes_plain.parquet", &projection, None).await?;
514514
let task_ctx = ctx.task_ctx();
515-
let stream = exec.execute(0, task_ctx).await?;
515+
let stream = exec.execute(0, task_ctx)?;
516516

517517
let tt_batches = stream
518518
.map(|batch| {

datafusion/core/src/datasource/memory.rs

+5-5
Original file line numberDiff line numberDiff line change
@@ -76,7 +76,7 @@ impl MemTable {
7676
let context1 = context.clone();
7777
let exec = exec.clone();
7878
tokio::spawn(async move {
79-
let stream = exec.execute(part_i, context1.clone()).await?;
79+
let stream = exec.execute(part_i, context1.clone())?;
8080
common::collect(stream).await
8181
})
8282
})
@@ -103,7 +103,7 @@ impl MemTable {
103103
let mut output_partitions = vec![];
104104
for i in 0..exec.output_partitioning().partition_count() {
105105
// execute this *output* partition and collect all batches
106-
let mut stream = exec.execute(i, context.clone()).await?;
106+
let mut stream = exec.execute(i, context.clone())?;
107107
let mut batches = vec![];
108108
while let Some(result) = stream.next().await {
109109
batches.push(result?);
@@ -177,7 +177,7 @@ mod tests {
177177

178178
// scan with projection
179179
let exec = provider.scan(&Some(vec![2, 1]), &[], None).await?;
180-
let mut it = exec.execute(0, task_ctx).await?;
180+
let mut it = exec.execute(0, task_ctx)?;
181181
let batch2 = it.next().await.unwrap()?;
182182
assert_eq!(2, batch2.schema().fields().len());
183183
assert_eq!("c", batch2.schema().field(0).name());
@@ -209,7 +209,7 @@ mod tests {
209209
let provider = MemTable::try_new(schema, vec![vec![batch]])?;
210210

211211
let exec = provider.scan(&None, &[], None).await?;
212-
let mut it = exec.execute(0, task_ctx).await?;
212+
let mut it = exec.execute(0, task_ctx)?;
213213
let batch1 = it.next().await.unwrap()?;
214214
assert_eq!(3, batch1.schema().fields().len());
215215
assert_eq!(3, batch1.num_columns());
@@ -365,7 +365,7 @@ mod tests {
365365
MemTable::try_new(Arc::new(merged_schema), vec![vec![batch1, batch2]])?;
366366

367367
let exec = provider.scan(&None, &[], None).await?;
368-
let mut it = exec.execute(0, task_ctx).await?;
368+
let mut it = exec.execute(0, task_ctx)?;
369369
let batch1 = it.next().await.unwrap()?;
370370
assert_eq!(3, batch1.schema().fields().len());
371371
assert_eq!(3, batch1.num_columns());

datafusion/core/src/physical_optimizer/aggregate_statistics.rs

+1-1
Original file line numberDiff line numberDiff line change
@@ -304,7 +304,7 @@ mod tests {
304304

305305
// A ProjectionExec is a sign that the count optimization was applied
306306
assert!(optimized.as_any().is::<ProjectionExec>());
307-
let result = common::collect(optimized.execute(0, task_ctx).await?).await?;
307+
let result = common::collect(optimized.execute(0, task_ctx)?).await?;
308308
assert_eq!(result[0].schema(), Arc::new(Schema::new(vec![col])));
309309
assert_eq!(
310310
result[0]

datafusion/core/src/physical_plan/aggregates/mod.rs

+5-10
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ use crate::physical_plan::{
2929
};
3030
use arrow::array::ArrayRef;
3131
use arrow::datatypes::{Field, Schema, SchemaRef};
32-
use async_trait::async_trait;
3332
use datafusion_common::Result;
3433
use datafusion_expr::Accumulator;
3534
use datafusion_physical_expr::expressions::Column;
@@ -145,7 +144,6 @@ impl AggregateExec {
145144
}
146145
}
147146

148-
#[async_trait]
149147
impl ExecutionPlan for AggregateExec {
150148
/// Return a reference to Any that can be used for down-casting
151149
fn as_any(&self) -> &dyn Any {
@@ -196,12 +194,12 @@ impl ExecutionPlan for AggregateExec {
196194
)?))
197195
}
198196

199-
async fn execute(
197+
fn execute(
200198
&self,
201199
partition: usize,
202200
context: Arc<TaskContext>,
203201
) -> Result<SendableRecordBatchStream> {
204-
let input = self.input.execute(partition, context).await?;
202+
let input = self.input.execute(partition, context)?;
205203
let group_expr = self.group_expr.iter().map(|x| x.0.clone()).collect();
206204

207205
let baseline_metrics = BaselineMetrics::new(&self.metrics, partition);
@@ -417,7 +415,6 @@ mod tests {
417415
use arrow::datatypes::{DataType, Field, Schema, SchemaRef};
418416
use arrow::error::Result as ArrowResult;
419417
use arrow::record_batch::RecordBatch;
420-
use async_trait::async_trait;
421418
use datafusion_common::{DataFusionError, Result};
422419
use datafusion_physical_expr::{AggregateExpr, PhysicalExpr, PhysicalSortExpr};
423420
use futures::{FutureExt, Stream};
@@ -489,8 +486,7 @@ mod tests {
489486
)?);
490487

491488
let result =
492-
common::collect(partial_aggregate.execute(0, task_ctx.clone()).await?)
493-
.await?;
489+
common::collect(partial_aggregate.execute(0, task_ctx.clone())?).await?;
494490

495491
let expected = vec![
496492
"+---+---------------+-------------+",
@@ -522,7 +518,7 @@ mod tests {
522518
)?);
523519

524520
let result =
525-
common::collect(merged_aggregate.execute(0, task_ctx.clone()).await?).await?;
521+
common::collect(merged_aggregate.execute(0, task_ctx.clone())?).await?;
526522
assert_eq!(result.len(), 1);
527523

528524
let batch = &result[0];
@@ -556,7 +552,6 @@ mod tests {
556552
pub yield_first: bool,
557553
}
558554

559-
#[async_trait]
560555
impl ExecutionPlan for TestYieldingExec {
561556
fn as_any(&self) -> &dyn Any {
562557
self
@@ -587,7 +582,7 @@ mod tests {
587582
)))
588583
}
589584

590-
async fn execute(
585+
fn execute(
591586
&self,
592587
_partition: usize,
593588
_context: Arc<TaskContext>,

0 commit comments

Comments
 (0)