Skip to content

Commit

Permalink
fmt
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Jul 19, 2023
1 parent 5188c54 commit a4106d9
Showing 1 changed file with 12 additions and 7 deletions.
19 changes: 12 additions & 7 deletions datafusion/core/src/physical_plan/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,7 +348,7 @@ impl Stream for GroupedHashAggregateStream {

// return error, otherwise loop again
if let Err(e) = res {
return Poll::Ready(Some(Err(e)))
return Poll::Ready(Some(Err(e)));
}
}
}
Expand All @@ -357,10 +357,14 @@ impl Stream for GroupedHashAggregateStream {
impl GroupedHashAggregateStream {
/// Processes the next batch of input, updating
/// self.execution_state appropriately
fn next_input(&mut self, input: Option<Result<RecordBatch>>, elapsed_compute: &Time) -> Result<()>{
fn next_input(
&mut self,
input: Option<Result<RecordBatch>>,
elapsed_compute: &Time,
) -> Result<()> {
assert!(matches!(&self.exec_state, ExecutionState::ReadingInput));
match input {
// new batch to aggregate
// new batch to aggregate
Some(batch) => {
let batch = batch?;
let timer = elapsed_compute.timer();
Expand All @@ -383,7 +387,7 @@ impl GroupedHashAggregateStream {
self.input_done = true;
self.group_ordering.input_done();
let timer = elapsed_compute.timer();
let batch =self.create_batch_from_map(EmitTo::All)?;
let batch = self.create_batch_from_map(EmitTo::All)?;
self.exec_state = ExecutionState::ProducingOutput(batch);
timer.done();
}
Expand All @@ -394,7 +398,10 @@ impl GroupedHashAggregateStream {
/// Produces the next batch of output from ExecutionState::ProductingOuptut, updating self.execution_state
/// appropriately
fn next_output(&mut self, batch: RecordBatch) -> RecordBatch {
assert!(matches!(&self.exec_state, ExecutionState::ProducingOutput(_)));
assert!(matches!(
&self.exec_state,
ExecutionState::ProducingOutput(_)
));
// slice off a part of the batch, if needed
let output_batch = if batch.num_rows() <= self.batch_size {
if self.input_done {
Expand All @@ -412,10 +419,8 @@ impl GroupedHashAggregateStream {
};
output_batch.record_output(&self.baseline_metrics)
}

}


impl RecordBatchStream for GroupedHashAggregateStream {
fn schema(&self) -> SchemaRef {
self.schema.clone()
Expand Down

0 comments on commit a4106d9

Please sign in to comment.