ShuffleReaderExec now supports multiple locations per partition #541
@@ -15,42 +15,43 @@
// specific language governing permissions and limitations
// under the License.

use std::fmt::Formatter;
use std::sync::Arc;
use std::{any::Any, pin::Pin};

use crate::client::BallistaClient;
use crate::memory_stream::MemoryStream;
use crate::serde::scheduler::PartitionLocation;

use crate::utils::WrappedStream;
use async_trait::async_trait;
use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::{DisplayFormatType, ExecutionPlan, Partitioning};
use datafusion::{
    error::{DataFusionError, Result},
    physical_plan::RecordBatchStream,
};
use futures::{future, Stream, StreamExt};
use log::info;
use std::fmt::Formatter;

/// ShuffleReaderExec reads partitions that have already been materialized by an executor.
/// ShuffleReaderExec reads partitions that have already been materialized by a query stage
/// being executed by an executor
#[derive(Debug, Clone)]
pub struct ShuffleReaderExec {
    // The query stage that is responsible for producing the shuffle partitions that
    // this operator will read
    pub(crate) partition_location: Vec<PartitionLocation>,
    /// Each partition of a shuffle can read data from multiple locations
    pub(crate) partition: Vec<Vec<PartitionLocation>>,
    pub(crate) schema: SchemaRef,
}

impl ShuffleReaderExec {
    /// Create a new ShuffleReaderExec
    pub fn try_new(
        partition_meta: Vec<PartitionLocation>,
        partition: Vec<Vec<PartitionLocation>>,
        schema: SchemaRef,
    ) -> Result<Self> {
        Ok(Self {
            partition_location: partition_meta,
            schema,
        })
        Ok(Self { partition, schema })
    }
}

@@ -65,7 +66,7 @@ impl ExecutionPlan for ShuffleReaderExec {
    }

    fn output_partitioning(&self) -> Partitioning {
        Partitioning::UnknownPartitioning(self.partition_location.len())
        Partitioning::UnknownPartitioning(self.partition.len())
    }

    fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
@@ -86,23 +87,18 @@ impl ExecutionPlan for ShuffleReaderExec {
        partition: usize,
    ) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
        info!("ShuffleReaderExec::execute({})", partition);
        let partition_location = &self.partition_location[partition];

        let mut client = BallistaClient::try_new(
            &partition_location.executor_meta.host,
            partition_location.executor_meta.port,
        )
        .await
        .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))?;

        client
            .fetch_partition(
                &partition_location.partition_id.job_id,
                partition_location.partition_id.stage_id,
                partition,
            )
        let partition_locations = &self.partition[partition];
        let result = future::join_all(partition_locations.iter().map(fetch_partition))
            .await
            .map_err(|e| DataFusionError::Execution(format!("Ballista Error: {:?}", e)))
            .into_iter()
            .collect::<Result<Vec<_>>>()?;

        let result = WrappedStream::new(
            Box::pin(futures::stream::iter(result).flatten()),

Review comment: this is a clever way of flattening the streams (though I wonder if it will serialize them all, aka not start reading from the second until the first is entirely consumed)?

Reply: Yes, that's exactly what it does.

Reply: 👍

Reply: Once the basic shuffle mechanism is implemented, there will be a lot of optimization work to follow.

            Arc::new(self.schema.as_ref().clone()),
        );
        Ok(Box::pin(result))
    }
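
To illustrate the point made in the review thread above: `StreamExt::flatten` polls its inner streams strictly one at a time, so the stream from the second location is not read until the first is fully consumed. A minimal standalone sketch of that behavior (not part of this PR; it uses only the `futures` crate):

```rust
use futures::executor::block_on;
use futures::stream::{self, StreamExt};

fn main() {
    block_on(async {
        // Stand-ins for the per-location record batch streams.
        let first = stream::iter(vec![1, 2, 3]);
        let second = stream::iter(vec![4, 5, 6]);

        // `flatten` exhausts `first` before it ever polls `second`,
        // which is why the merged output preserves this ordering.
        let merged: Vec<i32> = stream::iter(vec![first, second])
            .flatten()
            .collect()
            .await;
        assert_eq!(merged, vec![1, 2, 3, 4, 5, 6]);
    });
}
```

An interleaving combinator such as `futures::stream::select_all` would instead poll all inner streams concurrently, which is presumably among the follow-up optimizations mentioned in the thread.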

Review comment: This is the main change.
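
The main change referred to here is that `execute()` now starts one fetch per location with `future::join_all` and collects the resulting `Vec<Result<_>>` into a single `Result<Vec<_>>`, so the first failure is propagated with `?`. A hedged, self-contained sketch of that idiom, using placeholder functions and error types rather than the real Ballista client:

```rust
use futures::{executor::block_on, future};

// Placeholder for `fetch_partition`: pretend location 0 is unreachable.
async fn fetch(location: u32) -> Result<String, String> {
    if location == 0 {
        Err(format!("could not reach location {}", location))
    } else {
        Ok(format!("stream for location {}", location))
    }
}

async fn fetch_all(locations: &[u32]) -> Result<Vec<String>, String> {
    // `join_all` drives every fetch concurrently; collecting the resulting
    // `Vec<Result<_, _>>` into `Result<Vec<_>, _>` yields the first error,
    // which the caller can then propagate with `?`.
    future::join_all(locations.iter().copied().map(fetch))
        .await
        .into_iter()
        .collect()
}

fn main() {
    assert_eq!(block_on(fetch_all(&[1, 2, 3])).unwrap().len(), 3);
    assert!(block_on(fetch_all(&[1, 0, 3])).is_err());
}
```

Note that `join_all` always awaits every future to completion, so an error from one location only surfaces after all fetches have finished.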

    fn fmt_as(
@@ -113,22 +109,46 @@ impl ExecutionPlan for ShuffleReaderExec {
        match t {
            DisplayFormatType::Default => {
                let loc_str = self
                    .partition_location
                    .partition
                    .iter()
                    .map(|l| {
                        format!(
                            "[executor={} part={}:{}:{} stats={:?}]",
                            l.executor_meta.id,
                            l.partition_id.job_id,
                            l.partition_id.stage_id,
                            l.partition_id.partition_id,
                            l.partition_stats
                        )
                    .map(|x| {
                        x.iter()
                            .map(|l| {
                                format!(
                                    "[executor={} part={}:{}:{} stats={:?}]",
                                    l.executor_meta.id,
                                    l.partition_id.job_id,
                                    l.partition_id.stage_id,
                                    l.partition_id.partition_id,
                                    l.partition_stats
                                )
                            })
                            .collect::<Vec<String>>()
                            .join(",")
                    })
                    .collect::<Vec<String>>()
                    .join(",");
                    .join("\n");
                write!(f, "ShuffleReaderExec: partition_locations={}", loc_str)
            }
        }
    }
}

async fn fetch_partition(
    location: &PartitionLocation,
) -> Result<Pin<Box<dyn RecordBatchStream + Send + Sync>>> {
    let metadata = &location.executor_meta;
    let partition_id = &location.partition_id;
    let mut ballista_client =
        BallistaClient::try_new(metadata.host.as_str(), metadata.port as u16)
            .await
            .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;
    Ok(ballista_client
        .fetch_partition(
            &partition_id.job_id,
            partition_id.stage_id as usize,
            partition_id.partition_id as usize,
        )
        .await
        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?)
}

Review comment: This is moved to ballista-core utils.

Review comment: this is a cool abstraction -- I have had need of something similar elsewhere -- perhaps it would be good to move to datafusion itself eventually.
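
Judging from how `WrappedStream` is constructed in `execute()` above (a boxed stream of `ArrowResult<RecordBatch>` plus a `SchemaRef`), an adapter like it might look roughly like the sketch below. This is a hedged reconstruction for illustration, not necessarily the exact ballista-core implementation.

```rust
use std::pin::Pin;
use std::task::{Context, Poll};

use datafusion::arrow::datatypes::SchemaRef;
use datafusion::arrow::error::Result as ArrowResult;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::physical_plan::RecordBatchStream;
use futures::Stream;

/// Attaches a schema to an arbitrary stream of record batches so that the
/// combined value can be returned where a `RecordBatchStream` is expected.
pub struct WrappedStream {
    stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
    schema: SchemaRef,
}

impl WrappedStream {
    pub fn new(
        stream: Pin<Box<dyn Stream<Item = ArrowResult<RecordBatch>> + Send + Sync>>,
        schema: SchemaRef,
    ) -> Self {
        Self { stream, schema }
    }
}

impl RecordBatchStream for WrappedStream {
    fn schema(&self) -> SchemaRef {
        self.schema.clone()
    }
}

impl Stream for WrappedStream {
    type Item = ArrowResult<RecordBatch>;

    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
        // Simply delegate to the wrapped stream.
        self.get_mut().stream.as_mut().poll_next(cx)
    }
}
```

The appeal of the abstraction is that any plain `Stream` of record batches, such as the flattened multi-location stream built in `execute()`, can be handed back where a `RecordBatchStream` is required.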