Task level retry and Stage level retry #261

Merged · 10 commits · Oct 2, 2022
165 changes: 128 additions & 37 deletions ballista/rust/core/proto/ballista.proto
@@ -424,54 +424,87 @@ message ExecutionGraph {
uint64 output_partitions = 5;
repeated PartitionLocation output_locations = 6;
string scheduler_id = 7;
uint32 task_id_gen = 8;
repeated StageAttempts failed_attempts = 9;
}

message StageAttempts {
uint32 stage_id = 1;
repeated uint32 stage_attempt_num = 2;
}

message ExecutionGraphStage {
oneof StageType {
UnResolvedStage unresolved_stage = 1;
ResolvedStage resolved_stage = 2;
CompletedStage completed_stage = 3;
SuccessfulStage successful_stage = 3;
FailedStage failed_stage = 4;
}
}

message UnResolvedStage {
uint64 stage_id = 1;
uint32 stage_id = 1;
PhysicalHashRepartition output_partitioning = 2;
repeated uint32 output_links = 3;
repeated GraphStageInput inputs = 4;
bytes plan = 5;
uint32 stage_attempt_num = 6;
repeated string last_attempt_failure_reasons = 7;
}

message ResolvedStage {
uint64 stage_id = 1;
uint32 stage_id = 1;
uint32 partitions = 2;
PhysicalHashRepartition output_partitioning = 3;
repeated uint32 output_links = 4;
repeated GraphStageInput inputs = 5;
bytes plan = 6;
uint32 stage_attempt_num = 7;
repeated string last_attempt_failure_reasons = 8;
}

message CompletedStage {
uint64 stage_id = 1;
message SuccessfulStage {
uint32 stage_id = 1;
uint32 partitions = 2;
PhysicalHashRepartition output_partitioning = 3;
repeated uint32 output_links = 4;
repeated GraphStageInput inputs = 5;
bytes plan = 6;
repeated TaskStatus task_statuses = 7;
repeated TaskInfo task_infos = 7;
repeated OperatorMetricsSet stage_metrics = 8;
uint32 stage_attempt_num = 9;
}

message FailedStage {
uint64 stage_id = 1;
uint32 stage_id = 1;
uint32 partitions = 2;
PhysicalHashRepartition output_partitioning = 3;
repeated uint32 output_links = 4;
bytes plan = 5;
repeated TaskStatus task_statuses = 6;
repeated TaskInfo task_infos = 6;
repeated OperatorMetricsSet stage_metrics = 7;
string error_message = 8;
uint32 stage_attempt_num = 9;
}

message TaskInfo {
uint32 task_id = 1;
uint32 partition_id = 2;
// Scheduler schedule time
uint64 scheduled_time = 3;
// Scheduler launch time
uint64 launch_time = 4;
// The time the Executor starts to run the task
uint64 start_exec_time = 5;
// The time the Executor finishes the task
uint64 end_exec_time = 6;
// Scheduler side finish time
uint64 finish_time = 7;
oneof status {
RunningTask running = 8;
FailedTask failed = 9;
SuccessfulTask successful = 10;
}
}

message GraphStageInput {
@@ -521,12 +554,14 @@ message FetchPartition {
uint32 port = 6;
}

// Mapping from partition id to executor id
message PartitionLocation {
PartitionId partition_id = 1;
ExecutorMetadata executor_meta = 2;
PartitionStats partition_stats = 3;
string path = 4;
// partition_id of the map stage that produces the shuffle.
uint32 map_partition_id = 1;
// partition_id of the shuffle, a composition of (job_id + map_stage_id + partition_id).
PartitionId partition_id = 2;
ExecutorMetadata executor_meta = 3;
PartitionStats partition_stats = 4;
string path = 5;
}

// Unique identifier for a materialized partition of data
@@ -536,11 +571,10 @@ message PartitionId {
uint32 partition_id = 4;
}

// Multiple partitions in the same stage
message PartitionIds {
string job_id = 1;
uint32 stage_id = 2;
repeated uint32 partition_ids = 4;
message TaskId {
uint32 task_id = 1;
uint32 task_attempt_num = 2;
uint32 partition_id = 3;
}

message PartitionStats {
@@ -664,15 +698,48 @@ message RunningTask {

message FailedTask {
string error = 1;
bool retryable = 2;
// Whether this task failure should be counted toward the maximum number of times the task is allowed to retry
bool count_to_failures = 3;
oneof failed_reason {
ExecutionError execution_error = 4;
FetchPartitionError fetch_partition_error = 5;
IOError io_error = 6;
ExecutorLost executor_lost = 7;
// A successful task's result is lost because its executor was lost
ResultLost result_lost = 8;
TaskKilled task_killed = 9;
}
}
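
To make the interaction of `retryable` and `count_to_failures` concrete, here is a minimal, hypothetical Rust sketch; the type names, the `max_task_retries` limit, and the decision logic are illustrative assumptions, not the actual Ballista scheduler code:

```rust
// Illustrative only: shows how a scheduler could use the two flags above.
struct FailedTaskInfo {
    retryable: bool,
    // Whether this failure is charged against the per-task retry limit.
    count_to_failures: bool,
}

enum RetryDecision {
    Retry { counted_failures: u32 },
    FailJob,
}

fn decide_retry(
    failure: &FailedTaskInfo,
    counted_failures: u32, // failures already charged to this task
    max_task_retries: u32, // assumed per-task retry limit
) -> RetryDecision {
    if !failure.retryable {
        return RetryDecision::FailJob;
    }
    // Some failures (e.g. a fetch error caused by a lost executor) can be
    // retried without consuming the task's retry budget.
    let counted = if failure.count_to_failures {
        counted_failures + 1
    } else {
        counted_failures
    };
    if counted > max_task_retries {
        RetryDecision::FailJob
    } else {
        RetryDecision::Retry { counted_failures: counted }
    }
}
```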

message CompletedTask {
message SuccessfulTask {
string executor_id = 1;
// TODO tasks are currently always shuffle writes but this will not always be the case
// so we might want to think about some refactoring of the task definitions
repeated ShuffleWritePartition partitions = 2;
}

message ExecutionError {
}

message FetchPartitionError {
string executor_id = 1;
uint32 map_stage_id = 2;
uint32 map_partition_id = 3;
}

message IOError {
}

message ExecutorLost {
}

message ResultLost {
}
Comment on lines +734 to +738

Contributor:

Not sure I understand the difference between these two errors

Contributor Author:

> Not sure I understand the difference between these two errors

Yes, good question. In the current code base, neither of these two errors is produced directly by the executor tasks.
They are used by the Scheduler: when it sees a 'FetchPartitionError' task update from a reduce task, the related map task's status is changed to 'ResultLost'. Of course, most of the time ResultLost is caused by ExecutorLost.
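
A minimal, hypothetical sketch of that scheduler-side handling (the type and function names are illustrative, not the actual Ballista scheduler API): on a FetchPartitionError reported by a reduce task, the producing map task is marked ResultLost so its stage can be retried.

```rust
// Simplified stand-ins for the proto messages above; illustrative only.
enum FailedReason {
    FetchPartitionError { executor_id: String, map_stage_id: u32, map_partition_id: u32 },
    ResultLost,
    ExecutorLost,
}

struct MapTaskUpdate {
    stage_id: u32,
    partition_id: u32,
    new_reason: FailedReason,
}

/// Called by the scheduler when a task reports a failure.
fn handle_task_failure(reason: FailedReason) -> Option<MapTaskUpdate> {
    match reason {
        // The reduce task could not fetch its input: the map-side shuffle output
        // is gone, so mark the producing map task as ResultLost and let the
        // stage-level retry re-run it.
        FailedReason::FetchPartitionError { map_stage_id, map_partition_id, .. } => {
            Some(MapTaskUpdate {
                stage_id: map_stage_id,
                partition_id: map_partition_id,
                new_reason: FailedReason::ResultLost,
            })
        }
        // Other failures are handled by task-level retry; no map-side update needed.
        _ => None,
    }
}
```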


message TaskKilled {
}

message ShuffleWritePartition {
uint64 partition_id = 1;
string path = 2;
Expand All @@ -682,13 +749,20 @@ message ShuffleWritePartition {
}

message TaskStatus {
PartitionId task_id = 1;
uint32 task_id = 1;
string job_id = 2;
uint32 stage_id = 3;
uint32 stage_attempt_num = 4;
uint32 partition_id = 5;
uint64 launch_time = 6;
uint64 start_exec_time = 7;
uint64 end_exec_time = 8;
oneof status {
RunningTask running = 2;
FailedTask failed = 3;
CompletedTask completed = 4;
RunningTask running = 9;
FailedTask failed = 10;
SuccessfulTask successful = 11;
}
repeated OperatorMetricsSet metrics = 5;
repeated OperatorMetricsSet metrics = 12;
}

message PollWorkParams {
Expand All @@ -699,22 +773,32 @@ message PollWorkParams {
}

message TaskDefinition {
PartitionId task_id = 1;
bytes plan = 2;
uint32 task_id = 1;
uint32 task_attempt_num = 2;
string job_id = 3;
uint32 stage_id = 4;
uint32 stage_attempt_num = 5;
uint32 partition_id = 6;
bytes plan = 7;
// Output partition for shuffle writer
PhysicalHashRepartition output_partitioning = 3;
string session_id = 4;
repeated KeyValuePair props = 5;
PhysicalHashRepartition output_partitioning = 8;
string session_id = 9;
uint64 launch_time = 10;
repeated KeyValuePair props = 11;
}

// A set of tasks in the same stage
message MultiTaskDefinition {
PartitionIds task_ids = 1;
bytes plan = 2;
repeated TaskId task_ids = 1;
string job_id = 2;
uint32 stage_id = 3;
uint32 stage_attempt_num = 4;
bytes plan = 5;
// Output partition for shuffle writer
PhysicalHashRepartition output_partitioning = 3;
string session_id = 4;
repeated KeyValuePair props = 5;
PhysicalHashRepartition output_partitioning = 6;
string session_id = 7;
uint64 launch_time = 8;
repeated KeyValuePair props = 9;
}

message SessionSettings {
@@ -802,7 +886,7 @@ message GetJobStatusParams {
string job_id = 1;
}

message CompletedJob {
message SuccessfulJob {
repeated PartitionLocation partition_location = 1;
}

@@ -820,7 +904,7 @@ message JobStatus {
QueuedJob queued = 1;
RunningJob running = 2;
FailedJob failed = 3;
CompletedJob completed = 4;
SuccessfulJob successful = 4;
}
}

@@ -872,13 +956,20 @@ message LaunchMultiTaskResult {
}

message CancelTasksParams {
repeated PartitionId partition_id = 1;
repeated RunningTaskInfo task_infos = 1;
}

message CancelTasksResult {
bool cancelled = 1;
}

message RunningTaskInfo {
uint32 task_id = 1;
string job_id = 2;
uint32 stage_id = 3;
uint32 partition_id = 4;
}

service SchedulerGrpc {
// Executors must poll the scheduler for heartbeat and to receive tasks
rpc PollWork (PollWorkParams) returns (PollWorkResult) {}
42 changes: 26 additions & 16 deletions ballista/rust/core/src/client.rs
@@ -25,8 +25,8 @@ use std::{
task::{Context, Poll},
};

use crate::error::{ballista_error, BallistaError, Result};
use crate::serde::scheduler::Action;
use crate::error::{BallistaError, Result};
use crate::serde::scheduler::{Action, PartitionId};

use arrow_flight::utils::flight_data_to_arrow_batch;
use arrow_flight::Ticket;
@@ -62,7 +62,7 @@ impl BallistaClient {
create_grpc_client_connection(addr.clone())
.await
.map_err(|e| {
BallistaError::General(format!(
BallistaError::GrpcConnectionError(format!(
"Error connecting to Ballista scheduler or executor at {}: {:?}",
addr, e
))
@@ -76,22 +76,32 @@ impl BallistaClient {
/// Fetch a partition from an executor
pub async fn fetch_partition(
&mut self,
job_id: &str,
stage_id: usize,
partition_id: usize,
executor_id: &str,
partition_id: &PartitionId,
path: &str,
host: &str,
port: u16,
) -> Result<SendableRecordBatchStream> {
let action = Action::FetchPartition {
job_id: job_id.to_string(),
stage_id,
partition_id,
job_id: partition_id.job_id.clone(),
stage_id: partition_id.stage_id,
partition_id: partition_id.partition_id,
path: path.to_owned(),
host: host.to_string(),
host: host.to_owned(),
port,
};
self.execute_action(&action).await
self.execute_action(&action)
.await
.map_err(|error| match error {
// map the gRPC action error to a partition fetch error.
BallistaError::GrpcActionError(msg) => BallistaError::FetchFailed(
executor_id.to_owned(),
partition_id.stage_id,
partition_id.partition_id,
msg,
),
other => other,
})
}

/// Execute an action and retrieve the results
@@ -105,22 +105,22 @@ impl BallistaClient {

serialized_action
.encode(&mut buf)
.map_err(|e| BallistaError::General(format!("{:?}", e)))?;
.map_err(|e| BallistaError::GrpcActionError(format!("{:?}", e)))?;

let request = tonic::Request::new(Ticket { ticket: buf });

let mut stream = self
.flight_client
.do_get(request)
.await
.map_err(|e| BallistaError::General(format!("{:?}", e)))?
.map_err(|e| BallistaError::GrpcActionError(format!("{:?}", e)))?
.into_inner();

// the schema should be the first message returned, else client should error
match stream
.message()
.await
.map_err(|e| BallistaError::General(format!("{:?}", e)))?
.map_err(|e| BallistaError::GrpcActionError(format!("{:?}", e)))?
{
Some(flight_data) => {
// convert FlightData to a stream
@@ -129,8 +139,8 @@ impl BallistaClient {
// all the remaining stream messages should be dictionary and record batches
Ok(Box::pin(FlightDataStream::new(stream, schema)))
}
None => Err(ballista_error(
"Did not receive schema batch from flight server",
None => Err(BallistaError::GrpcActionError(
"Did not receive schema batch from flight server".to_string(),
)),
}
}
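
For reference, a minimal sketch of how a caller might invoke the new `fetch_partition` signature; the constructor, module paths, and literal values below are assumptions based on the diff above, not verified against the full code base.

```rust
use ballista_core::client::BallistaClient;
use ballista_core::error::Result;
use ballista_core::serde::scheduler::PartitionId;

async fn fetch_shuffle_partition() -> Result<()> {
    // Connect to the executor that wrote the shuffle data (host/port assumed).
    let mut client = BallistaClient::try_new("executor-host", 50051).await?;

    // Identify the shuffle partition by (job_id, stage_id, partition_id),
    // matching the PartitionId type imported in client.rs.
    let partition_id = PartitionId {
        job_id: "job-1".to_string(),
        stage_id: 2,
        partition_id: 0,
    };

    // With this change, a gRPC failure surfaces as BallistaError::FetchFailed
    // carrying the executor id, stage id, and partition id, which lets the
    // scheduler mark the map task's output as lost and trigger a stage retry.
    let _stream = client
        .fetch_partition(
            "executor-1",
            &partition_id,
            "/tmp/shuffle/part-0", // hypothetical shuffle file path
            "executor-host",
            50051,
        )
        .await?;

    Ok(())
}
```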