|
16 | 16 | // under the License.
|
17 | 17 |
|
18 | 18 | use ballista_core::BALLISTA_VERSION;
|
| 19 | +use futures::FutureExt; |
19 | 20 | use std::collections::HashMap;
|
20 | 21 | use std::convert::TryInto;
|
21 | 22 | use std::ops::Deref;
|
| 23 | +use std::panic::AssertUnwindSafe; |
22 | 24 | use std::sync::Arc;
|
23 | 25 | use std::time::{Duration, SystemTime, UNIX_EPOCH};
|
24 | 26 | use tokio::sync::{mpsc, RwLock};
|
@@ -52,6 +54,7 @@ use tokio::task::JoinHandle;
|
52 | 54 |
|
53 | 55 | use crate::as_task_status;
|
54 | 56 | use crate::cpu_bound_executor::DedicatedExecutor;
|
| 57 | +use crate::execution_loop::any_to_string; |
55 | 58 | use crate::executor::Executor;
|
56 | 59 | use crate::shutdown::ShutdownNotifier;
|
57 | 60 |
|
@@ -197,8 +200,6 @@ struct ExecutorEnv {
|
197 | 200 | tx_stop: mpsc::Sender<bool>,
|
198 | 201 | }
|
199 | 202 |
|
200 |
| -unsafe impl Sync for ExecutorEnv {} |
201 |
| - |
202 | 203 | impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T, U> {
|
203 | 204 | fn new(
|
204 | 205 | scheduler_to_register: SchedulerGrpcClient<Channel>,
|
@@ -350,17 +351,26 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorServer<T,
|
350 | 351 | plan,
|
351 | 352 | )?;
|
352 | 353 |
|
353 |
| - let execution_result = self |
354 |
| - .executor |
355 |
| - .execute_shuffle_write( |
| 354 | + let execution_result = |
| 355 | + match AssertUnwindSafe(self.executor.execute_shuffle_write( |
356 | 356 | task_id.job_id.clone(),
|
357 | 357 | task_id.stage_id as usize,
|
358 | 358 | task_id.partition_id as usize,
|
359 | 359 | shuffle_writer_plan.clone(),
|
360 | 360 | task_context,
|
361 | 361 | shuffle_output_partitioning,
|
362 |
| - ) |
363 |
| - .await; |
| 362 | + )) |
| 363 | + .catch_unwind() |
| 364 | + .await |
| 365 | + { |
| 366 | + Ok(Ok(r)) => Ok(r), |
| 367 | + Ok(Err(r)) => Err(r), |
| 368 | + Err(r) => { |
| 369 | + error!("Error executing task: {:?}", any_to_string(&r)); |
| 370 | + Err(BallistaError::Internal(format!("{:#?}", any_to_string(&r)))) |
| 371 | + } |
| 372 | + }; |
| 373 | + |
364 | 374 | info!("Done with task {}", task_id_log);
|
365 | 375 | debug!("Statistics: {:?}", execution_result);
|
366 | 376 |
|
@@ -606,13 +616,20 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> ExecutorGrpc
|
606 | 616 | } = request.into_inner();
|
607 | 617 | let task_sender = self.executor_env.tx_task.clone();
|
608 | 618 | for task in tasks {
|
609 |
| - task_sender |
| 619 | + if let Err(e) = task_sender |
610 | 620 | .send(CuratorTaskDefinition {
|
611 | 621 | scheduler_id: scheduler_id.clone(),
|
612 | 622 | task,
|
613 | 623 | })
|
614 | 624 | .await
|
615 |
| - .unwrap(); |
| 625 | + { |
| 626 | + let msg = format!( |
| 627 | + "Error launching tasks, failed to send to task channel: {:?}", |
| 628 | + e |
| 629 | + ); |
| 630 | + error!("{}", msg); |
| 631 | + return Err(Status::internal(msg)); |
| 632 | + } |
616 | 633 | }
|
617 | 634 | Ok(Response::new(LaunchTaskResult { success: true }))
|
618 | 635 | }
|
|
0 commit comments