From bf9345f050beb017909a8b6a45515d037f97a931 Mon Sep 17 00:00:00 2001 From: xyz Date: Sun, 22 Jan 2023 07:05:31 +0800 Subject: [PATCH] [BugFix] fix explain csv/json/avro exec can not see metrics bug When we executing explain analyze select * from tablexxx, csv/json/avro exev metrics is empty. This bug is introduced by not implementing metrics traits in csv/json/avro. Signed-off-by: xyz --- .../src/physical_plan/file_format/avro.rs | 8 ++++-- .../core/src/physical_plan/file_format/csv.rs | 26 +++++++++++++++++-- .../physical_plan/file_format/file_stream.rs | 11 ++++---- .../src/physical_plan/file_format/json.rs | 8 ++++-- .../src/physical_plan/file_format/parquet.rs | 8 ++---- 5 files changed, 43 insertions(+), 18 deletions(-) diff --git a/datafusion/core/src/physical_plan/file_format/avro.rs b/datafusion/core/src/physical_plan/file_format/avro.rs index f67906e8a187..cc797227e3ec 100644 --- a/datafusion/core/src/physical_plan/file_format/avro.rs +++ b/datafusion/core/src/physical_plan/file_format/avro.rs @@ -24,7 +24,7 @@ use crate::physical_plan::{ use arrow::datatypes::SchemaRef; use crate::execution::context::TaskContext; -use crate::physical_plan::metrics::ExecutionPlanMetricsSet; +use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use std::any::Any; use std::sync::Arc; @@ -122,7 +122,7 @@ impl ExecutionPlan for AvroExec { let opener = private::AvroOpener { config }; let stream = - FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?; + FileStream::new(&self.base_config, partition, opener, &self.metrics)?; Ok(Box::pin(stream)) } @@ -146,6 +146,10 @@ impl ExecutionPlan for AvroExec { fn statistics(&self) -> Statistics { self.projected_statistics.clone() } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } } #[cfg(feature = "avro")] diff --git a/datafusion/core/src/physical_plan/file_format/csv.rs b/datafusion/core/src/physical_plan/file_format/csv.rs index 769ceeb05079..6c3b1371ae2e 100644 --- a/datafusion/core/src/physical_plan/file_format/csv.rs +++ b/datafusion/core/src/physical_plan/file_format/csv.rs @@ -26,7 +26,7 @@ use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; use crate::physical_plan::file_format::FileMeta; -use crate::physical_plan::metrics::ExecutionPlanMetricsSet; +use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -153,7 +153,7 @@ impl ExecutionPlan for CsvExec { file_compression_type: self.file_compression_type.to_owned(), }; let stream = - FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?; + FileStream::new(&self.base_config, partition, opener, &self.metrics)?; Ok(Box::pin(stream) as SendableRecordBatchStream) } @@ -179,6 +179,10 @@ impl ExecutionPlan for CsvExec { fn statistics(&self) -> Statistics { self.projected_statistics.clone() } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } } #[derive(Debug, Clone)] @@ -515,6 +519,13 @@ mod tests { "+----+------------+", ]; crate::assert_batches_eq!(expected, &[batch.slice(0, 5)]); + + let metrics = csv.metrics().expect("doesn't found metrics"); + let time_elapsed_processing = get_value(&metrics, "time_elapsed_processing"); + assert!( + time_elapsed_processing > 0, + "Expected time_elapsed_processing greater than 0", + ); Ok(()) } @@ -676,4 +687,15 @@ mod tests { Ok(()) } + + fn get_value(metrics: &MetricsSet, metric_name: &str) -> usize { + match metrics.sum_by_name(metric_name) { + Some(v) => v.as_usize(), + _ => { + panic!( + "Expected metric not found. Looking for '{metric_name}' in\n\n{metrics:#?}" + ); + } + } + } } diff --git a/datafusion/core/src/physical_plan/file_format/file_stream.rs b/datafusion/core/src/physical_plan/file_format/file_stream.rs index 3d09377bf81b..265ff7a4f59f 100644 --- a/datafusion/core/src/physical_plan/file_format/file_stream.rs +++ b/datafusion/core/src/physical_plan/file_format/file_stream.rs @@ -166,7 +166,7 @@ impl FileStream { config: &FileScanConfig, partition: usize, file_reader: F, - metrics: ExecutionPlanMetricsSet, + metrics: &ExecutionPlanMetricsSet, ) -> Result { let (projected_schema, _) = config.project(); let pc_projector = PartitionColumnProjector::new( @@ -187,8 +187,8 @@ impl FileStream { file_reader, pc_projector, state: FileStreamState::Idle, - file_stream_metrics: FileStreamMetrics::new(&metrics, partition), - baseline_metrics: BaselineMetrics::new(&metrics, partition), + file_stream_metrics: FileStreamMetrics::new(metrics, partition), + baseline_metrics: BaselineMetrics::new(metrics, partition), }) } @@ -353,9 +353,8 @@ mod tests { output_ordering: None, infinite_source: false, }; - - let file_stream = - FileStream::new(&config, 0, reader, ExecutionPlanMetricsSet::new()).unwrap(); + let metrics_set = ExecutionPlanMetricsSet::new(); + let file_stream = FileStream::new(&config, 0, reader, &metrics_set).unwrap(); file_stream .map(|b| b.expect("No error expected in stream")) diff --git a/datafusion/core/src/physical_plan/file_format/json.rs b/datafusion/core/src/physical_plan/file_format/json.rs index 8184d3f1ac45..633da67e93e2 100644 --- a/datafusion/core/src/physical_plan/file_format/json.rs +++ b/datafusion/core/src/physical_plan/file_format/json.rs @@ -26,7 +26,7 @@ use crate::physical_plan::file_format::file_stream::{ FileOpenFuture, FileOpener, FileStream, }; use crate::physical_plan::file_format::FileMeta; -use crate::physical_plan::metrics::ExecutionPlanMetricsSet; +use crate::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricsSet}; use crate::physical_plan::{ DisplayFormatType, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, }; @@ -134,7 +134,7 @@ impl ExecutionPlan for NdJsonExec { }; let stream = - FileStream::new(&self.base_config, partition, opener, self.metrics.clone())?; + FileStream::new(&self.base_config, partition, opener, &self.metrics)?; Ok(Box::pin(stream) as SendableRecordBatchStream) } @@ -159,6 +159,10 @@ impl ExecutionPlan for NdJsonExec { fn statistics(&self) -> Statistics { self.projected_statistics.clone() } + + fn metrics(&self) -> Option { + Some(self.metrics.clone_inner()) + } } struct JsonOpener { diff --git a/datafusion/core/src/physical_plan/file_format/parquet.rs b/datafusion/core/src/physical_plan/file_format/parquet.rs index d14c9235b432..8f63947e3029 100644 --- a/datafusion/core/src/physical_plan/file_format/parquet.rs +++ b/datafusion/core/src/physical_plan/file_format/parquet.rs @@ -306,12 +306,8 @@ impl ExecutionPlan for ParquetExec { enable_page_index: self.enable_page_index(config_options), }; - let stream = FileStream::new( - &self.base_config, - partition_index, - opener, - self.metrics.clone(), - )?; + let stream = + FileStream::new(&self.base_config, partition_index, opener, &self.metrics)?; Ok(Box::pin(stream)) }