Skip to content

Commit 161fcd8

Browse files
authored
Simplify file struct abstractions (#1120)
* [refacto] simplify file struct abstractions * [doc] explicit desc for file_groups
1 parent a3ffc52 commit 161fcd8

File tree

7 files changed

+79
-111
lines changed

7 files changed

+79
-111
lines changed

ballista/rust/core/proto/ballista.proto

+2-2
Original file line numberDiff line numberDiff line change
@@ -596,7 +596,7 @@ message FilterExecNode {
596596
PhysicalExprNode expr = 2;
597597
}
598598

599-
message FilePartition {
599+
message FileGroup {
600600
repeated PartitionedFile files = 1;
601601
}
602602

@@ -606,7 +606,7 @@ message ScanLimit {
606606
}
607607

608608
message ParquetScanExecNode {
609-
repeated FilePartition partitions = 1;
609+
repeated FileGroup file_groups = 1;
610610
Schema schema = 2;
611611
uint32 batch_size = 4;
612612
repeated uint32 projection = 6;

ballista/rust/core/src/serde/physical_plan/from_proto.rs

+22-21
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ use datafusion::catalog::catalog::{
3737
};
3838
use datafusion::datasource::object_store::local::LocalFileSystem;
3939
use datafusion::datasource::object_store::{FileMeta, ObjectStoreRegistry, SizedFile};
40-
use datafusion::datasource::{FilePartition, PartitionedFile};
40+
use datafusion::datasource::PartitionedFile;
4141
use datafusion::execution::context::{
4242
ExecutionConfig, ExecutionContextState, ExecutionProps,
4343
};
@@ -127,8 +127,8 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
127127
Arc::new(LocalFileSystem {}),
128128
scan.files
129129
.iter()
130-
.map(|f| f.try_into())
131-
.collect::<Result<Vec<PartitionedFile>, _>>()?,
130+
.map(|f| f.into())
131+
.collect::<Vec<PartitionedFile>>(),
132132
statistics,
133133
schema,
134134
scan.has_header,
@@ -145,13 +145,10 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
145145

146146
Ok(Arc::new(ParquetExec::new(
147147
Arc::new(LocalFileSystem {}),
148-
scan.partitions
148+
scan.file_groups
149149
.iter()
150-
.map(|p| {
151-
let it = p.files.iter().map(|f| f.try_into());
152-
it.collect::<Result<Vec<PartitionedFile>, _>>()
153-
})
154-
.collect::<Result<Vec<Vec<PartitionedFile>>, _>>()?,
150+
.map(|p| p.into())
151+
.collect::<Vec<Vec<PartitionedFile>>>(),
155152
statistics,
156153
schema,
157154
Some(projection),
@@ -170,8 +167,8 @@ impl TryInto<Arc<dyn ExecutionPlan>> for &protobuf::PhysicalPlanNode {
170167
Arc::new(LocalFileSystem {}),
171168
scan.files
172169
.iter()
173-
.map(|f| f.try_into())
174-
.collect::<Result<Vec<PartitionedFile>, _>>()?,
170+
.map(|f| f.into())
171+
.collect::<Vec<PartitionedFile>>(),
175172
statistics,
176173
schema,
177174
Some(projection),
@@ -741,23 +738,27 @@ pub fn parse_protobuf_hash_partitioning(
741738
}
742739
}
743740

744-
impl TryInto<PartitionedFile> for &protobuf::PartitionedFile {
745-
type Error = BallistaError;
746-
747-
fn try_into(self) -> Result<PartitionedFile, Self::Error> {
748-
Ok(PartitionedFile {
741+
impl From<&protobuf::PartitionedFile> for PartitionedFile {
742+
fn from(val: &protobuf::PartitionedFile) -> Self {
743+
PartitionedFile {
749744
file_meta: FileMeta {
750745
sized_file: SizedFile {
751-
path: self.path.clone(),
752-
size: self.size,
746+
path: val.path.clone(),
747+
size: val.size,
753748
},
754-
last_modified: if self.last_modified_ns == 0 {
749+
last_modified: if val.last_modified_ns == 0 {
755750
None
756751
} else {
757-
Some(Utc.timestamp_nanos(self.last_modified_ns as i64))
752+
Some(Utc.timestamp_nanos(val.last_modified_ns as i64))
758753
},
759754
},
760-
})
755+
}
756+
}
757+
}
758+
759+
impl From<&protobuf::FileGroup> for Vec<PartitionedFile> {
760+
fn from(val: &protobuf::FileGroup) -> Self {
761+
val.files.iter().map(|f| f.into()).collect()
761762
}
762763
}
763764

ballista/rust/core/src/serde/physical_plan/to_proto.rs

+13-7
Original file line numberDiff line numberDiff line change
@@ -275,18 +275,16 @@ impl TryInto<protobuf::PhysicalPlanNode> for Arc<dyn ExecutionPlan> {
275275
)),
276276
})
277277
} else if let Some(exec) = plan.downcast_ref::<ParquetExec>() {
278-
let partitions = exec
279-
.partitions()
280-
.into_iter()
281-
.map(|p| protobuf::FilePartition {
282-
files: p.iter().map(|f| f.into()).collect(),
283-
})
278+
let file_groups = exec
279+
.file_groups()
280+
.iter()
281+
.map(|p| p.as_slice().into())
284282
.collect();
285283

286284
Ok(protobuf::PhysicalPlanNode {
287285
physical_plan_type: Some(PhysicalPlanType::ParquetScan(
288286
protobuf::ParquetScanExecNode {
289-
partitions,
287+
file_groups,
290288
statistics: Some((&exec.statistics()).into()),
291289
limit: exec
292290
.limit()
@@ -688,6 +686,14 @@ impl From<&PartitionedFile> for protobuf::PartitionedFile {
688686
}
689687
}
690688

689+
impl From<&[PartitionedFile]> for protobuf::FileGroup {
690+
fn from(gr: &[PartitionedFile]) -> protobuf::FileGroup {
691+
protobuf::FileGroup {
692+
files: gr.iter().map(|f| f.into()).collect(),
693+
}
694+
}
695+
}
696+
691697
impl From<&ColumnStatistics> for protobuf::ColumnStats {
692698
fn from(cs: &ColumnStatistics) -> protobuf::ColumnStats {
693699
protobuf::ColumnStats {

datafusion/src/datasource/datasource.rs

+3
Original file line numberDiff line numberDiff line change
@@ -71,6 +71,9 @@ pub trait TableProvider: Sync + Send {
7171
}
7272

7373
/// Create an ExecutionPlan that will scan the table.
74+
/// The table provider will be usually responsible of grouping
75+
/// the source data into partitions that can be efficiently
76+
/// parallelized or distributed.
7477
async fn scan(
7578
&self,
7679
projection: &Option<Vec<usize>>,

datafusion/src/datasource/mod.rs

-16
Original file line numberDiff line numberDiff line change
@@ -155,22 +155,6 @@ impl std::fmt::Display for PartitionedFile {
155155
}
156156
}
157157

158-
#[derive(Debug, Clone)]
159-
/// A collection of files that should be read in a single task
160-
pub struct FilePartition {
161-
/// The index of the partition among all partitions
162-
pub index: usize,
163-
/// The contained files of the partition
164-
pub files: Vec<PartitionedFile>,
165-
}
166-
167-
impl std::fmt::Display for FilePartition {
168-
fn fmt(&self, f: &mut std::fmt::Formatter) -> std::fmt::Result {
169-
let files: Vec<String> = self.files.iter().map(|f| f.to_string()).collect();
170-
write!(f, "{}", files.join(", "))
171-
}
172-
}
173-
174158
fn create_max_min_accs(
175159
schema: &Schema,
176160
) -> (Vec<Option<MaxAccumulator>>, Vec<Option<MinAccumulator>>) {

datafusion/src/physical_plan/file_format/mod.rs

+23
Original file line numberDiff line numberDiff line change
@@ -26,3 +26,26 @@ pub use self::parquet::ParquetExec;
2626
pub use avro::AvroExec;
2727
pub use csv::CsvExec;
2828
pub use json::NdJsonExec;
29+
30+
use crate::datasource::PartitionedFile;
31+
use std::fmt::{Display, Formatter, Result};
32+
33+
/// A wrapper to customize partitioned file display
34+
#[derive(Debug)]
35+
struct FileGroupsDisplay<'a>(&'a [Vec<PartitionedFile>]);
36+
37+
impl<'a> Display for FileGroupsDisplay<'a> {
38+
fn fmt(&self, f: &mut Formatter) -> Result {
39+
let parts: Vec<_> = self
40+
.0
41+
.iter()
42+
.map(|pp| {
43+
pp.iter()
44+
.map(|pf| pf.file_meta.path())
45+
.collect::<Vec<_>>()
46+
.join(", ")
47+
})
48+
.collect();
49+
write!(f, "[{}]", parts.join(", "))
50+
}
51+
}

datafusion/src/physical_plan/file_format/parquet.rs

+16-65
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@ use std::{any::Any, convert::TryInto};
2323

2424
use crate::datasource::file_format::parquet::ChunkObjectReader;
2525
use crate::datasource::object_store::ObjectStore;
26+
use crate::datasource::PartitionedFile;
2627
use crate::{
2728
error::{DataFusionError, Result},
2829
logical_plan::{Column, Expr},
@@ -59,14 +60,13 @@ use tokio::{
5960

6061
use async_trait::async_trait;
6162

62-
use crate::datasource::{FilePartition, PartitionedFile};
63-
6463
/// Execution plan for scanning one or more Parquet partitions
6564
#[derive(Debug, Clone)]
6665
pub struct ParquetExec {
6766
object_store: Arc<dyn ObjectStore>,
68-
/// Parquet partitions to read
69-
partitions: Vec<ParquetPartition>,
67+
/// Grouped list of files. Each group will be processed together by one
68+
/// partition of the `ExecutionPlan`.
69+
file_groups: Vec<Vec<PartitionedFile>>,
7070
/// Schema after projection is applied
7171
schema: SchemaRef,
7272
/// Projection for which columns to load
@@ -83,23 +83,6 @@ pub struct ParquetExec {
8383
limit: Option<usize>,
8484
}
8585

86-
/// Represents one partition of a Parquet data set and this currently means one Parquet file.
87-
///
88-
/// In the future it would be good to support subsets of files based on ranges of row groups
89-
/// so that we can better parallelize reads of large files across available cores (see
90-
/// [ARROW-10995](https://issues.apache.org/jira/browse/ARROW-10995)).
91-
///
92-
/// We may also want to support reading Parquet files that are partitioned based on a key and
93-
/// in this case we would want this partition struct to represent multiple files for a given
94-
/// partition key (see [ARROW-11019](https://issues.apache.org/jira/browse/ARROW-11019)).
95-
#[derive(Debug, Clone)]
96-
pub struct ParquetPartition {
97-
/// The Parquet filename for this partition
98-
pub file_partition: FilePartition,
99-
/// Execution metrics
100-
metrics: ExecutionPlanMetricsSet,
101-
}
102-
10386
/// Stores metrics about the parquet execution for a particular parquet file
10487
#[derive(Debug, Clone)]
10588
struct ParquetFileMetrics {
@@ -115,24 +98,16 @@ impl ParquetExec {
11598
#[allow(clippy::too_many_arguments)]
11699
pub fn new(
117100
object_store: Arc<dyn ObjectStore>,
118-
files: Vec<Vec<PartitionedFile>>,
101+
file_groups: Vec<Vec<PartitionedFile>>,
119102
statistics: Statistics,
120103
schema: SchemaRef,
121104
projection: Option<Vec<usize>>,
122105
predicate: Option<Expr>,
123106
batch_size: usize,
124107
limit: Option<usize>,
125108
) -> Self {
126-
debug!("Creating ParquetExec, desc: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
127-
files, projection, predicate, limit);
128-
129-
let metrics = ExecutionPlanMetricsSet::new();
130-
131-
let partitions = files
132-
.into_iter()
133-
.enumerate()
134-
.map(|(i, f)| ParquetPartition::new(f, i, metrics.clone()))
135-
.collect::<Vec<_>>();
109+
debug!("Creating ParquetExec, files: {:?}, projection {:?}, predicate: {:?}, limit: {:?}",
110+
file_groups, projection, predicate, limit);
136111

137112
let metrics = ExecutionPlanMetricsSet::new();
138113
let predicate_creation_errors =
@@ -162,7 +137,7 @@ impl ParquetExec {
162137

163138
Self {
164139
object_store,
165-
partitions,
140+
file_groups,
166141
schema: projected_schema,
167142
projection,
168143
metrics,
@@ -204,11 +179,8 @@ impl ParquetExec {
204179
}
205180

206181
/// List of data files
207-
pub fn partitions(&self) -> Vec<&[PartitionedFile]> {
208-
self.partitions
209-
.iter()
210-
.map(|fp| fp.file_partition.files.as_slice())
211-
.collect()
182+
pub fn file_groups(&self) -> &[Vec<PartitionedFile>] {
183+
&self.file_groups
212184
}
213185
/// Optional projection for which columns to load
214186
pub fn projection(&self) -> &[usize] {
@@ -225,20 +197,6 @@ impl ParquetExec {
225197
}
226198
}
227199

228-
impl ParquetPartition {
229-
/// Create a new parquet partition
230-
pub fn new(
231-
files: Vec<PartitionedFile>,
232-
index: usize,
233-
metrics: ExecutionPlanMetricsSet,
234-
) -> Self {
235-
Self {
236-
file_partition: FilePartition { index, files },
237-
metrics,
238-
}
239-
}
240-
}
241-
242200
impl ParquetFileMetrics {
243201
/// Create new metrics
244202
pub fn new(
@@ -279,7 +237,7 @@ impl ExecutionPlan for ParquetExec {
279237

280238
/// Get the output partitioning of this plan
281239
fn output_partitioning(&self) -> Partitioning {
282-
Partitioning::UnknownPartitioning(self.partitions.len())
240+
Partitioning::UnknownPartitioning(self.file_groups.len())
283241
}
284242

285243
fn with_new_children(
@@ -304,7 +262,7 @@ impl ExecutionPlan for ParquetExec {
304262
Receiver<ArrowResult<RecordBatch>>,
305263
) = channel(2);
306264

307-
let partition = self.partitions[partition_index].clone();
265+
let partition = self.file_groups[partition_index].clone();
308266
let metrics = self.metrics.clone();
309267
let projection = self.projection.clone();
310268
let predicate_builder = self.predicate_builder.clone();
@@ -342,18 +300,12 @@ impl ExecutionPlan for ParquetExec {
342300
) -> std::fmt::Result {
343301
match t {
344302
DisplayFormatType::Default => {
345-
let files: Vec<_> = self
346-
.partitions
347-
.iter()
348-
.map(|pp| format!("{}", pp.file_partition))
349-
.collect();
350-
351303
write!(
352304
f,
353-
"ParquetExec: batch_size={}, limit={:?}, partitions=[{}]",
305+
"ParquetExec: batch_size={}, limit={:?}, partitions={}",
354306
self.batch_size,
355307
self.limit,
356-
files.join(", ")
308+
super::FileGroupsDisplay(&self.file_groups)
357309
)
358310
}
359311
}
@@ -497,7 +449,7 @@ fn build_row_group_predicate(
497449
fn read_partition(
498450
object_store: &dyn ObjectStore,
499451
partition_index: usize,
500-
partition: ParquetPartition,
452+
partition: Vec<PartitionedFile>,
501453
metrics: ExecutionPlanMetricsSet,
502454
projection: &[usize],
503455
predicate_builder: &Option<PruningPredicate>,
@@ -506,8 +458,7 @@ fn read_partition(
506458
limit: Option<usize>,
507459
) -> Result<()> {
508460
let mut total_rows = 0;
509-
let all_files = partition.file_partition.files;
510-
'outer: for partitioned_file in all_files {
461+
'outer: for partitioned_file in partition {
511462
let file_metrics = ParquetFileMetrics::new(
512463
partition_index,
513464
&*partitioned_file.file_meta.path(),

0 commit comments

Comments
 (0)