From 67c6a83983f771b730438276e9af8a3f070b81f9 Mon Sep 17 00:00:00 2001 From: Yijie Shen Date: Wed, 26 Jan 2022 09:21:49 +0800 Subject: [PATCH] skip empty batch while inserting --- .../sorts/sort_preserving_merge.rs | 69 +++++++++++-------- 1 file changed, 41 insertions(+), 28 deletions(-) diff --git a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs index d6a578766fa8..f950526dfa7e 100644 --- a/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs +++ b/datafusion/src/physical_plan/sorts/sort_preserving_merge.rs @@ -410,40 +410,53 @@ impl SortPreservingMergeStream { // Cursor is not finished - don't need a new RecordBatch yet return Poll::Ready(Ok(())); } - let mut streams = self.streams.streams.lock().unwrap(); + let mut empty_batch = false; + { + let mut streams = self.streams.streams.lock().unwrap(); - let stream = &mut streams[idx]; - if stream.is_terminated() { - return Poll::Ready(Ok(())); - } - - // Fetch a new input record and create a cursor from it - match futures::ready!(stream.poll_next_unpin(cx)) { - None => return Poll::Ready(Ok(())), - Some(Err(e)) => { - return Poll::Ready(Err(e)); + let stream = &mut streams[idx]; + if stream.is_terminated() { + return Poll::Ready(Ok(())); } - Some(Ok(batch)) => { - let cursor = match SortKeyCursor::new( - idx, - self.next_batch_id, // assign this batch an ID - &batch, - &self.column_expressions, - self.sort_options.clone(), - ) { - Ok(cursor) => cursor, - Err(e) => { - return Poll::Ready(Err(ArrowError::ExternalError(Box::new(e)))); + + // Fetch a new input record and create a cursor from it + match futures::ready!(stream.poll_next_unpin(cx)) { + None => return Poll::Ready(Ok(())), + Some(Err(e)) => { + return Poll::Ready(Err(e)); + } + Some(Ok(batch)) => { + if batch.num_rows() > 0 { + let cursor = match SortKeyCursor::new( + idx, + self.next_batch_id, // assign this batch an ID + &batch, + &self.column_expressions, + self.sort_options.clone(), + ) { + Ok(cursor) => cursor, + Err(e) => { + return Poll::Ready(Err(ArrowError::ExternalError( + Box::new(e), + ))); + } + }; + self.next_batch_id += 1; + self.min_heap.push(cursor); + self.cursor_finished[idx] = false; + self.batches[idx].push_back(batch) + } else { + empty_batch = true; } - }; - self.next_batch_id += 1; - self.min_heap.push(cursor); - self.cursor_finished[idx] = false; - self.batches[idx].push_back(batch) + } } } - Poll::Ready(Ok(())) + if empty_batch { + self.maybe_poll_stream(cx, idx) + } else { + Poll::Ready(Ok(())) + } } /// Drains the in_progress row indexes, and builds a new RecordBatch from them