
Commit 0c79855

thinkharderdev authored and mpurins-coralogix committed
Use node-level local limit (#20)
* Use node-level local limit
* serialize limit in shuffle writer
* Revert "Merge pull request #19 from coralogix/sc-5792"
  This reverts commit 08140ef, reversing changes made to a7f1384.
* add log
* make sure we don't forget limit for shuffle writer
* update accum correctly and try to break early
* Check local limit accumulator before polling for more data
* fix build

Co-authored-by: Martins Purins <[email protected]>
1 parent b8f8983 commit 0c79855
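In outline, every shuffle-writer task for the same (job_id, stage_id) on a node now shares one row counter: the writer checks the counter before polling its input and bumps it after each batch, so sibling tasks stop pulling data once the stage's local limit is reached. A minimal sketch of that loop, with hypothetical names (drain_with_local_limit, next_batch) standing in for the real stream handling in shuffle_writer.rs below:

use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

// Simplified stand-in for the per-(job_id, stage_id) accumulator the commit
// caches in LIMIT_ACCUMULATORS; `next_batch` models polling the input stream
// and yields each batch's row count.
fn drain_with_local_limit(
    accum: Arc<AtomicUsize>,
    limit: usize,
    mut next_batch: impl FnMut() -> Option<usize>,
) {
    loop {
        // Check the shared accumulator before polling for more data.
        if accum.load(Ordering::SeqCst) >= limit {
            break;
        }
        let Some(num_rows) = next_batch() else { break };
        // ... partition and write the batch here ...
        // Record the rows and break early once the limit has been passed.
        if accum.fetch_add(num_rows, Ordering::SeqCst) > limit {
            break;
        }
    }
}

Because fetch_add returns the count before this batch was added, a task only breaks after writing the batch that crossed the threshold, so a stage can emit somewhat more than limit rows; this is a best-effort early exit, not an exact cap.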

File tree

7 files changed: +1970 -12 lines changed


ballista/core/Cargo.toml

+2
@@ -56,9 +56,11 @@ datafusion-proto = "17.0.0"
 futures = "0.3"
 hashbrown = "0.13"
 
+lazy_static = "1.4.0"
 itertools = "0.10"
 libloading = "0.7.3"
 log = "0.4"
+lru = "0.8.1"
 object_store = "0.5.2"
 once_cell = "1.9.0"
 
ballista/core/proto/ballista.proto

+128
@@ -53,6 +53,134 @@ message UnresolvedShuffleExecNode {
   uint32 output_partition_count = 4;
 }
 
+message FilterExecNode {
+  PhysicalPlanNode input = 1;
+  PhysicalExprNode expr = 2;
+}
+
+message FileGroup {
+  repeated PartitionedFile files = 1;
+}
+
+message ScanLimit {
+  // wrap into a message to make it optional
+  uint32 limit = 1;
+}
+
+message FileScanExecConf {
+  repeated FileGroup file_groups = 1;
+  datafusion.Schema schema = 2;
+  repeated uint32 projection = 4;
+  ScanLimit limit = 5;
+  Statistics statistics = 6;
+  repeated string table_partition_cols = 7;
+  string object_store_url = 8;
+}
+
+message ParquetScanExecNode {
+  FileScanExecConf base_conf = 1;
+  datafusion.LogicalExprNode pruning_predicate = 2;
+}
+
+message CsvScanExecNode {
+  FileScanExecConf base_conf = 1;
+  bool has_header = 2;
+  string delimiter = 3;
+}
+
+message AvroScanExecNode {
+  FileScanExecConf base_conf = 1;
+}
+
+enum PartitionMode {
+  COLLECT_LEFT = 0;
+  PARTITIONED = 1;
+}
+
+message HashJoinExecNode {
+  PhysicalPlanNode left = 1;
+  PhysicalPlanNode right = 2;
+  repeated JoinOn on = 3;
+  datafusion.JoinType join_type = 4;
+  PartitionMode partition_mode = 6;
+  bool null_equals_null = 7;
+  JoinFilter filter = 8;
+}
+
+message UnionExecNode {
+  repeated PhysicalPlanNode inputs = 1;
+}
+
+message ExplainExecNode {
+  datafusion.Schema schema = 1;
+  repeated datafusion.StringifiedPlan stringified_plans = 2;
+  bool verbose = 3;
+}
+
+message CrossJoinExecNode {
+  PhysicalPlanNode left = 1;
+  PhysicalPlanNode right = 2;
+}
+
+message PhysicalColumn {
+  string name = 1;
+  uint32 index = 2;
+}
+
+message JoinOn {
+  PhysicalColumn left = 1;
+  PhysicalColumn right = 2;
+}
+
+message EmptyExecNode {
+  bool produce_one_row = 1;
+  datafusion.Schema schema = 2;
+}
+
+message ProjectionExecNode {
+  PhysicalPlanNode input = 1;
+  repeated PhysicalExprNode expr = 2;
+  repeated string expr_name = 3;
+}
+
+enum AggregateMode {
+  PARTIAL = 0;
+  FINAL = 1;
+  FINAL_PARTITIONED = 2;
+}
+
+message WindowAggExecNode {
+  PhysicalPlanNode input = 1;
+  repeated PhysicalExprNode window_expr = 2;
+  repeated string window_expr_name = 3;
+  datafusion.Schema input_schema = 4;
+}
+
+message AggregateExecNode {
+  repeated PhysicalExprNode group_expr = 1;
+  repeated PhysicalExprNode aggr_expr = 2;
+  AggregateMode mode = 3;
+  PhysicalPlanNode input = 4;
+  repeated string group_expr_name = 5;
+  repeated string aggr_expr_name = 6;
+  // we need the input schema to the partial aggregate to pass to the final aggregate
+  datafusion.Schema input_schema = 7;
+  repeated PhysicalExprNode null_expr = 8;
+  repeated bool groups = 9;
+}
+
+message ShuffleWriterExecNode {
+  //TODO it seems redundant to provide job and stage id here since we also have them
+  // in the TaskDefinition that wraps this plan
+  string job_id = 1;
+  uint32 stage_id = 2;
+  PhysicalPlanNode input = 3;
+  PhysicalHashRepartition output_partitioning = 4;
+  oneof optional_limit {
+    uint64 limit = 6;
+  }
+}
+
 message ShuffleReaderExecNode {
   repeated ShuffleReaderPartition partition = 1;
   datafusion.Schema schema = 2;
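For anyone wiring this into the Rust serde code: the oneof optional_limit (like the ScanLimit wrapper above) lets "no limit" be distinguished from a limit of zero. The sketch below assumes prost's usual oneof mapping, an Option of a generated single-variant enum; the module, struct, and helper functions are illustrative stand-ins, not Ballista's actual generated types.

// Hypothetical shapes mirroring what prost typically generates for
// `oneof optional_limit { uint64 limit = 6; }`; not copied from the
// generated protobuf module.
pub mod shuffle_writer_exec_node {
    #[derive(Clone, Debug)]
    pub enum OptionalLimit {
        Limit(u64),
    }
}

pub struct ShuffleWriterExecNode {
    pub job_id: String,
    pub stage_id: u32,
    pub optional_limit: Option<shuffle_writer_exec_node::OptionalLimit>,
    // ... input and output_partitioning omitted ...
}

// Serializing: an absent limit simply leaves the oneof unset on the wire.
pub fn set_limit(node: &mut ShuffleWriterExecNode, limit: Option<usize>) {
    node.optional_limit =
        limit.map(|l| shuffle_writer_exec_node::OptionalLimit::Limit(l as u64));
}

// Deserializing: recover the Option<usize> passed to try_new_with_limit.
pub fn get_limit(node: &ShuffleWriterExecNode) -> Option<usize> {
    node.optional_limit.as_ref().map(|l| match l {
        shuffle_writer_exec_node::OptionalLimit::Limit(v) => *v as usize,
    })
}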

ballista/core/src/execution_plans/shuffle_writer.rs

+81 -3

@@ -55,7 +55,30 @@ use datafusion::arrow::error::ArrowError;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::repartition::BatchPartitioner;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use lazy_static::lazy_static;
 use log::{debug, info};
+use lru::LruCache;
+use parking_lot::Mutex;
+use std::num::NonZeroUsize;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+lazy_static! {
+    static ref LIMIT_ACCUMULATORS: Mutex<LruCache<(String, usize), Arc<AtomicUsize>>> =
+        Mutex::new(LruCache::new(NonZeroUsize::new(40).unwrap()));
+}
+
+fn get_limit_accumulator(job_id: &str, stage: usize) -> Arc<AtomicUsize> {
+    let mut guard = LIMIT_ACCUMULATORS.lock();
+
+    if let Some(accumulator) = guard.get(&(job_id.to_owned(), stage)) {
+        accumulator.clone()
+    } else {
+        let accumulator = Arc::new(AtomicUsize::new(0));
+        guard.push((job_id.to_owned(), stage), accumulator.clone());
+
+        accumulator
+    }
+}
 
 /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
 /// can be executed as one unit with each partition being executed in parallel. The output of each
@@ -75,6 +98,8 @@ pub struct ShuffleWriterExec {
     shuffle_output_partitioning: Option<Partitioning>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Maximum number of rows to return
+    limit: Option<usize>,
 }
 
 #[derive(Debug, Clone)]
@@ -121,6 +146,26 @@ impl ShuffleWriterExec {
             work_dir,
             shuffle_output_partitioning,
             metrics: ExecutionPlanMetricsSet::new(),
+            limit: None,
+        })
+    }
+
+    pub fn try_new_with_limit(
+        job_id: String,
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        work_dir: String,
+        shuffle_output_partitioning: Option<Partitioning>,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        Ok(Self {
+            job_id,
+            stage_id,
+            plan,
+            work_dir,
+            shuffle_output_partitioning,
+            metrics: ExecutionPlanMetricsSet::new(),
+            limit,
         })
     }
 
@@ -139,6 +184,10 @@ impl ShuffleWriterExec {
         self.shuffle_output_partitioning.as_ref()
     }
 
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
     pub fn execute_shuffle_write(
         &self,
         input_partition: usize,
@@ -152,6 +201,10 @@ impl ShuffleWriterExec {
         let output_partitioning = self.shuffle_output_partitioning.clone();
         let plan = self.plan.clone();
 
+        let limit_and_accumulator = self
+            .limit
+            .map(|l| (l, get_limit_accumulator(&self.job_id, self.stage_id)));
+
         async move {
             let now = Instant::now();
             let mut stream = plan.execute(input_partition, context)?;
@@ -170,6 +223,7 @@
                         &mut stream,
                         path,
                         &write_metrics.write_time,
+                        limit_and_accumulator,
                     )
                     .await
                     .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
@@ -211,10 +265,26 @@
                         write_metrics.repart_time.clone(),
                     )?;
 
-                    while let Some(result) = stream.next().await {
+                    while let Some(result) = {
+                        let poll_more = limit_and_accumulator.as_ref().map_or(
+                            true,
+                            |(limit, accum)| {
+                                let total_rows = accum.load(Ordering::SeqCst);
+                                total_rows < *limit
+                            },
+                        );
+
+                        if poll_more {
+                            stream.next().await
+                        } else {
+                            None
+                        }
+                    } {
                         let input_batch = result?;
 
-                        write_metrics.input_rows.add(input_batch.num_rows());
+                        let num_rows = input_batch.num_rows();
+
+                        write_metrics.input_rows.add(num_rows);
 
                         partitioner.partition(
                             input_batch,
@@ -252,6 +322,13 @@
                                 Ok(())
                             },
                         )?;
+
+                        if let Some((limit, accum)) = limit_and_accumulator.as_ref() {
+                            let total_rows = accum.fetch_add(num_rows, Ordering::SeqCst);
+                            if total_rows > *limit {
+                                break;
+                            }
+                        }
                     }
 
                     let mut part_locs = vec![];
@@ -320,12 +397,13 @@ impl ExecutionPlan for ShuffleWriterExec {
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(ShuffleWriterExec::try_new(
+        Ok(Arc::new(ShuffleWriterExec::try_new_with_limit(
            self.job_id.clone(),
            self.stage_id,
            children[0].clone(),
            self.work_dir.clone(),
            self.shuffle_output_partitioning.clone(),
+            self.limit,
        )?))
    }
 
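A usage sketch of the new constructor above; the job/stage ids, work_dir, limit, and the EmptyExec input are illustrative placeholders, and the import paths are assumed from this repository's layout.

use std::sync::Arc;

use ballista_core::execution_plans::ShuffleWriterExec;
use datafusion::arrow::datatypes::Schema;
use datafusion::physical_plan::empty::EmptyExec;
use datafusion::physical_plan::ExecutionPlan;

fn example_shuffle_writer() -> ShuffleWriterExec {
    // Stand-in for the real stage plan handed to the executor.
    let input: Arc<dyn ExecutionPlan> =
        Arc::new(EmptyExec::new(false, Arc::new(Schema::empty())));

    // Every task of stage 1 in "job-1" on this node shares one accumulator
    // and stops polling its input once ~1000 rows have been written in total.
    ShuffleWriterExec::try_new_with_limit(
        "job-1".to_string(),         // job_id
        1,                           // stage_id
        input,                       // plan
        "/tmp/ballista".to_string(), // work_dir (scratch directory)
        None,                        // shuffle_output_partitioning
        Some(1000),                  // node-level local limit, in rows
    )
    .expect("valid shuffle writer")
}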
