
Commit 0865d8d

thinkharderdev authored and mpurins-coralogix committed
Use node-level local limit (#20)
* Use node-level local limit
* serialize limit in shuffle writer
* Revert "Merge pull request #19 from coralogix/sc-5792" (reverts commit 08140ef, reversing changes made to a7f1384)
* add log
* make sure we don't forget limit for shuffle writer
* update accum correctly and try to break early
* Check local limit accumulator before polling for more data
* fix build

Co-authored-by: Martins Purins <[email protected]>
1 parent 61248d8 commit 0865d8d
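In short, every task that writes shuffle output for the same (job_id, stage) pair shares one node-local row counter: the writer checks it before polling another batch and bumps it after each write, so all partitions of a limited stage stop pulling input once the limit is covered. A minimal standalone sketch of that pattern, with plain row counts standing in for Arrow record batches (simplified names, not Ballista's actual API):

// Sketch of the shared-limit pattern from this commit; "batches" are just
// row counts here, not Arrow record batches.
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

fn write_with_limit(batches: &[usize], limit: usize, accum: &Arc<AtomicUsize>) -> usize {
    let mut written = 0;
    for &num_rows in batches {
        // Check the shared total before consuming more input...
        if accum.load(Ordering::SeqCst) >= limit {
            break;
        }
        written += num_rows;
        // ...and bump it after each batch; fetch_add returns the previous
        // total, mirroring the >= check in write_stream_to_disk below.
        if accum.fetch_add(num_rows, Ordering::SeqCst) >= limit {
            break;
        }
    }
    written
}

fn main() {
    let accum = Arc::new(AtomicUsize::new(0));
    // Two tasks of the same (job_id, stage) share one accumulator.
    let a = write_with_limit(&[100, 100, 100], 150, &accum); // writes 200: batch granularity can overshoot
    let b = write_with_limit(&[100, 100, 100], 150, &accum); // limit already covered, writes 0
    println!("task a: {a} rows, task b: {b} rows");
}

Because the check happens at batch granularity, a task can overshoot by at most its final batch; the counter is a best-effort early stop, while the LocalLimitExec that remains in the plan still enforces the exact per-partition limit.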

File tree

5 files changed: +128 -12 lines

ballista/core/Cargo.toml (+2)

@@ -56,9 +56,11 @@ datafusion-proto = "18.0.0"
 futures = "0.3"
 hashbrown = "0.13"
 
+lazy_static = "1.4.0"
 itertools = "0.10"
 libloading = "0.7.3"
 log = "0.4"
+lru = "0.8.1"
 object_store = "0.5.2"
 once_cell = "1.9.0"
 
ballista/core/src/execution_plans/shuffle_writer.rs (+81 -3)

@@ -55,7 +55,30 @@ use datafusion::arrow::error::ArrowError;
 use datafusion::execution::context::TaskContext;
 use datafusion::physical_plan::repartition::BatchPartitioner;
 use datafusion::physical_plan::stream::RecordBatchStreamAdapter;
+use lazy_static::lazy_static;
 use log::{debug, info};
+use lru::LruCache;
+use parking_lot::Mutex;
+use std::num::NonZeroUsize;
+use std::sync::atomic::{AtomicUsize, Ordering};
+
+lazy_static! {
+    static ref LIMIT_ACCUMULATORS: Mutex<LruCache<(String, usize), Arc<AtomicUsize>>> =
+        Mutex::new(LruCache::new(NonZeroUsize::new(40).unwrap()));
+}
+
+fn get_limit_accumulator(job_id: &str, stage: usize) -> Arc<AtomicUsize> {
+    let mut guard = LIMIT_ACCUMULATORS.lock();
+
+    if let Some(accumulator) = guard.get(&(job_id.to_owned(), stage)) {
+        accumulator.clone()
+    } else {
+        let accumulator = Arc::new(AtomicUsize::new(0));
+        guard.push((job_id.to_owned(), stage), accumulator.clone());
+
+        accumulator
+    }
+}
 
 /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
 /// can be executed as one unit with each partition being executed in parallel. The output of each
@@ -75,6 +98,8 @@ pub struct ShuffleWriterExec {
     shuffle_output_partitioning: Option<Partitioning>,
     /// Execution metrics
     metrics: ExecutionPlanMetricsSet,
+    /// Maximum number of rows to return
+    limit: Option<usize>,
 }
 
 #[derive(Debug, Clone)]
@@ -121,6 +146,26 @@ impl ShuffleWriterExec {
             work_dir,
             shuffle_output_partitioning,
             metrics: ExecutionPlanMetricsSet::new(),
+            limit: None,
+        })
+    }
+
+    pub fn try_new_with_limit(
+        job_id: String,
+        stage_id: usize,
+        plan: Arc<dyn ExecutionPlan>,
+        work_dir: String,
+        shuffle_output_partitioning: Option<Partitioning>,
+        limit: Option<usize>,
+    ) -> Result<Self> {
+        Ok(Self {
+            job_id,
+            stage_id,
+            plan,
+            work_dir,
+            shuffle_output_partitioning,
+            metrics: ExecutionPlanMetricsSet::new(),
+            limit,
         })
     }
 
@@ -139,6 +184,10 @@ impl ShuffleWriterExec {
         self.shuffle_output_partitioning.as_ref()
     }
 
+    pub fn limit(&self) -> Option<usize> {
+        self.limit
+    }
+
     pub fn execute_shuffle_write(
         &self,
         input_partition: usize,
@@ -152,6 +201,10 @@
         let output_partitioning = self.shuffle_output_partitioning.clone();
         let plan = self.plan.clone();
 
+        let limit_and_accumulator = self
+            .limit
+            .map(|l| (l, get_limit_accumulator(&self.job_id, self.stage_id)));
+
         async move {
             let now = Instant::now();
             let mut stream = plan.execute(input_partition, context)?;
@@ -170,6 +223,7 @@
                     &mut stream,
                     path,
                     &write_metrics.write_time,
+                    limit_and_accumulator,
                 )
                 .await
                 .map_err(|e| DataFusionError::Execution(format!("{e:?}")))?;
@@ -211,10 +265,26 @@
                 write_metrics.repart_time.clone(),
             )?;
 
-            while let Some(result) = stream.next().await {
+            while let Some(result) = {
+                let poll_more = limit_and_accumulator.as_ref().map_or(
+                    true,
+                    |(limit, accum)| {
+                        let total_rows = accum.load(Ordering::SeqCst);
+                        total_rows < *limit
+                    },
+                );
+
+                if poll_more {
+                    stream.next().await
+                } else {
+                    None
+                }
+            } {
                 let input_batch = result?;
 
-                write_metrics.input_rows.add(input_batch.num_rows());
+                let num_rows = input_batch.num_rows();
+
+                write_metrics.input_rows.add(num_rows);
 
                 partitioner.partition(
                     input_batch,
@@ -252,6 +322,13 @@
                         Ok(())
                     },
                 )?;
+
+                if let Some((limit, accum)) = limit_and_accumulator.as_ref() {
+                    let total_rows = accum.fetch_add(num_rows, Ordering::SeqCst);
+                    if total_rows > *limit {
+                        break;
+                    }
+                }
             }
 
             let mut part_locs = vec![];
@@ -320,12 +397,13 @@
         self: Arc<Self>,
         children: Vec<Arc<dyn ExecutionPlan>>,
     ) -> Result<Arc<dyn ExecutionPlan>> {
-        Ok(Arc::new(ShuffleWriterExec::try_new(
+        Ok(Arc::new(ShuffleWriterExec::try_new_with_limit(
             self.job_id.clone(),
             self.stage_id,
             children[0].clone(),
             self.work_dir.clone(),
             self.shuffle_output_partitioning.clone(),
+            self.limit,
         )?))
     }
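A note on the registry design: LIMIT_ACCUMULATORS is a process-wide LruCache capped at 40 entries, so counters for stale (job_id, stage) keys get evicted instead of accumulating forever. A small standalone demo of the lru-crate behavior this relies on (capacity shrunk to 2 to make eviction visible; not Ballista code):

use lru::LruCache;
use std::num::NonZeroUsize;

fn main() {
    // Same shape as LIMIT_ACCUMULATORS, with a tiny capacity for the demo.
    let mut cache: LruCache<(String, usize), usize> =
        LruCache::new(NonZeroUsize::new(2).unwrap());

    cache.push(("job-a".to_owned(), 1), 0);
    cache.push(("job-b".to_owned(), 1), 0);

    // get() marks job-a as most recently used.
    cache.get(&("job-a".to_owned(), 1));

    // Pushing a third entry evicts the least recently used key (job-b).
    cache.push(("job-c".to_owned(), 1), 0);
    assert!(cache.get(&("job-b".to_owned(), 1)).is_none());
    assert!(cache.get(&("job-a".to_owned(), 1)).is_some());
}

If an entry were evicted mid-job, a fresh accumulator would restart at zero; that only weakens the early-stop optimization, since the limit itself is still enforced by the plan's limit operator.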

ballista/core/src/utils.rs (+22 -1)

@@ -50,6 +50,7 @@ use datafusion_proto::logical_plan::{
 };
 use futures::StreamExt;
 use log::error;
+use log::info;
 #[cfg(feature = "s3")]
 use object_store::aws::AmazonS3Builder;
 #[cfg(feature = "azure")]
@@ -145,6 +146,7 @@ pub async fn write_stream_to_disk(
     stream: &mut Pin<Box<dyn RecordBatchStream + Send>>,
     path: &str,
     disk_write_metric: &metrics::Time,
+    limit: Option<(usize, Arc<AtomicUsize>)>,
 ) -> Result<PartitionStats> {
     let file = File::create(path).map_err(|e| {
         error!("Failed to create partition file at {}: {:?}", path, e);
@@ -156,7 +158,18 @@
     let mut num_bytes = 0;
     let mut writer = FileWriter::try_new(file, stream.schema().as_ref())?;
 
-    while let Some(result) = stream.next().await {
+    while let Some(result) = {
+        let poll_more = limit.as_ref().map_or(true, |(limit, accum)| {
+            let total_rows = accum.load(Ordering::SeqCst);
+            total_rows < *limit
+        });
+
+        if poll_more {
+            stream.next().await
+        } else {
+            None
+        }
+    } {
         let batch = result?;
 
         let batch_size_bytes: usize = batch_byte_size(&batch);
@@ -167,6 +180,14 @@
         let timer = disk_write_metric.timer();
         writer.write(&batch)?;
         timer.done();
+
+        if let Some((limit, accum)) = limit.as_ref() {
+            let total_rows = accum.fetch_add(batch.num_rows(), Ordering::SeqCst);
+            if total_rows >= *limit {
+                info!("stopping shuffle write early (path: {})", path);
+                break;
+            }
+        }
     }
     let timer = disk_write_metric.timer();
     writer.finish()?;
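Both loops use the same trick: the decision whether to poll again lives inside the while-let scrutinee, so the stream is never touched once the shared counter says enough rows exist. A tiny synchronous illustration of that control-flow shape, with an iterator standing in for the async RecordBatchStream:

fn main() {
    let limit = 3;
    let mut produced = 0;
    let mut source = 1..100;

    // The block computes the next item: it only polls the source while the
    // counter is under the limit, otherwise it yields None and the loop ends.
    while let Some(n) = {
        if produced < limit {
            source.next()
        } else {
            None
        }
    } {
        produced += 1;
        println!("got {n}");
    }

    assert_eq!(produced, 3);
}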

ballista/executor/src/executor.rs (+2 -1)

@@ -155,12 +155,13 @@
             plan.as_any().downcast_ref::<ShuffleWriterExec>()
         {
             // recreate the shuffle writer with the correct working directory
-            ShuffleWriterExec::try_new(
+            ShuffleWriterExec::try_new_with_limit(
                 job_id,
                 stage_id,
                 plan.children()[0].clone(),
                 self.work_dir.clone(),
                 shuffle_writer.shuffle_output_partitioning().cloned(),
+                shuffle_writer.limit(),
             )
         } else {
             Err(DataFusionError::Internal(

ballista/scheduler/src/planner.rs (+21 -7)

@@ -33,6 +33,7 @@ use datafusion::physical_plan::{
     with_new_children_if_necessary, ExecutionPlan, Partitioning,
 };
 
+use datafusion::physical_plan::limit::LocalLimitExec;
 use log::{debug, info};
 
 type PartialQueryStageResult = (Arc<dyn ExecutionPlan>, Vec<Arc<ShuffleWriterExec>>);
@@ -288,13 +289,26 @@ fn create_shuffle_writer(
     plan: Arc<dyn ExecutionPlan>,
     partitioning: Option<Partitioning>,
 ) -> Result<Arc<ShuffleWriterExec>> {
-    Ok(Arc::new(ShuffleWriterExec::try_new(
-        job_id.to_owned(),
-        stage_id,
-        plan,
-        "".to_owned(), // executor will decide on the work_dir path
-        partitioning,
-    )?))
+    if let Some(local_limit_exec) = plan.as_any().downcast_ref::<LocalLimitExec>() {
+        // This doesn't really capture all cases where we would want to do this but should work for the
+        // most basic case we care about.
+        Ok(Arc::new(ShuffleWriterExec::try_new_with_limit(
+            job_id.to_owned(),
+            stage_id,
+            plan.clone(),
+            "".to_owned(), // executor will decide on the work_dir path
+            partitioning,
+            Some(local_limit_exec.fetch()),
+        )?))
+    } else {
+        Ok(Arc::new(ShuffleWriterExec::try_new(
+            job_id.to_owned(),
+            stage_id,
+            plan,
+            "".to_owned(), // executor will decide on the work_dir path
+            partitioning,
+        )?))
+    }
 }
 
 #[cfg(test)]
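The planner only detects a limit when the stage's root node is itself a LocalLimitExec, found via the as_any()/downcast_ref idiom DataFusion uses throughout (the in-diff comment concedes this misses other plan shapes). A sketch of that idiom with hypothetical stand-in types, not DataFusion's real trait:

use std::any::Any;
use std::sync::Arc;

// Hypothetical stand-ins for DataFusion's ExecutionPlan nodes.
trait PlanNode {
    fn as_any(&self) -> &dyn Any;
}

struct LocalLimitNode {
    fetch: usize,
}
struct ScanNode;

impl PlanNode for LocalLimitNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
}
impl PlanNode for ScanNode {
    fn as_any(&self) -> &dyn Any {
        self
    }
}

fn limit_of(plan: &Arc<dyn PlanNode>) -> Option<usize> {
    // Same shape as the planner change: downcast, and only propagate
    // a limit when the root node really is a local limit.
    plan.as_any()
        .downcast_ref::<LocalLimitNode>()
        .map(|l| l.fetch)
}

fn main() {
    let limit: Arc<dyn PlanNode> = Arc::new(LocalLimitNode { fetch: 10 });
    let scan: Arc<dyn PlanNode> = Arc::new(ScanNode);
    assert_eq!(limit_of(&limit), Some(10));
    assert_eq!(limit_of(&scan), None);
}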
