Skip to content

Commit

Permalink
Merge remote-tracking branch 'apache/master' into alamb/test_ballista
Browse files Browse the repository at this point in the history
  • Loading branch information
alamb committed Feb 16, 2022
2 parents eb09799 + 407adc0 commit 8f9c195
Show file tree
Hide file tree
Showing 12 changed files with 1,027 additions and 711 deletions.
19 changes: 11 additions & 8 deletions ballista/rust/core/proto/ballista.proto
Original file line number Diff line number Diff line change
Expand Up @@ -848,13 +848,16 @@ message ColumnStats {
uint32 distinct_count = 4;
}

// Used by scheduler
message ExecutorMetadata {
string id = 1;
string host = 2;
uint32 port = 3;
uint32 grpc_port = 4;
ExecutorSpecification specification = 5;
}

// Used by grpc
message ExecutorRegistration {
string id = 1;
// "optional" keyword is stable in protoc 3.15 but prost is still on 3.14 (see https://github.com/tokio-rs/prost/issues/430 and https://github.com/tokio-rs/prost/pull/455)
Expand All @@ -864,10 +867,11 @@ message ExecutorRegistration {
}
uint32 port = 3;
uint32 grpc_port = 4;
ExecutorSpecification specification = 5;
}

message ExecutorHeartbeat {
ExecutorMetadata meta = 1;
string executor_id = 1;
// Unix epoch-based timestamp in seconds
uint64 timestamp = 2;
ExecutorState state = 3;
Expand Down Expand Up @@ -929,7 +933,7 @@ message ShuffleWritePartition {
}

message TaskStatus {
PartitionId partition_id = 1;
PartitionId task_id = 1;
oneof status {
RunningTask running = 2;
FailedTask failed = 3;
Expand Down Expand Up @@ -957,19 +961,18 @@ message PollWorkResult {

message RegisterExecutorParams {
ExecutorRegistration metadata = 1;
ExecutorSpecification specification = 2;
}

message RegisterExecutorResult {
bool success = 1;
}

message SendHeartBeatParams {
ExecutorRegistration metadata = 1;
message HeartBeatParams {
string executor_id = 1;
ExecutorState state = 2;
}

message SendHeartBeatResult {
message HeartBeatResult {
// TODO it's from Spark for BlockManager
bool reregister = 1;
}
Expand All @@ -981,7 +984,7 @@ message StopExecutorResult {
}

message UpdateTaskStatusParams {
ExecutorRegistration metadata = 1;
string executor_id = 1;
// All tasks must be reported until they reach the failed or completed state
repeated TaskStatus task_status = 2;
}
Expand Down Expand Up @@ -1067,7 +1070,7 @@ service SchedulerGrpc {

// Push-based task scheduler will only leverage this interface
// rather than the PollWork interface to report executor states
rpc SendHeartBeat (SendHeartBeatParams) returns (SendHeartBeatResult) {}
rpc HeartBeatFromExecutor (HeartBeatParams) returns (HeartBeatResult) {}

rpc UpdateTaskStatus (UpdateTaskStatusParams) returns (UpdateTaskStatusResult) {}

Expand Down
18 changes: 12 additions & 6 deletions ballista/rust/core/src/serde/scheduler/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,43 +66,47 @@ impl PartitionId {
#[derive(Debug, Clone)]
pub struct PartitionLocation {
pub partition_id: PartitionId,
pub executor_meta: ExecutorMeta,
pub executor_meta: ExecutorMetadata,
pub partition_stats: PartitionStats,
pub path: String,
}

/// Meta-data for an executor, used when fetching shuffle partitions from other executors
#[derive(Debug, Clone, PartialEq, Eq, Serialize)]
pub struct ExecutorMeta {
pub struct ExecutorMetadata {
pub id: String,
pub host: String,
pub port: u16,
pub grpc_port: u16,
pub specification: ExecutorSpecification,
}

#[allow(clippy::from_over_into)]
impl Into<protobuf::ExecutorMetadata> for ExecutorMeta {
impl Into<protobuf::ExecutorMetadata> for ExecutorMetadata {
fn into(self) -> protobuf::ExecutorMetadata {
protobuf::ExecutorMetadata {
id: self.id,
host: self.host,
port: self.port as u32,
grpc_port: self.grpc_port as u32,
specification: Some(self.specification.into()),
}
}
}

impl From<protobuf::ExecutorMetadata> for ExecutorMeta {
impl From<protobuf::ExecutorMetadata> for ExecutorMetadata {
fn from(meta: protobuf::ExecutorMetadata) -> Self {
Self {
id: meta.id,
host: meta.host,
port: meta.port as u16,
grpc_port: meta.grpc_port as u16,
specification: meta.specification.unwrap().into(),
}
}
}

/// Specification of an executor, indicting executor resources, like total task slots
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize)]
pub struct ExecutorSpecification {
pub task_slots: u32,
Expand Down Expand Up @@ -136,6 +140,7 @@ impl From<protobuf::ExecutorSpecification> for ExecutorSpecification {
}
}

/// From Spark, available resources for an executor, like available task slots
#[derive(Debug, Clone, Serialize)]
pub struct ExecutorData {
pub executor_id: String,
Expand Down Expand Up @@ -204,6 +209,7 @@ impl From<protobuf::ExecutorData> for ExecutorData {
}
}

/// The internal state of an executor, like cpu usage, memory usage, etc
#[derive(Debug, Clone, Copy, Serialize)]
pub struct ExecutorState {
// in bytes
Expand Down Expand Up @@ -359,7 +365,7 @@ pub struct ExecutePartition {
/// The physical plan for this query stage
pub plan: Arc<dyn ExecutionPlan>,
/// Location of shuffle partitions that this query stage may depend on
pub shuffle_locations: HashMap<PartitionId, ExecutorMeta>,
pub shuffle_locations: HashMap<PartitionId, ExecutorMetadata>,
/// Output partitioning for shuffle writes
pub output_partitioning: Option<Partitioning>,
}
Expand All @@ -370,7 +376,7 @@ impl ExecutePartition {
stage_id: usize,
partition_id: Vec<usize>,
plan: Arc<dyn ExecutionPlan>,
shuffle_locations: HashMap<PartitionId, ExecutorMeta>,
shuffle_locations: HashMap<PartitionId, ExecutorMetadata>,
output_partitioning: Option<Partitioning>,
) -> Self {
Self {
Expand Down
20 changes: 12 additions & 8 deletions ballista/rust/executor/src/execution_loop.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use datafusion::physical_plan::ExecutionPlan;
use log::{debug, error, info, warn};
use tonic::transport::Channel;

use ballista_core::serde::protobuf::ExecutorRegistration;
use ballista_core::serde::protobuf::{
scheduler_grpc_client::SchedulerGrpcClient, PollWorkParams, PollWorkResult,
TaskDefinition, TaskStatus,
Expand All @@ -33,16 +32,23 @@ use crate::as_task_status;
use crate::executor::Executor;
use ballista_core::error::BallistaError;
use ballista_core::serde::physical_plan::from_proto::parse_protobuf_hash_partitioning;
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::{AsExecutionPlan, AsLogicalPlan, BallistaCodec};

pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
mut scheduler: SchedulerGrpcClient<Channel>,
executor: Arc<Executor>,
executor_meta: ExecutorRegistration,
concurrent_tasks: usize,
codec: BallistaCodec<T, U>,
) {
let available_tasks_slots = Arc::new(AtomicUsize::new(concurrent_tasks));
let executor_specification: ExecutorSpecification = executor
.metadata
.specification
.as_ref()
.unwrap()
.clone()
.into();
let available_tasks_slots =
Arc::new(AtomicUsize::new(executor_specification.task_slots as usize));
let (task_status_sender, mut task_status_receiver) =
std::sync::mpsc::channel::<TaskStatus>();

Expand All @@ -61,7 +67,7 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
tonic::Status,
> = scheduler
.poll_work(PollWorkParams {
metadata: Some(executor_meta.clone()),
metadata: Some(executor.metadata.clone()),
can_accept_task: available_tasks_slots.load(Ordering::SeqCst) > 0,
task_status,
})
Expand All @@ -74,7 +80,6 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
if let Some(task) = result.into_inner().task {
match run_received_tasks(
executor.clone(),
executor_meta.id.clone(),
available_tasks_slots.clone(),
task_status_sender,
task,
Expand Down Expand Up @@ -106,7 +111,6 @@ pub async fn poll_loop<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>

async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>(
executor: Arc<Executor>,
executor_id: String,
available_tasks_slots: Arc<AtomicUsize>,
task_status_sender: Sender<TaskStatus>,
task: TaskDefinition,
Expand Down Expand Up @@ -146,7 +150,7 @@ async fn run_received_tasks<T: 'static + AsLogicalPlan, U: 'static + AsExecution
available_tasks_slots.fetch_add(1, Ordering::SeqCst);
let _ = task_status_sender.send(as_task_status(
execution_result,
executor_id,
executor.metadata.id.clone(),
task_id,
));
});
Expand Down
24 changes: 8 additions & 16 deletions ballista/rust/executor/src/executor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ use std::sync::Arc;
use ballista_core::error::BallistaError;
use ballista_core::execution_plans::ShuffleWriterExec;
use ballista_core::serde::protobuf;
use ballista_core::serde::scheduler::ExecutorSpecification;
use ballista_core::serde::protobuf::ExecutorRegistration;
use datafusion::error::DataFusionError;
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::physical_plan::display::DisplayableExecutionPlan;
Expand All @@ -31,34 +31,26 @@ use datafusion::prelude::{ExecutionConfig, ExecutionContext};

/// Ballista executor
pub struct Executor {
/// Directory for storing partial results
work_dir: String,
/// Metadata
pub metadata: ExecutorRegistration,

/// Specification like total task slots
pub specification: ExecutorSpecification,
/// Directory for storing partial results
pub work_dir: String,

/// DataFusion execution context
pub ctx: Arc<ExecutionContext>,
}

impl Executor {
/// Create a new executor instance
pub fn new(work_dir: &str, ctx: Arc<ExecutionContext>) -> Self {
Executor::new_with_specification(
work_dir,
ExecutorSpecification { task_slots: 4 },
ctx,
)
}

pub fn new_with_specification(
pub fn new(
metadata: ExecutorRegistration,
work_dir: &str,
specification: ExecutorSpecification,
ctx: Arc<ExecutionContext>,
) -> Self {
Self {
metadata,
work_dir: work_dir.to_owned(),
specification,
ctx,
}
}
Expand Down
Loading

0 comments on commit 8f9c195

Please sign in to comment.