Skip to content

Commit

Permalink
minor: remove redundant code/TODO (#4019)
Browse files Browse the repository at this point in the history
* minor: remove redundant prefix.

* join already is added.

* more

* fmt
  • Loading branch information
jackwener authored Oct 31, 2022
1 parent f91eddf commit 8f1ae58
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 17 deletions.
6 changes: 1 addition & 5 deletions datafusion/core/src/dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -362,8 +362,6 @@ impl DataFrame {
Ok(Arc::new(DataFrame::new(self.session_state.clone(), &plan)))
}

// TODO: add join_using

/// Repartition a DataFrame based on a logical partitioning scheme.
///
/// ```
Expand Down Expand Up @@ -1386,9 +1384,7 @@ mod tests {

let plan = df.explain(false, false)?.collect().await?;
// Filters all the way to Parquet
let formatted = arrow::util::pretty::pretty_format_batches(&plan)
.unwrap()
.to_string();
let formatted = pretty::pretty_format_batches(&plan).unwrap().to_string();
assert!(formatted.contains("predicate=id_min@0 <= 1 AND 1 <= id_max@1"));

Ok(())
Expand Down
10 changes: 3 additions & 7 deletions datafusion/core/src/physical_plan/joins/hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -348,11 +348,7 @@ impl ExecutionPlan for HashJoinExec {
}))
}

fn fmt_as(
&self,
t: DisplayFormatType,
f: &mut std::fmt::Formatter,
) -> std::fmt::Result {
fn fmt_as(&self, t: DisplayFormatType, f: &mut fmt::Formatter) -> fmt::Result {
match t {
DisplayFormatType::Default => {
let display_filter = self.filter.as_ref().map_or_else(
Expand Down Expand Up @@ -1361,7 +1357,7 @@ fn produce_from_matched(
}
JoinSide::Right => {
let datatype = schema.field(idx).data_type();
arrow::array::new_null_array(datatype, num_rows)
new_null_array(datatype, num_rows)
}
};

Expand All @@ -1376,7 +1372,7 @@ impl HashJoinStream {
fn poll_next_impl(
&mut self,
cx: &mut std::task::Context<'_>,
) -> std::task::Poll<Option<ArrowResult<RecordBatch>>> {
) -> Poll<Option<ArrowResult<RecordBatch>>> {
let left_data = match ready!(self.left_fut.get(cx)) {
Ok(left_data) => left_data,
Err(e) => return Poll::Ready(Some(Err(e))),
Expand Down
7 changes: 2 additions & 5 deletions datafusion/core/src/test/exec.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ use crate::{
/// Index into the data that has been returned so far
#[derive(Debug, Default, Clone)]
pub struct BatchIndex {
inner: std::sync::Arc<std::sync::Mutex<usize>>,
inner: Arc<std::sync::Mutex<usize>>,
}

impl BatchIndex {
Expand Down Expand Up @@ -91,10 +91,7 @@ impl TestStream {
impl Stream for TestStream {
type Item = ArrowResult<RecordBatch>;

fn poll_next(
self: std::pin::Pin<&mut Self>,
_: &mut Context<'_>,
) -> Poll<Option<Self::Item>> {
fn poll_next(self: Pin<&mut Self>, _: &mut Context<'_>) -> Poll<Option<Self::Item>> {
let next_batch = self.index.value();

Poll::Ready(if next_batch < self.data.len() {
Expand Down

0 comments on commit 8f1ae58

Please sign in to comment.