Skip to content

Commit 553b9a7

Browse files
Fix index out of bounds panic (#819)
* Remove output_partitioning from TaskDescription * Remove output_partitioning from execution stages * Remove input_partition_count from UnresolvedShuffleExec * Add input stage id for ShuffleReaderExec * Correct the behavior of output_partitioning() for ShuffleWriterExec --------- Co-authored-by: yangzhong <[email protected]>
1 parent 1691457 commit 553b9a7

File tree

12 files changed

+70
-151
lines changed

12 files changed

+70
-151
lines changed

ballista/core/proto/ballista.proto

+2-5
Original file line numberDiff line numberDiff line change
@@ -49,13 +49,14 @@ message ShuffleWriterExecNode {
4949
message UnresolvedShuffleExecNode {
5050
uint32 stage_id = 1;
5151
datafusion.Schema schema = 2;
52-
uint32 input_partition_count = 3;
5352
uint32 output_partition_count = 4;
5453
}
5554

5655
message ShuffleReaderExecNode {
5756
repeated ShuffleReaderPartition partition = 1;
5857
datafusion.Schema schema = 2;
58+
// The stage to read from
59+
uint32 stage_id = 3;
5960
}
6061

6162
message ShuffleReaderPartition {
@@ -98,7 +99,6 @@ message ExecutionGraphStage {
9899

99100
message UnResolvedStage {
100101
uint32 stage_id = 1;
101-
datafusion.PhysicalHashRepartition output_partitioning = 2;
102102
repeated uint32 output_links = 3;
103103
repeated GraphStageInput inputs = 4;
104104
bytes plan = 5;
@@ -109,7 +109,6 @@ message UnResolvedStage {
109109
message ResolvedStage {
110110
uint32 stage_id = 1;
111111
uint32 partitions = 2;
112-
datafusion.PhysicalHashRepartition output_partitioning = 3;
113112
repeated uint32 output_links = 4;
114113
repeated GraphStageInput inputs = 5;
115114
bytes plan = 6;
@@ -120,7 +119,6 @@ message ResolvedStage {
120119
message SuccessfulStage {
121120
uint32 stage_id = 1;
122121
uint32 partitions = 2;
123-
datafusion.PhysicalHashRepartition output_partitioning = 3;
124122
repeated uint32 output_links = 4;
125123
repeated GraphStageInput inputs = 5;
126124
bytes plan = 6;
@@ -132,7 +130,6 @@ message SuccessfulStage {
132130
message FailedStage {
133131
uint32 stage_id = 1;
134132
uint32 partitions = 2;
135-
datafusion.PhysicalHashRepartition output_partitioning = 3;
136133
repeated uint32 output_links = 4;
137134
bytes plan = 5;
138135
repeated TaskInfo task_infos = 6;

ballista/core/src/execution_plans/shuffle_reader.rs

+13-5
Original file line numberDiff line numberDiff line change
@@ -58,22 +58,26 @@ use tokio_stream::wrappers::ReceiverStream;
5858
/// being executed by an executor
5959
#[derive(Debug, Clone)]
6060
pub struct ShuffleReaderExec {
61+
/// The query stage id to read from
62+
pub stage_id: usize,
63+
pub(crate) schema: SchemaRef,
6164
/// Each partition of a shuffle can read data from multiple locations
6265
pub partition: Vec<Vec<PartitionLocation>>,
63-
pub(crate) schema: SchemaRef,
6466
/// Execution metrics
6567
metrics: ExecutionPlanMetricsSet,
6668
}
6769

6870
impl ShuffleReaderExec {
6971
/// Create a new ShuffleReaderExec
7072
pub fn try_new(
73+
stage_id: usize,
7174
partition: Vec<Vec<PartitionLocation>>,
7275
schema: SchemaRef,
7376
) -> Result<Self> {
7477
Ok(Self {
75-
partition,
78+
stage_id,
7679
schema,
80+
partition,
7781
metrics: ExecutionPlanMetricsSet::new(),
7882
})
7983
}
@@ -513,13 +517,14 @@ mod tests {
513517
]);
514518

515519
let job_id = "test_job_1";
520+
let input_stage_id = 2;
516521
let mut partitions: Vec<PartitionLocation> = vec![];
517522
for partition_id in 0..4 {
518523
partitions.push(PartitionLocation {
519524
map_partition_id: 0,
520525
partition_id: PartitionId {
521526
job_id: job_id.to_string(),
522-
stage_id: 2,
527+
stage_id: input_stage_id,
523528
partition_id,
524529
},
525530
executor_meta: ExecutorMetadata {
@@ -534,8 +539,11 @@ mod tests {
534539
})
535540
}
536541

537-
let shuffle_reader_exec =
538-
ShuffleReaderExec::try_new(vec![partitions], Arc::new(schema))?;
542+
let shuffle_reader_exec = ShuffleReaderExec::try_new(
543+
input_stage_id,
544+
vec![partitions],
545+
Arc::new(schema),
546+
)?;
539547
let mut stream = shuffle_reader_exec.execute(0, task_ctx)?;
540548
let batches = utils::collect_stream(&mut stream).await;
541549

ballista/core/src/execution_plans/shuffle_writer.rs

+12-6
Original file line numberDiff line numberDiff line change
@@ -71,7 +71,8 @@ pub struct ShuffleWriterExec {
7171
plan: Arc<dyn ExecutionPlan>,
7272
/// Path to write output streams to
7373
work_dir: String,
74-
/// Optional shuffle output partitioning
74+
/// Optional shuffle output partitioning.
75+
/// If it's none, it means there's no need to do repartitioning.
7576
shuffle_output_partitioning: Option<Partitioning>,
7677
/// Execution metrics
7778
metrics: ExecutionPlanMetricsSet,
@@ -134,6 +135,11 @@ impl ShuffleWriterExec {
134135
self.stage_id
135136
}
136137

138+
/// Get the input partition count
139+
pub fn input_partition_count(&self) -> usize {
140+
self.plan.output_partitioning().partition_count()
141+
}
142+
137143
/// Get the true output partitioning
138144
pub fn shuffle_output_partitioning(&self) -> Option<&Partitioning> {
139145
self.shuffle_output_partitioning.as_ref()
@@ -297,12 +303,12 @@ impl ExecutionPlan for ShuffleWriterExec {
297303
self.plan.schema()
298304
}
299305

306+
/// If [`shuffle_output_partitioning`] is none, then there's no need to do repartitioning.
307+
/// Therefore, the partition is the same as its input plan's.
300308
fn output_partitioning(&self) -> Partitioning {
301-
// This operator needs to be executed once for each *input* partition and there
302-
// isn't really a mechanism yet in DataFusion to support this use case so we report
303-
// the input partitioning as the output partitioning here. The executor reports
304-
// output partition meta data back to the scheduler.
305-
self.plan.output_partitioning()
309+
self.shuffle_output_partitioning
310+
.clone()
311+
.unwrap_or_else(|| self.plan.output_partitioning())
306312
}
307313

308314
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {

ballista/core/src/execution_plans/unresolved_shuffle.rs

-5
Original file line numberDiff line numberDiff line change
@@ -38,9 +38,6 @@ pub struct UnresolvedShuffleExec {
3838
// The schema this node will have once it is replaced with a ShuffleReaderExec
3939
pub schema: SchemaRef,
4040

41-
// The number of shuffle writer partition tasks that will produce the partitions
42-
pub input_partition_count: usize,
43-
4441
// The partition count this node will have once it is replaced with a ShuffleReaderExec
4542
pub output_partition_count: usize,
4643
}
@@ -50,13 +47,11 @@ impl UnresolvedShuffleExec {
5047
pub fn new(
5148
stage_id: usize,
5249
schema: SchemaRef,
53-
input_partition_count: usize,
5450
output_partition_count: usize,
5551
) -> Self {
5652
Self {
5753
stage_id,
5854
schema,
59-
input_partition_count,
6055
output_partition_count,
6156
}
6257
}

ballista/core/src/serde/generated/ballista.rs

+3-18
Original file line numberDiff line numberDiff line change
@@ -45,8 +45,6 @@ pub struct UnresolvedShuffleExecNode {
4545
pub stage_id: u32,
4646
#[prost(message, optional, tag = "2")]
4747
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
48-
#[prost(uint32, tag = "3")]
49-
pub input_partition_count: u32,
5048
#[prost(uint32, tag = "4")]
5149
pub output_partition_count: u32,
5250
}
@@ -57,6 +55,9 @@ pub struct ShuffleReaderExecNode {
5755
pub partition: ::prost::alloc::vec::Vec<ShuffleReaderPartition>,
5856
#[prost(message, optional, tag = "2")]
5957
pub schema: ::core::option::Option<::datafusion_proto::protobuf::Schema>,
58+
/// The stage to read from
59+
#[prost(uint32, tag = "3")]
60+
pub stage_id: u32,
6061
}
6162
#[allow(clippy::derive_partial_eq_without_eq)]
6263
#[derive(Clone, PartialEq, ::prost::Message)]
@@ -132,10 +133,6 @@ pub mod execution_graph_stage {
132133
pub struct UnResolvedStage {
133134
#[prost(uint32, tag = "1")]
134135
pub stage_id: u32,
135-
#[prost(message, optional, tag = "2")]
136-
pub output_partitioning: ::core::option::Option<
137-
::datafusion_proto::protobuf::PhysicalHashRepartition,
138-
>,
139136
#[prost(uint32, repeated, tag = "3")]
140137
pub output_links: ::prost::alloc::vec::Vec<u32>,
141138
#[prost(message, repeated, tag = "4")]
@@ -156,10 +153,6 @@ pub struct ResolvedStage {
156153
pub stage_id: u32,
157154
#[prost(uint32, tag = "2")]
158155
pub partitions: u32,
159-
#[prost(message, optional, tag = "3")]
160-
pub output_partitioning: ::core::option::Option<
161-
::datafusion_proto::protobuf::PhysicalHashRepartition,
162-
>,
163156
#[prost(uint32, repeated, tag = "4")]
164157
pub output_links: ::prost::alloc::vec::Vec<u32>,
165158
#[prost(message, repeated, tag = "5")]
@@ -180,10 +173,6 @@ pub struct SuccessfulStage {
180173
pub stage_id: u32,
181174
#[prost(uint32, tag = "2")]
182175
pub partitions: u32,
183-
#[prost(message, optional, tag = "3")]
184-
pub output_partitioning: ::core::option::Option<
185-
::datafusion_proto::protobuf::PhysicalHashRepartition,
186-
>,
187176
#[prost(uint32, repeated, tag = "4")]
188177
pub output_links: ::prost::alloc::vec::Vec<u32>,
189178
#[prost(message, repeated, tag = "5")]
@@ -204,10 +193,6 @@ pub struct FailedStage {
204193
pub stage_id: u32,
205194
#[prost(uint32, tag = "2")]
206195
pub partitions: u32,
207-
#[prost(message, optional, tag = "3")]
208-
pub output_partitioning: ::core::option::Option<
209-
::datafusion_proto::protobuf::PhysicalHashRepartition,
210-
>,
211196
#[prost(uint32, repeated, tag = "4")]
212197
pub output_links: ::prost::alloc::vec::Vec<u32>,
213198
#[prost(bytes = "vec", tag = "5")]

ballista/core/src/serde/mod.rs

+4-4
Original file line numberDiff line numberDiff line change
@@ -157,6 +157,7 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
157157
)?))
158158
}
159159
PhysicalPlanType::ShuffleReader(shuffle_reader) => {
160+
let stage_id = shuffle_reader.stage_id as usize;
160161
let schema = Arc::new(convert_required!(shuffle_reader.schema)?);
161162
let partition_location: Vec<Vec<PartitionLocation>> = shuffle_reader
162163
.partition
@@ -175,16 +176,14 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
175176
})
176177
.collect::<Result<Vec<_>, DataFusionError>>()?;
177178
let shuffle_reader =
178-
ShuffleReaderExec::try_new(partition_location, schema)?;
179+
ShuffleReaderExec::try_new(stage_id, partition_location, schema)?;
179180
Ok(Arc::new(shuffle_reader))
180181
}
181182
PhysicalPlanType::UnresolvedShuffle(unresolved_shuffle) => {
182183
let schema = Arc::new(convert_required!(unresolved_shuffle.schema)?);
183184
Ok(Arc::new(UnresolvedShuffleExec {
184185
stage_id: unresolved_shuffle.stage_id as usize,
185186
schema,
186-
input_partition_count: unresolved_shuffle.input_partition_count
187-
as usize,
188187
output_partition_count: unresolved_shuffle.output_partition_count
189188
as usize,
190189
}))
@@ -237,6 +236,7 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
237236

238237
Ok(())
239238
} else if let Some(exec) = node.as_any().downcast_ref::<ShuffleReaderExec>() {
239+
let stage_id = exec.stage_id as u32;
240240
let mut partition = vec![];
241241
for location in &exec.partition {
242242
partition.push(protobuf::ShuffleReaderPartition {
@@ -255,6 +255,7 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
255255
let proto = protobuf::BallistaPhysicalPlanNode {
256256
physical_plan_type: Some(PhysicalPlanType::ShuffleReader(
257257
protobuf::ShuffleReaderExecNode {
258+
stage_id,
258259
partition,
259260
schema: Some(exec.schema().as_ref().try_into()?),
260261
},
@@ -273,7 +274,6 @@ impl PhysicalExtensionCodec for BallistaPhysicalExtensionCodec {
273274
protobuf::UnresolvedShuffleExecNode {
274275
stage_id: exec.stage_id as u32,
275276
schema: Some(exec.schema().as_ref().try_into()?),
276-
input_partition_count: exec.input_partition_count as u32,
277277
output_partition_count: exec.output_partition_count as u32,
278278
},
279279
)),

ballista/scheduler/src/planner.rs

+4-13
Original file line numberDiff line numberDiff line change
@@ -178,10 +178,6 @@ fn create_unresolved_shuffle(
178178
shuffle_writer.stage_id(),
179179
shuffle_writer.schema(),
180180
shuffle_writer.output_partitioning().partition_count(),
181-
shuffle_writer
182-
.shuffle_output_partitioning()
183-
.map(|p| p.partition_count())
184-
.unwrap_or_else(|| shuffle_writer.output_partitioning().partition_count()),
185181
))
186182
}
187183

@@ -246,6 +242,7 @@ pub fn remove_unresolved_shuffles(
246242
.join("\n")
247243
);
248244
new_children.push(Arc::new(ShuffleReaderExec::try_new(
245+
unresolved_shuffle.stage_id,
249246
relevant_locations,
250247
unresolved_shuffle.schema().clone(),
251248
)?))
@@ -265,15 +262,13 @@ pub fn rollback_resolved_shuffles(
265262
let mut new_children: Vec<Arc<dyn ExecutionPlan>> = vec![];
266263
for child in stage.children() {
267264
if let Some(shuffle_reader) = child.as_any().downcast_ref::<ShuffleReaderExec>() {
268-
let partition_locations = &shuffle_reader.partition;
269-
let output_partition_count = partition_locations.len();
270-
let input_partition_count = partition_locations[0].len();
271-
let stage_id = partition_locations[0][0].partition_id.stage_id;
265+
let output_partition_count =
266+
shuffle_reader.output_partitioning().partition_count();
267+
let stage_id = shuffle_reader.stage_id;
272268

273269
let unresolved_shuffle = Arc::new(UnresolvedShuffleExec::new(
274270
stage_id,
275271
shuffle_reader.schema(),
276-
input_partition_count,
277272
output_partition_count,
278273
));
279274
new_children.push(unresolved_shuffle);
@@ -392,7 +387,6 @@ mod test {
392387
let unresolved_shuffle =
393388
downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
394389
assert_eq!(unresolved_shuffle.stage_id, 1);
395-
assert_eq!(unresolved_shuffle.input_partition_count, 2);
396390
assert_eq!(unresolved_shuffle.output_partition_count, 2);
397391

398392
// verify stage 2
@@ -402,7 +396,6 @@ mod test {
402396
let unresolved_shuffle =
403397
downcast_exec!(unresolved_shuffle, UnresolvedShuffleExec);
404398
assert_eq!(unresolved_shuffle.stage_id, 2);
405-
assert_eq!(unresolved_shuffle.input_partition_count, 2);
406399
assert_eq!(unresolved_shuffle.output_partition_count, 2);
407400

408401
Ok(())
@@ -555,15 +548,13 @@ order by
555548
let join_input_1 = join_input_1.children()[0].clone();
556549
let unresolved_shuffle_reader_1 =
557550
downcast_exec!(join_input_1, UnresolvedShuffleExec);
558-
assert_eq!(unresolved_shuffle_reader_1.input_partition_count, 2); // lineitem
559551
assert_eq!(unresolved_shuffle_reader_1.output_partition_count, 2);
560552

561553
let join_input_2 = join.children()[1].clone();
562554
// skip CoalesceBatches
563555
let join_input_2 = join_input_2.children()[0].clone();
564556
let unresolved_shuffle_reader_2 =
565557
downcast_exec!(join_input_2, UnresolvedShuffleExec);
566-
assert_eq!(unresolved_shuffle_reader_2.input_partition_count, 1); // orders
567558
assert_eq!(unresolved_shuffle_reader_2.output_partition_count, 2);
568559

569560
// final partitioned hash aggregate

ballista/scheduler/src/scheduler_server/mod.rs

+1-4
Original file line numberDiff line numberDiff line change
@@ -460,10 +460,7 @@ mod test {
460460
if let Some(task) = task {
461461
let mut partitions: Vec<ShuffleWritePartition> = vec![];
462462

463-
let num_partitions = task
464-
.output_partitioning
465-
.map(|p| p.partition_count())
466-
.unwrap_or(1);
463+
let num_partitions = task.get_output_partition_number();
467464

468465
for partition_id in 0..num_partitions {
469466
partitions.push(ShuffleWritePartition {

0 commit comments

Comments
 (0)