Skip to content

Commit 53cdc2b

Browse files
committed
sc-5792: wrap pending queue counter into Arc
1 parent 0fdc8b8 commit 53cdc2b

File tree

1 file changed

+2
-17
lines changed

1 file changed

+2
-17
lines changed

ballista/rust/scheduler/src/state/task_manager.rs

+2-17
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@ use tonic::transport::Channel;
5050
type ExecutorClients = Arc<RwLock<HashMap<String, ExecutorGrpcClient<Channel>>>>;
5151
type ExecutionGraphCache = Arc<RwLock<HashMap<String, Arc<RwLock<ExecutionGraph>>>>>;
5252

53+
#[derive(Clone)]
5354
pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> {
5455
state: Arc<dyn StateBackendClient>,
5556
#[allow(dead_code)]
@@ -62,22 +63,6 @@ pub struct TaskManager<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan>
6263
pending_task_queue_size: Arc<AtomicUsize>,
6364
}
6465

65-
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> Clone
66-
for TaskManager<T, U>
67-
{
68-
fn clone(&self) -> Self {
69-
Self {
70-
state: self.state.clone(),
71-
clients: self.clients.clone(),
72-
session_builder: self.session_builder,
73-
codec: self.codec.clone(),
74-
scheduler_id: self.scheduler_id.clone(),
75-
active_job_cache: self.active_job_cache.clone(),
76-
pending_task_queue_size: AtomicUsize::new(self.get_pending_task_queue_size()),
77-
}
78-
}
79-
}
80-
8166
impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U> {
8267
pub fn new(
8368
state: Arc<dyn StateBackendClient>,
@@ -92,7 +77,7 @@ impl<T: 'static + AsLogicalPlan, U: 'static + AsExecutionPlan> TaskManager<T, U>
9277
codec,
9378
scheduler_id,
9479
active_job_cache: Arc::new(RwLock::new(HashMap::new())),
95-
pending_task_queue_size: AtomicUsize::new(0),
80+
pending_task_queue_size: Arc::new(AtomicUsize::new(0)),
9681
}
9782
}
9883

0 commit comments

Comments
 (0)