Skip to content

Commit

Permalink
Update physical_plan tests to not use SessionContext (#7243)
Browse files Browse the repository at this point in the history
* Update `physical_plan` tests to not use SessionContext

* fix
  • Loading branch information
alamb authored Aug 9, 2023
1 parent c08c30f commit a08ce40
Show file tree
Hide file tree
Showing 22 changed files with 467 additions and 498 deletions.
28 changes: 9 additions & 19 deletions datafusion/core/src/physical_plan/aggregates/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1215,7 +1215,6 @@ fn evaluate_group_by(
#[cfg(test)]
mod tests {
use super::*;
use crate::execution::context::SessionConfig;
use crate::physical_plan::aggregates::GroupByOrderMode::{
FullyOrdered, PartiallyOrdered,
};
Expand All @@ -1231,7 +1230,6 @@ mod tests {
DisplayAs, ExecutionPlan, Partitioning, RecordBatchStream,
SendableRecordBatchStream, Statistics,
};
use crate::prelude::SessionContext;
use crate::test::exec::{assert_strong_count_converges_to_zero, BlockingExec};
use crate::test::{assert_is_pending, csv_exec_sorted};
use crate::{assert_batches_eq, assert_batches_sorted_eq, physical_plan::common};
Expand Down Expand Up @@ -1449,8 +1447,7 @@ mod tests {
DataType::Int64,
))];

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());

let partial_aggregate = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
Expand Down Expand Up @@ -1556,8 +1553,7 @@ mod tests {
DataType::Float64,
))];

let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());

let partial_aggregate = Arc::new(AggregateExec::try_new(
AggregateMode::Partial,
Expand Down Expand Up @@ -1779,14 +1775,11 @@ mod tests {
Arc::new(TestYieldingExec { yield_first: true });
let input_schema = input.schema();

let session_ctx = SessionContext::with_config_rt(
SessionConfig::default(),
Arc::new(
RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0))
.unwrap(),
),
let runtime = Arc::new(
RuntimeEnv::new(RuntimeConfig::default().with_memory_limit(1, 1.0)).unwrap(),
);
let task_ctx = session_ctx.task_ctx();
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

let groups_none = PhysicalGroupBy::default();
let groups_some = PhysicalGroupBy {
Expand Down Expand Up @@ -1864,8 +1857,7 @@ mod tests {

#[tokio::test]
async fn test_drop_cancel_without_groups() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

Expand Down Expand Up @@ -1901,8 +1893,7 @@ mod tests {

#[tokio::test]
async fn test_drop_cancel_with_groups() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = Arc::new(Schema::new(vec![
Field::new("a", DataType::Float32, true),
Field::new("b", DataType::Float32, true),
Expand Down Expand Up @@ -1970,8 +1961,7 @@ mod tests {
use_coalesce_batches: bool,
is_first_acc: bool,
) -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());

let (schema, data) = some_data_v2();
let partition1 = data[0].clone();
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/physical_plan/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,7 +229,6 @@ mod tests {
use arrow::datatypes::{DataType, Field, Schema};
use futures::FutureExt;

use crate::prelude::SessionContext;
use crate::{
physical_plan::collect,
test::{
Expand All @@ -242,8 +241,7 @@ mod tests {

#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

Expand Down
39 changes: 1 addition & 38 deletions datafusion/core/src/physical_plan/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -305,46 +305,10 @@ pub fn concat_batches(
#[cfg(test)]
mod tests {
use super::*;
use crate::config::ConfigOptions;
use crate::datasource::MemTable;
use crate::physical_plan::filter::FilterExec;
use crate::physical_plan::{memory::MemoryExec, repartition::RepartitionExec};
use crate::prelude::SessionContext;
use crate::test::create_vec_batches;
use arrow::datatypes::{DataType, Field, Schema};

#[tokio::test]
async fn test_custom_batch_size() -> Result<()> {
let mut config = ConfigOptions::new();
config.execution.batch_size = 1234;

let ctx = SessionContext::with_config(config.into());
let plan = create_physical_plan(ctx).await?;
let coalesce = plan.as_any().downcast_ref::<CoalesceBatchesExec>().unwrap();
assert_eq!(1234, coalesce.target_batch_size);
Ok(())
}

#[tokio::test]
async fn test_disable_coalesce() -> Result<()> {
let mut config = ConfigOptions::new();
config.execution.coalesce_batches = false;

let ctx = SessionContext::with_config(config.into());
let plan = create_physical_plan(ctx).await?;
let _filter = plan.as_any().downcast_ref::<FilterExec>().unwrap();
Ok(())
}

async fn create_physical_plan(ctx: SessionContext) -> Result<Arc<dyn ExecutionPlan>> {
let schema = test_schema();
let partition = create_vec_batches(&schema, 10);
let table = MemTable::try_new(schema, vec![partition])?;
ctx.register_table("a", Arc::new(table))?;
let dataframe = ctx.sql("SELECT * FROM a WHERE c0 < 1").await?;
dataframe.create_physical_plan().await
}

#[tokio::test(flavor = "multi_thread")]
async fn test_concat_batches() -> Result<()> {
let schema = test_schema();
Expand Down Expand Up @@ -385,10 +349,9 @@ mod tests {
// execute and collect results
let output_partition_count = exec.output_partitioning().partition_count();
let mut output_partitions = Vec::with_capacity(output_partition_count);
let session_ctx = SessionContext::new();
for i in 0..output_partition_count {
// execute this *output* partition and collect all batches
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let mut stream = exec.execute(i, task_ctx.clone())?;
let mut batches = vec![];
while let Some(result) = stream.next().await {
Expand Down
10 changes: 3 additions & 7 deletions datafusion/core/src/physical_plan/coalesce_partitions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -176,16 +176,14 @@ mod tests {

use super::*;
use crate::physical_plan::{collect, common};
use crate::prelude::SessionContext;
use crate::test::exec::{
assert_strong_count_converges_to_zero, BlockingExec, PanicExec,
};
use crate::test::{self, assert_is_pending};

#[tokio::test]
async fn merge() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());

let num_partitions = 4;
let csv = test::scan_partitioned_csv(num_partitions)?;
Expand All @@ -212,8 +210,7 @@ mod tests {

#[tokio::test]
async fn test_drop_cancel() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

Expand All @@ -235,8 +232,7 @@ mod tests {
#[tokio::test]
#[should_panic(expected = "PanickingStream did panic")]
async fn test_panic() {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema =
Arc::new(Schema::new(vec![Field::new("a", DataType::Float32, true)]));

Expand Down
13 changes: 4 additions & 9 deletions datafusion/core/src/physical_plan/empty.rs
Original file line number Diff line number Diff line change
Expand Up @@ -174,13 +174,11 @@ impl ExecutionPlan for EmptyExec {
mod tests {
use super::*;
use crate::physical_plan::with_new_children_if_necessary;
use crate::prelude::SessionContext;
use crate::{physical_plan::common, test_util};

#[tokio::test]
async fn empty() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();

let empty = EmptyExec::new(false, schema.clone());
Expand Down Expand Up @@ -217,8 +215,7 @@ mod tests {

#[tokio::test]
async fn invalid_execute() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(false, schema);

Expand All @@ -230,8 +227,7 @@ mod tests {

#[tokio::test]
async fn produce_one_row() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let empty = EmptyExec::new(true, schema);

Expand All @@ -246,8 +242,7 @@ mod tests {

#[tokio::test]
async fn produce_one_row_multiple_partition() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();
let partitions = 3;
let empty = EmptyExec::new(true, schema).with_partitions(partitions);
Expand Down
4 changes: 1 addition & 3 deletions datafusion/core/src/physical_plan/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -381,7 +381,6 @@ mod tests {
use crate::physical_plan::expressions::*;
use crate::physical_plan::ExecutionPlan;
use crate::physical_plan::{collect, with_new_children_if_necessary};
use crate::prelude::SessionContext;
use crate::test;
use crate::test::exec::StatisticsExec;
use crate::test_util;
Expand All @@ -395,8 +394,7 @@ mod tests {

#[tokio::test]
async fn simple_predicate() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());
let schema = test_util::aggr_test_schema();

let partitions = 4;
Expand Down
9 changes: 3 additions & 6 deletions datafusion/core/src/physical_plan/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -460,7 +460,6 @@ mod tests {
use crate::assert_batches_sorted_eq;
use crate::common::assert_contains;
use crate::physical_plan::common;
use crate::prelude::{SessionConfig, SessionContext};
use crate::test::{build_table_scan_i32, columns};
use datafusion_execution::runtime_env::{RuntimeConfig, RuntimeEnv};

Expand Down Expand Up @@ -617,8 +616,7 @@ mod tests {

#[tokio::test]
async fn test_join() -> Result<()> {
let session_ctx = SessionContext::new();
let task_ctx = session_ctx.task_ctx();
let task_ctx = Arc::new(TaskContext::default());

let left = build_table_scan_i32(
("a1", &vec![1, 2, 3]),
Expand Down Expand Up @@ -656,9 +654,8 @@ mod tests {
async fn test_overallocation() -> Result<()> {
let runtime_config = RuntimeConfig::new().with_memory_limit(100, 1.0);
let runtime = Arc::new(RuntimeEnv::new(runtime_config)?);
let session_ctx =
SessionContext::with_config_rt(SessionConfig::default(), runtime);
let task_ctx = session_ctx.task_ctx();
let task_ctx = TaskContext::default().with_runtime(runtime);
let task_ctx = Arc::new(task_ctx);

let left = build_table_scan_i32(
("a1", &vec![1, 2, 3, 4, 5, 6, 7, 8, 9, 0]),
Expand Down
Loading

0 comments on commit a08ce40

Please sign in to comment.