From a4106d93a9212e4b5ef3bed06605fa2632c51470 Mon Sep 17 00:00:00 2001 From: Andrew Lamb Date: Wed, 19 Jul 2023 06:35:33 -0400 Subject: [PATCH] fmt --- .../src/physical_plan/aggregates/row_hash.rs | 19 ++++++++++++------- 1 file changed, 12 insertions(+), 7 deletions(-) diff --git a/datafusion/core/src/physical_plan/aggregates/row_hash.rs b/datafusion/core/src/physical_plan/aggregates/row_hash.rs index 100aac303d7cb..122c34c2da18d 100644 --- a/datafusion/core/src/physical_plan/aggregates/row_hash.rs +++ b/datafusion/core/src/physical_plan/aggregates/row_hash.rs @@ -348,7 +348,7 @@ impl Stream for GroupedHashAggregateStream { // return error, otherwise loop again if let Err(e) = res { - return Poll::Ready(Some(Err(e))) + return Poll::Ready(Some(Err(e))); } } } @@ -357,10 +357,14 @@ impl Stream for GroupedHashAggregateStream { impl GroupedHashAggregateStream { /// Processes the next batch of input, updating /// self.execution_state appropriately - fn next_input(&mut self, input: Option>, elapsed_compute: &Time) -> Result<()>{ + fn next_input( + &mut self, + input: Option>, + elapsed_compute: &Time, + ) -> Result<()> { assert!(matches!(&self.exec_state, ExecutionState::ReadingInput)); match input { - // new batch to aggregate + // new batch to aggregate Some(batch) => { let batch = batch?; let timer = elapsed_compute.timer(); @@ -383,7 +387,7 @@ impl GroupedHashAggregateStream { self.input_done = true; self.group_ordering.input_done(); let timer = elapsed_compute.timer(); - let batch =self.create_batch_from_map(EmitTo::All)?; + let batch = self.create_batch_from_map(EmitTo::All)?; self.exec_state = ExecutionState::ProducingOutput(batch); timer.done(); } @@ -394,7 +398,10 @@ impl GroupedHashAggregateStream { /// Produces the next batch of output from ExecutionState::ProductingOuptut, updating self.execution_state /// appropriately fn next_output(&mut self, batch: RecordBatch) -> RecordBatch { - assert!(matches!(&self.exec_state, ExecutionState::ProducingOutput(_))); + assert!(matches!( + &self.exec_state, + ExecutionState::ProducingOutput(_) + )); // slice off a part of the batch, if needed let output_batch = if batch.num_rows() <= self.batch_size { if self.input_done { @@ -412,10 +419,8 @@ impl GroupedHashAggregateStream { }; output_batch.record_output(&self.baseline_metrics) } - } - impl RecordBatchStream for GroupedHashAggregateStream { fn schema(&self) -> SchemaRef { self.schema.clone()