Skip to content

Commit

Permalink
with preserve order now receives argument (#7231)
Browse files Browse the repository at this point in the history
  • Loading branch information
mustafasrepo authored Aug 8, 2023
1 parent 43f182f commit 19988a8
Show file tree
Hide file tree
Showing 3 changed files with 8 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ fn get_updated_plan(
let child = plan.children()[0].clone();
plan = Arc::new(
RepartitionExec::try_new(child, plan.output_partitioning())?
.with_preserve_order(),
.with_preserve_order(true),
) as _
}
// When the input of a `CoalescePartitionsExec` has an ordering, replace it
Expand Down
12 changes: 5 additions & 7 deletions datafusion/core/src/physical_plan/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -419,11 +419,9 @@ impl ExecutionPlan for RepartitionExec {
self: Arc<Self>,
children: Vec<Arc<dyn ExecutionPlan>>,
) -> Result<Arc<dyn ExecutionPlan>> {
let mut repartition =
RepartitionExec::try_new(children[0].clone(), self.partitioning.clone())?;
if self.preserve_order {
repartition = repartition.with_preserve_order();
}
let repartition =
RepartitionExec::try_new(children[0].clone(), self.partitioning.clone())?
.with_preserve_order(self.preserve_order);
Ok(Arc::new(repartition))
}

Expand Down Expand Up @@ -625,8 +623,8 @@ impl RepartitionExec {
}

/// Set Order preserving flag
pub fn with_preserve_order(mut self) -> Self {
self.preserve_order = true;
pub fn with_preserve_order(mut self, preserve_order: bool) -> Self {
self.preserve_order = preserve_order;
self
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ mod sp_repartition_fuzz_tests {
Arc::new(
RepartitionExec::try_new(input, Partitioning::RoundRobinBatch(2))
.unwrap()
.with_preserve_order(),
.with_preserve_order(true),
)
}

Expand All @@ -159,7 +159,7 @@ mod sp_repartition_fuzz_tests {
Arc::new(
RepartitionExec::try_new(input, Partitioning::Hash(hash_expr, 2))
.unwrap()
.with_preserve_order(),
.with_preserve_order(true),
)
}

Expand Down

0 comments on commit 19988a8

Please sign in to comment.