Skip to content

Commit

Permalink
Merge pull request #4 from yjshen/merge_fuzz
Browse files: browse the repository at this point in the history
skip empty batch while inserting
  • Loading branch information
alamb authored Jan 26, 2022
2 parents e7b6d7a + 67c6a83 commit 645e0c8
Showing 1 changed file with 41 additions and 28 deletions.
69 changes: 41 additions & 28 deletions datafusion/src/physical_plan/sorts/sort_preserving_merge.rs
Original file line number Diff line number Diff line change
@@ -410,40 +410,53 @@ impl SortPreservingMergeStream {
// Cursor is not finished - don't need a new RecordBatch yet
return Poll::Ready(Ok(()));
}
let mut streams = self.streams.streams.lock().unwrap();
let mut empty_batch = false;
{
let mut streams = self.streams.streams.lock().unwrap();

let stream = &mut streams[idx];
if stream.is_terminated() {
return Poll::Ready(Ok(()));
}

// Fetch a new input record and create a cursor from it
match futures::ready!(stream.poll_next_unpin(cx)) {
None => return Poll::Ready(Ok(())),
Some(Err(e)) => {
return Poll::Ready(Err(e));
let stream = &mut streams[idx];
if stream.is_terminated() {
return Poll::Ready(Ok(()));
}
Some(Ok(batch)) => {
let cursor = match SortKeyCursor::new(
idx,
self.next_batch_id, // assign this batch an ID
&batch,
&self.column_expressions,
self.sort_options.clone(),
) {
Ok(cursor) => cursor,
Err(e) => {
return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e))));

// Fetch a new input record and create a cursor from it
match futures::ready!(stream.poll_next_unpin(cx)) {
None => return Poll::Ready(Ok(())),
Some(Err(e)) => {
return Poll::Ready(Err(e));
}
Some(Ok(batch)) => {
if batch.num_rows() > 0 {
let cursor = match SortKeyCursor::new(
idx,
self.next_batch_id, // assign this batch an ID
&batch,
&self.column_expressions,
self.sort_options.clone(),
) {
Ok(cursor) => cursor,
Err(e) => {
return Poll::Ready(Err(ArrowError::ExternalError(
Box::new(e),
)));
}
};
self.next_batch_id += 1;
self.min_heap.push(cursor);
self.cursor_finished[idx] = false;
self.batches[idx].push_back(batch)
} else {
empty_batch = true;
}
};
self.next_batch_id += 1;
self.min_heap.push(cursor);
self.cursor_finished[idx] = false;
self.batches[idx].push_back(batch)
}
}
}

Poll::Ready(Ok(()))
if empty_batch {
self.maybe_poll_stream(cx, idx)
} else {
Poll::Ready(Ok(()))
}
}

/// Drains the in_progress row indexes, and builds a new RecordBatch from them
Expand Down

0 comments on commit 645e0c8

Please sign in to comment.