Add BatchPartitioner (#2285) #2287

Merged
3 commits merged on Apr 20, 2022
Changes from 1 commit

Add BatchPartitioner (#2285)
tustvold committed Apr 20, 2022
commit e453d2820970c2fc0f04691adb1a8214aafe09d2
109 changes: 41 additions & 68 deletions ballista/rust/core/src/execution_plans/shuffle_writer.rs
@@ -34,16 +34,13 @@ use crate::serde::protobuf::ShuffleWritePartition;
 use crate::serde::scheduler::PartitionStats;
 use async_trait::async_trait;
 use datafusion::arrow::array::{
-    Array, ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder,
-    UInt64Builder,
+    ArrayBuilder, ArrayRef, StringBuilder, StructBuilder, UInt32Builder, UInt64Builder,
 };
-use datafusion::arrow::compute::take;
 use datafusion::arrow::datatypes::{DataType, Field, Schema, SchemaRef};
 
 use datafusion::arrow::record_batch::RecordBatch;
 use datafusion::error::{DataFusionError, Result};
 use datafusion::physical_plan::common::IPCWriter;
-use datafusion::physical_plan::hash_utils::create_hashes;
 use datafusion::physical_plan::memory::MemoryStream;
 use datafusion::physical_plan::metrics::{
     self, ExecutionPlanMetricsSet, MetricBuilder, MetricsSet,
@@ -55,6 +52,7 @@ use datafusion::physical_plan::{
 use futures::StreamExt;
 
 use datafusion::execution::context::TaskContext;
+use datafusion::physical_plan::repartition::BatchPartitioner;
 use log::{debug, info};
 
 /// ShuffleWriterExec represents a section of a query plan that has consistent partitioning and
@@ -81,20 +79,24 @@ pub struct ShuffleWriterExec {
 struct ShuffleWriteMetrics {
     /// Time spend writing batches to shuffle files
     write_time: metrics::Time,
+    repart_time: metrics::Time,
     input_rows: metrics::Count,
     output_rows: metrics::Count,
 }
 
 impl ShuffleWriteMetrics {
     fn new(partition: usize, metrics: &ExecutionPlanMetricsSet) -> Self {
         let write_time = MetricBuilder::new(metrics).subset_time("write_time", partition);
+        let repart_time =
+            MetricBuilder::new(metrics).subset_time("repart_time", partition);
 
         let input_rows = MetricBuilder::new(metrics).counter("input_rows", partition);
 
         let output_rows = MetricBuilder::new(metrics).output_rows(partition);
 
         Self {
             write_time,
+            repart_time,
             input_rows,
             output_rows,
         }
@@ -202,77 +204,48 @@ impl ShuffleWriterExec {
             writers.push(None);
         }
 
-        let hashes_buf = &mut vec![];
-        let random_state = ahash::RandomState::with_seeds(0, 0, 0, 0);
+        let mut partitioner = BatchPartitioner::new(
+            Partitioning::Hash(exprs.clone(), *n),
+            write_metrics.repart_time.clone(),
+        );
 
         while let Some(result) = stream.next().await {
             let input_batch = result?;
 
             write_metrics.input_rows.add(input_batch.num_rows());
 
-            let arrays = exprs
-                .iter()
-                .map(|expr| {
-                    Ok(expr
-                        .evaluate(&input_batch)?
-                        .into_array(input_batch.num_rows()))
-                })
-                .collect::<Result<Vec<_>>>()?;
-            hashes_buf.clear();
-            hashes_buf.resize(arrays[0].len(), 0);
-            // Hash arrays and compute buckets based on number of partitions
-            let hashes = create_hashes(&arrays, &random_state, hashes_buf)?;
-            let mut indices = vec![vec![]; num_output_partitions];
-            for (index, hash) in hashes.iter().enumerate() {
-                indices[(*hash % num_output_partitions as u64) as usize]
-                    .push(index as u64)
-            }
-            for (output_partition, partition_indices) in
-                indices.into_iter().enumerate()
-            {
-                let indices = partition_indices.into();
-
-                // Produce batches based on indices
-                let columns = input_batch
-                    .columns()
-                    .iter()
-                    .map(|c| {
-                        take(c.as_ref(), &indices, None).map_err(|e| {
-                            DataFusionError::Execution(e.to_string())
-                        })
-                    })
-                    .collect::<Result<Vec<Arc<dyn Array>>>>()?;
-
-                let output_batch =
-                    RecordBatch::try_new(input_batch.schema(), columns)?;
-
-                // write non-empty batch out
-
-                // TODO optimize so we don't write or fetch empty partitions
-                // if output_batch.num_rows() > 0 {
-                let timer = write_metrics.write_time.timer();
-                match &mut writers[output_partition] {
-                    Some(w) => {
-                        w.write(&output_batch)?;
-                    }
-                    None => {
-                        let mut path = path.clone();
-                        path.push(&format!("{}", output_partition));
-                        std::fs::create_dir_all(&path)?;
-
-                        path.push(format!("data-{}.arrow", input_partition));
-                        info!("Writing results to {:?}", path);
-
-                        let mut writer =
-                            IPCWriter::new(&path, stream.schema().as_ref())?;
-
-                        writer.write(&output_batch)?;
-                        writers[output_partition] = Some(writer);
+            partitioner.partition(
+                input_batch,
+                |output_partition, output_batch| {
+                    // write non-empty batch out
+
+                    // TODO optimize so we don't write or fetch empty partitions
+                    // if output_batch.num_rows() > 0 {
+                    let timer = write_metrics.write_time.timer();
+                    match &mut writers[output_partition] {
+                        Some(w) => {
+                            w.write(&output_batch)?;
+                        }
+                        None => {
+                            let mut path = path.clone();
+                            path.push(&format!("{}", output_partition));
+                            std::fs::create_dir_all(&path)?;
+
+                            path.push(format!("data-{}.arrow", input_partition));
+                            info!("Writing results to {:?}", path);
+
+                            let mut writer =
+                                IPCWriter::new(&path, stream.schema().as_ref())?;
+
+                            writer.write(&output_batch)?;
+                            writers[output_partition] = Some(writer);
+                        }
                     }
-                }
-                write_metrics.output_rows.add(output_batch.num_rows());
-                timer.done();
-            }
+                    write_metrics.output_rows.add(output_batch.num_rows());
+                    timer.done();
+                    Ok(())
+                },
+            )?;
         }
 
         let mut part_locs = vec![];
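For orientation, the `BatchPartitioner` used above folds the hand-rolled `create_hashes` + `take` sequence this hunk deletes behind a single callback-driven API, which is also why the `Array`, `take`, and `create_hashes` imports disappear from the first hunk. Below is a minimal sketch of how it is driven, mirroring the call shape in this commit; the schema, column name, and `demo` wrapper are illustrative, and later DataFusion releases changed details such as the constructor, so treat the exact signatures as assumptions:

```rust
use std::sync::Arc;

use datafusion::arrow::array::Int32Array;
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::Result;
use datafusion::physical_plan::expressions::col;
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};
use datafusion::physical_plan::repartition::BatchPartitioner;
use datafusion::physical_plan::Partitioning;

fn demo() -> Result<()> {
    // A small single-column batch standing in for one batch of shuffle input.
    let schema = Arc::new(Schema::new(vec![Field::new("a", DataType::Int32, false)]));
    let batch = RecordBatch::try_new(
        schema.clone(),
        vec![Arc::new(Int32Array::from(vec![1, 2, 3, 4, 5]))],
    )?;

    // Time spent hashing and gathering rows is accumulated into this metric,
    // mirroring the `repart_time` wired up in the diff above.
    let metrics = ExecutionPlanMetricsSet::new();
    let repart_time = MetricBuilder::new(&metrics).subset_time("repart_time", 0);

    // Hash-partition on column "a" into 4 output partitions.
    let mut partitioner = BatchPartitioner::new(
        Partitioning::Hash(vec![col("a", &schema)?], 4),
        repart_time,
    );

    // The callback receives each output partition index together with a batch
    // of the input rows that hashed to it; errors propagate out through `?`.
    partitioner.partition(batch, |partition, partitioned| {
        println!("partition {}: {} rows", partition, partitioned.num_rows());
        Ok(())
    })?;

    Ok(())
}
```

Because the callback returns `Result<()>`, the shuffle writer can surface `IPCWriter` failures from inside the closure through the single `?` on `partitioner.partition`.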
5 changes: 5 additions & 0 deletions datafusion/core/src/physical_plan/metrics/value.rs
@@ -300,6 +300,11 @@ impl<'a> ScopedTimerGuard<'a> {
         }
     }
 
+    /// Restarts the timer recording from the current time
+    pub fn restart(&mut self) {
+        self.start = Some(Instant::now())
+    }
+
     /// Stop the timer, record the time taken and consume self
     pub fn done(mut self) {
         self.stop()
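The new `restart` rounds out the existing `stop`/`done` pair: a holder of a `ScopedTimerGuard` can now carve an untimed region out of the middle of a timed one, which is what allows `BatchPartitioner` to keep the caller's per-partition callback out of `repart_time`. A small sketch of the pattern, with an illustrative metric name and wrapper function not taken from this diff:

```rust
use datafusion::physical_plan::metrics::{ExecutionPlanMetricsSet, MetricBuilder};

fn demo() {
    let metrics = ExecutionPlanMetricsSet::new();
    let repart_time = MetricBuilder::new(&metrics).subset_time("repart_time", 0);

    let mut timer = repart_time.timer();
    // ... timed work, e.g. hashing rows and computing take indices ...
    timer.stop(); // record the elapsed time so far

    // ... untimed work, e.g. handing a partitioned batch to a callback ...

    timer.restart(); // resume recording from the current instant
    // ... more timed work ...
    timer.done(); // record the final interval and consume the guard
}
```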