From d4730bfad598920017719ff663f14defca01e515 Mon Sep 17 00:00:00 2001 From: jiangzhx Date: Fri, 24 Feb 2023 19:04:19 +0800 Subject: [PATCH 1/4] add describe method like polars --- datafusion-examples/examples/dataframe.rs | 9 ++ datafusion/core/src/dataframe.rs | 181 +++++++++++++++++++++- datafusion/core/tests/dataframe.rs | 30 +++- 3 files changed, 218 insertions(+), 2 deletions(-) diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index f52ff8925612..e44f46af8f5a 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -49,6 +49,15 @@ async fn main() -> Result<()> { let csv_df = example_read_csv_file_with_schema().await; csv_df.show().await?; + // Reading PARQUET file and print describe + let parquet_df = ctx + .read_parquet( + &format!("{testdata}/alltypes_plain.parquet"), + ParquetReadOptions::default(), + ) + .await?; + parquet_df.describe().await?; + Ok(()) } diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 3d00c684550a..b600b81e6c27 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -20,11 +20,15 @@ use std::any::Any; use std::sync::Arc; -use arrow::array::Int64Array; +use arrow::array::{ArrayRef, Int64Array, StringArray}; +use arrow::compute::{cast, concat}; +use arrow::datatypes::{DataType, Field}; +use arrow::util::pretty::print_batches; use async_trait::async_trait; use datafusion_common::DataFusionError; use parquet::file::properties::WriterProperties; +use datafusion_common::from_slice::FromSlice; use datafusion_common::{Column, DFSchema, ScalarValue}; use datafusion_expr::TableProviderFilterPushDown; @@ -302,6 +306,181 @@ impl DataFrame { )) } + /// Summary statistics for a DataFrame. Only summarizes numeric datatypes at the moment and + /// returns nulls for non numeric datatypes. Try in keep output similar to pandas + /// + /// ``` + /// # use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # use arrow::util::pretty; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let ctx = SessionContext::new(); + /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?; + /// df.describe().await?; + /// + /// # Ok(()) + /// # } + /// ``` + pub async fn describe(self) -> Result<()> { + Ok(print_batches( + &self.clone().collect_describe().await.unwrap(), + )?) + } + + /// Summary statistics for a DataFrame. Only summarizes numeric datatypes at the moment and + /// returns nulls for non numeric datatypes. Try in keep output similar to pandas + /// + /// ``` + /// # use datafusion::prelude::*; + /// # use datafusion::error::Result; + /// # use arrow::util::pretty; + /// # #[tokio::main] + /// # async fn main() -> Result<()> { + /// let ctx = SessionContext::new(); + /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?; + /// df.collect_describe().await.unwrap(); + /// + /// # Ok(()) + /// # } + /// ``` + pub async fn collect_describe(self) -> Result> { + //the functions now supported + let supported_describe_functions = vec!["count", "null_count", "max", "min"]; //"count", "max", "min", + + let fields_iter = self.schema().fields().iter(); + + //define describe column + let mut describe_schemas = fields_iter + .clone() + .map(|field| { + if field.data_type().is_numeric() { + Field::new(field.name(), DataType::Float64, true) + } else { + Field::new(field.name(), DataType::Utf8, true) + } + }) + .collect::>(); + describe_schemas.insert(0, Field::new("describe", DataType::Utf8, false)); + + //collect recordBatch + let describe_record_batch = vec![ + // count aggregation + self.clone() + .aggregate( + vec![], + fields_iter + .clone() + .map(|f| { + Expr::Alias( + Box::new(datafusion_expr::count(col(f.name()))), + f.name().to_string(), + ) + }) + .collect::>(), + )? + .collect() + .await? + .clone(), + // null_count aggregation + self.clone() + .aggregate( + vec![], + fields_iter + .clone() + .map(|f| { + Expr::Alias( + Box::new(datafusion_expr::count( + datafusion_expr::is_null(col(f.name())), + )), + f.name().to_string(), + ) + }) + .collect::>(), + )? + .collect() + .await? + .clone(), + // max aggregation + self.clone() + .aggregate( + vec![], + fields_iter + .clone() + .filter(|f| matches!(f.data_type().is_numeric(), true)) + .map(|f| { + Expr::Alias( + Box::new(datafusion_expr::max(col(f.name()))), + f.name().to_string(), + ) + }) + .collect::>(), + )? + .collect() + .await? + .clone(), + // min aggregation + self.clone() + .aggregate( + vec![], + fields_iter + .clone() + .filter(|f| matches!(f.data_type().is_numeric(), true)) + .map(|f| { + Expr::Alias( + Box::new(datafusion_expr::min(col(f.name()))), + f.name().to_string(), + ) + }) + .collect::>(), + )? + .collect() + .await? + .clone(), + ]; + + let mut array_ref_vec: Vec = vec![]; + for field in self.schema().fields().iter() { + let mut array_datas = vec![]; + for record_batch in describe_record_batch.iter() { + let column = record_batch.get(0).unwrap().column_by_name(&field.name()); + match column { + Some(c) => { + if field.data_type().is_numeric() { + array_datas.push(cast(c, &DataType::Float64)?); + } else { + array_datas.push(cast(c, &DataType::Utf8)?); + } + } + //if None mean the column cannot be min/max aggregation + None => { + array_datas.push(Arc::new(StringArray::from_slice(["null"]))); + } + } + } + + array_ref_vec.push(concat( + array_datas + .iter() + .map(|af| af.as_ref()) + .collect::>() + .as_slice(), + )?); + } + + //insert first column with function names + array_ref_vec.insert( + 0, + Arc::new(StringArray::from_slice( + supported_describe_functions.clone(), + )), + ); + + let describe_record_batch = + RecordBatch::try_new(Arc::new(Schema::new(describe_schemas)), array_ref_vec)?; + Ok(vec![describe_record_batch]) + } + /// Sort the DataFrame by the specified sorting expressions. Any expression can be turned into /// a sort expression by calling its [sort](../logical_plan/enum.Expr.html#method.sort) method. /// diff --git a/datafusion/core/tests/dataframe.rs b/datafusion/core/tests/dataframe.rs index 73d2ce3ba4ef..c36b9a770411 100644 --- a/datafusion/core/tests/dataframe.rs +++ b/datafusion/core/tests/dataframe.rs @@ -29,12 +29,40 @@ use std::sync::Arc; use datafusion::dataframe::DataFrame; use datafusion::error::Result; use datafusion::execution::context::SessionContext; -use datafusion::prelude::CsvReadOptions; use datafusion::prelude::JoinType; +use datafusion::prelude::{CsvReadOptions, ParquetReadOptions}; use datafusion::{assert_batches_eq, assert_batches_sorted_eq}; use datafusion_expr::expr::{GroupingSet, Sort}; use datafusion_expr::{avg, col, count, lit, sum, Expr, ExprSchemable}; +#[tokio::test] +async fn describe() -> Result<()> { + let ctx = SessionContext::new(); + let testdata = datafusion::test_util::parquet_test_data(); + + let filename = &format!("{testdata}/alltypes_plain.parquet"); + + let df = ctx + .read_parquet(filename, ParquetReadOptions::default()) + .await?; + + let describe_record_batch = df.collect_describe().await.unwrap(); + #[rustfmt::skip] + let expected = vec![ + "+------------+-----+----------+-------------+--------------+---------+------------+-------------------+------------+-----------------+------------+---------------+", + "| describe | id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |", + "+------------+-----+----------+-------------+--------------+---------+------------+-------------------+------------+-----------------+------------+---------------+", + "| count | 8.0 | 8 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8 | 8 | 8 |", + "| null_count | 8.0 | 8 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8 | 8 | 8 |", + "| max | 7.0 | null | 1.0 | 1.0 | 1.0 | 10.0 | 1.100000023841858 | 10.1 | null | null | null |", + "| min | 0.0 | null | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | null | null | null |", + "+------------+-----+----------+-------------+--------------+---------+------------+-------------------+------------+-----------------+------------+---------------+", + ]; + assert_batches_eq!(expected, &describe_record_batch); + + Ok(()) +} + #[tokio::test] async fn join() -> Result<()> { let schema1 = Arc::new(Schema::new(vec![ From c7e6091a3390a096f68c3ef02e0be02e778fe1ed Mon Sep 17 00:00:00 2001 From: jiangzhx Date: Fri, 24 Feb 2023 19:25:13 +0800 Subject: [PATCH 2/4] clippy fix --- datafusion/core/src/dataframe.rs | 14 +++++--------- 1 file changed, 5 insertions(+), 9 deletions(-) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index b600b81e6c27..68f257338d16 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -380,8 +380,7 @@ impl DataFrame { .collect::>(), )? .collect() - .await? - .clone(), + .await?, // null_count aggregation self.clone() .aggregate( @@ -399,8 +398,7 @@ impl DataFrame { .collect::>(), )? .collect() - .await? - .clone(), + .await?, // max aggregation self.clone() .aggregate( @@ -417,8 +415,7 @@ impl DataFrame { .collect::>(), )? .collect() - .await? - .clone(), + .await?, // min aggregation self.clone() .aggregate( @@ -435,15 +432,14 @@ impl DataFrame { .collect::>(), )? .collect() - .await? - .clone(), + .await?, ]; let mut array_ref_vec: Vec = vec![]; for field in self.schema().fields().iter() { let mut array_datas = vec![]; for record_batch in describe_record_batch.iter() { - let column = record_batch.get(0).unwrap().column_by_name(&field.name()); + let column = record_batch.get(0).unwrap().column_by_name(field.name()); match column { Some(c) => { if field.data_type().is_numeric() { From aeac881fb96e880f0be86193371f8458caef174f Mon Sep 17 00:00:00 2001 From: jiangzhx Date: Mon, 27 Feb 2023 13:22:14 +0800 Subject: [PATCH 3/4] commit suggestion --- datafusion-examples/examples/dataframe.rs | 3 +- datafusion/core/src/dataframe.rs | 87 +++++++++-------------- datafusion/core/tests/dataframe.rs | 18 ++--- 3 files changed, 44 insertions(+), 64 deletions(-) diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index e44f46af8f5a..d19119db5208 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -56,7 +56,8 @@ async fn main() -> Result<()> { ParquetReadOptions::default(), ) .await?; - parquet_df.describe().await?; + parquet_df.clone().show().await?; + parquet_df.describe().await.unwrap().show().await?; Ok(()) } diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 68f257338d16..6efa6e59ac99 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -23,20 +23,19 @@ use std::sync::Arc; use arrow::array::{ArrayRef, Int64Array, StringArray}; use arrow::compute::{cast, concat}; use arrow::datatypes::{DataType, Field}; -use arrow::util::pretty::print_batches; use async_trait::async_trait; use datafusion_common::DataFusionError; use parquet::file::properties::WriterProperties; use datafusion_common::from_slice::FromSlice; use datafusion_common::{Column, DFSchema, ScalarValue}; -use datafusion_expr::TableProviderFilterPushDown; +use datafusion_expr::{TableProviderFilterPushDown, UNNAMED_TABLE}; use crate::arrow::datatypes::Schema; use crate::arrow::datatypes::SchemaRef; use crate::arrow::record_batch::RecordBatch; use crate::arrow::util::pretty; -use crate::datasource::{MemTable, TableProvider}; +use crate::datasource::{provider_as_source, MemTable, TableProvider}; use crate::error::Result; use crate::execution::{ context::{SessionState, TaskContext}, @@ -317,36 +316,14 @@ impl DataFrame { /// # async fn main() -> Result<()> { /// let ctx = SessionContext::new(); /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?; - /// df.describe().await?; + /// df.describe().await.unwrap(); /// /// # Ok(()) /// # } /// ``` - pub async fn describe(self) -> Result<()> { - Ok(print_batches( - &self.clone().collect_describe().await.unwrap(), - )?) - } - - /// Summary statistics for a DataFrame. Only summarizes numeric datatypes at the moment and - /// returns nulls for non numeric datatypes. Try in keep output similar to pandas - /// - /// ``` - /// # use datafusion::prelude::*; - /// # use datafusion::error::Result; - /// # use arrow::util::pretty; - /// # #[tokio::main] - /// # async fn main() -> Result<()> { - /// let ctx = SessionContext::new(); - /// let df = ctx.read_csv("tests/tpch-csv/customer.csv", CsvReadOptions::new()).await?; - /// df.collect_describe().await.unwrap(); - /// - /// # Ok(()) - /// # } - /// ``` - pub async fn collect_describe(self) -> Result> { + pub async fn describe(self) -> Result { //the functions now supported - let supported_describe_functions = vec!["count", "null_count", "max", "min"]; //"count", "max", "min", + let supported_describe_functions = vec!["count", "null_count", "max", "min"]; let fields_iter = self.schema().fields().iter(); @@ -371,12 +348,7 @@ impl DataFrame { vec![], fields_iter .clone() - .map(|f| { - Expr::Alias( - Box::new(datafusion_expr::count(col(f.name()))), - f.name().to_string(), - ) - }) + .map(|f| datafusion_expr::count(col(f.name())).alias(f.name())) .collect::>(), )? .collect() @@ -388,12 +360,10 @@ impl DataFrame { fields_iter .clone() .map(|f| { - Expr::Alias( - Box::new(datafusion_expr::count( - datafusion_expr::is_null(col(f.name())), - )), - f.name().to_string(), - ) + datafusion_expr::count(datafusion_expr::is_null( + col(f.name()), + )) + .alias(f.name()) }) .collect::>(), )? @@ -405,13 +375,10 @@ impl DataFrame { vec![], fields_iter .clone() - .filter(|f| matches!(f.data_type().is_numeric(), true)) - .map(|f| { - Expr::Alias( - Box::new(datafusion_expr::max(col(f.name()))), - f.name().to_string(), - ) + .filter(|f| { + !matches!(f.data_type(), DataType::Binary | DataType::Boolean) }) + .map(|f| datafusion_expr::max(col(f.name())).alias(f.name())) .collect::>(), )? .collect() @@ -422,13 +389,10 @@ impl DataFrame { vec![], fields_iter .clone() - .filter(|f| matches!(f.data_type().is_numeric(), true)) - .map(|f| { - Expr::Alias( - Box::new(datafusion_expr::min(col(f.name()))), - f.name().to_string(), - ) + .filter(|f| { + !matches!(f.data_type(), DataType::Binary | DataType::Boolean) }) + .map(|f| datafusion_expr::min(col(f.name())).alias(f.name())) .collect::>(), )? .collect() @@ -436,7 +400,7 @@ impl DataFrame { ]; let mut array_ref_vec: Vec = vec![]; - for field in self.schema().fields().iter() { + for field in fields_iter { let mut array_datas = vec![]; for record_batch in describe_record_batch.iter() { let column = record_batch.get(0).unwrap().column_by_name(field.name()); @@ -474,7 +438,22 @@ impl DataFrame { let describe_record_batch = RecordBatch::try_new(Arc::new(Schema::new(describe_schemas)), array_ref_vec)?; - Ok(vec![describe_record_batch]) + + let provider = MemTable::try_new( + describe_record_batch.schema(), + vec![vec![describe_record_batch]], + )?; + Ok(DataFrame::new( + self.session_state, + LogicalPlanBuilder::scan( + UNNAMED_TABLE, + provider_as_source(Arc::new(provider)), + None, + )? + .build()?, + )) + + // Ok(vec![describe_record_batch]) } /// Sort the DataFrame by the specified sorting expressions. Any expression can be turned into diff --git a/datafusion/core/tests/dataframe.rs b/datafusion/core/tests/dataframe.rs index c36b9a770411..4ac528be3a6e 100644 --- a/datafusion/core/tests/dataframe.rs +++ b/datafusion/core/tests/dataframe.rs @@ -46,17 +46,17 @@ async fn describe() -> Result<()> { .read_parquet(filename, ParquetReadOptions::default()) .await?; - let describe_record_batch = df.collect_describe().await.unwrap(); + let describe_record_batch = df.describe().await.unwrap().collect().await.unwrap(); #[rustfmt::skip] let expected = vec![ - "+------------+-----+----------+-------------+--------------+---------+------------+-------------------+------------+-----------------+------------+---------------+", - "| describe | id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |", - "+------------+-----+----------+-------------+--------------+---------+------------+-------------------+------------+-----------------+------------+---------------+", - "| count | 8.0 | 8 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8 | 8 | 8 |", - "| null_count | 8.0 | 8 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8 | 8 | 8 |", - "| max | 7.0 | null | 1.0 | 1.0 | 1.0 | 10.0 | 1.100000023841858 | 10.1 | null | null | null |", - "| min | 0.0 | null | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | null | null | null |", - "+------------+-----+----------+-------------+--------------+---------+------------+-------------------+------------+-----------------+------------+---------------+", + "+------------+-----+----------+-------------+--------------+---------+------------+-------------------+------------+-----------------+------------+---------------------+", + "| describe | id | bool_col | tinyint_col | smallint_col | int_col | bigint_col | float_col | double_col | date_string_col | string_col | timestamp_col |", + "+------------+-----+----------+-------------+--------------+---------+------------+-------------------+------------+-----------------+------------+---------------------+", + "| count | 8.0 | 8 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8 | 8 | 8 |", + "| null_count | 8.0 | 8 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8.0 | 8 | 8 | 8 |", + "| max | 7.0 | null | 1.0 | 1.0 | 1.0 | 10.0 | 1.100000023841858 | 10.1 | null | null | 2009-04-01T00:01:00 |", + "| min | 0.0 | null | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | 0.0 | null | null | 2009-01-01T00:00:00 |", + "+------------+-----+----------+-------------+--------------+---------+------------+-------------------+------------+-----------------+------------+---------------------+", ]; assert_batches_eq!(expected, &describe_record_batch); From ae848b1d7d9f62b9398a2fe43f2bf185453a3559 Mon Sep 17 00:00:00 2001 From: jiangzhx Date: Mon, 27 Feb 2023 13:26:03 +0800 Subject: [PATCH 4/4] fix typos --- datafusion-examples/examples/dataframe.rs | 1 - datafusion/core/src/dataframe.rs | 2 -- 2 files changed, 3 deletions(-) diff --git a/datafusion-examples/examples/dataframe.rs b/datafusion-examples/examples/dataframe.rs index d19119db5208..027ff9970057 100644 --- a/datafusion-examples/examples/dataframe.rs +++ b/datafusion-examples/examples/dataframe.rs @@ -56,7 +56,6 @@ async fn main() -> Result<()> { ParquetReadOptions::default(), ) .await?; - parquet_df.clone().show().await?; parquet_df.describe().await.unwrap().show().await?; Ok(()) diff --git a/datafusion/core/src/dataframe.rs b/datafusion/core/src/dataframe.rs index 6efa6e59ac99..76598acb52c5 100644 --- a/datafusion/core/src/dataframe.rs +++ b/datafusion/core/src/dataframe.rs @@ -452,8 +452,6 @@ impl DataFrame { )? .build()?, )) - - // Ok(vec![describe_record_batch]) } /// Sort the DataFrame by the specified sorting expressions. Any expression can be turned into