From aa480316497f1a1728c1064d0462f8f54de3b554 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 17 Oct 2022 18:19:23 +0200 Subject: [PATCH 1/8] Check whether we can accept tasks before polling for work --- ballista/executor/src/execution_loop.rs | 10 +++++++++- 1 file changed, 9 insertions(+), 1 deletion(-) diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 3afe9b915..fe2c07311 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -70,13 +70,21 @@ pub async fn poll_loop // to avoid going in sleep mode between polling let mut active_job = false; + let can_accept_task = available_tasks_slots.load(Ordering::SeqCst) > 0; + + // Don't poll for work if we can not accept any tasks + if !can_accept_task { + tokio::time::sleep(Duration::from_millis(1)).await; + continue; + } + let poll_work_result: anyhow::Result< tonic::Response, tonic::Status, > = scheduler .poll_work(PollWorkParams { metadata: Some(executor.metadata.clone()), - can_accept_task: available_tasks_slots.load(Ordering::SeqCst) > 0, + can_accept_task: can_accept_task, task_status, }) .await; From 434ea38b6f2c07a7b6a5bfd3150f40956fb577bb Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 17 Oct 2022 20:12:21 +0200 Subject: [PATCH 2/8] Fix order --- ballista/executor/src/execution_loop.rs | 8 +++++--- 1 file changed, 5 insertions(+), 3 deletions(-) diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index fe2c07311..9403990b7 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -63,8 +63,6 @@ pub async fn poll_loop info!("Starting poll work loop with scheduler"); loop { - let task_status: Vec = - sample_tasks_status(&mut task_status_receiver).await; // Keeps track of whether we received task in last iteration // to avoid going in sleep mode between polling @@ -78,13 +76,16 @@ pub async fn poll_loop continue; } + let task_status: Vec = + sample_tasks_status(&mut task_status_receiver).await; + let poll_work_result: anyhow::Result< tonic::Response, tonic::Status, > = scheduler .poll_work(PollWorkParams { metadata: Some(executor.metadata.clone()), - can_accept_task: can_accept_task, + can_accept_task, task_status, }) .await; @@ -119,6 +120,7 @@ pub async fn poll_loop warn!("Executor poll work loop failed. If this continues to happen the Scheduler might be marked as dead. Error: {}", error); } } + if !active_job { tokio::time::sleep(Duration::from_millis(100)).await; } From e9cf471d3083c4c656f4b2783c6175ef012fa2b3 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 17 Oct 2022 20:30:49 +0200 Subject: [PATCH 3/8] Fmt --- ballista/executor/src/execution_loop.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 9403990b7..65d2dfcb2 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -63,7 +63,6 @@ pub async fn poll_loop info!("Starting poll work loop with scheduler"); loop { - // Keeps track of whether we received task in last iteration // to avoid going in sleep mode between polling let mut active_job = false; From 33e0eb597a2be2410127157426f074720050378e Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 17 Oct 2022 21:03:35 +0200 Subject: [PATCH 4/8] Simplify loop --- ballista/executor/src/execution_loop.rs | 15 ++++----------- 1 file changed, 4 insertions(+), 11 deletions(-) diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 65d2dfcb2..37c3ef3a2 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -63,10 +63,6 @@ pub async fn poll_loop info!("Starting poll work loop with scheduler"); loop { - // Keeps track of whether we received task in last iteration - // to avoid going in sleep mode between polling - let mut active_job = false; - let can_accept_task = available_tasks_slots.load(Ordering::SeqCst) > 0; // Don't poll for work if we can not accept any tasks @@ -103,16 +99,11 @@ pub async fn poll_loop ) .await { - Ok(_) => { - active_job = true; - } + Ok(_) => {} Err(e) => { warn!("Failed to run task: {:?}", e); - active_job = false; } } - } else { - active_job = false; } } Err(error) => { @@ -120,7 +111,9 @@ pub async fn poll_loop } } - if !active_job { + if available_tasks_slots.load(Ordering::SeqCst) + == executor_specification.task_slots as usize + { tokio::time::sleep(Duration::from_millis(100)).await; } } From 980c2533f6d19c9edd53687262d5a0645bfdc6d7 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 17 Oct 2022 22:14:05 +0200 Subject: [PATCH 5/8] Fix latest commit --- ballista/executor/src/execution_loop.rs | 17 ++++++++++++----- 1 file changed, 12 insertions(+), 5 deletions(-) diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 37c3ef3a2..3ce65b436 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -63,6 +63,10 @@ pub async fn poll_loop info!("Starting poll work loop with scheduler"); loop { + // Keeps track of whether we received task in last iteration + // to avoid going in sleep mode between polling + let mut active_job = false; + let can_accept_task = available_tasks_slots.load(Ordering::SeqCst) > 0; // Don't poll for work if we can not accept any tasks @@ -99,11 +103,16 @@ pub async fn poll_loop ) .await { - Ok(_) => {} + Ok(_) => { + active_job = true; + } Err(e) => { warn!("Failed to run task: {:?}", e); + active_job = false; } } + } else { + active_job = false; } } Err(error) => { @@ -111,9 +120,7 @@ pub async fn poll_loop } } - if available_tasks_slots.load(Ordering::SeqCst) - == executor_specification.task_slots as usize - { + if !active_job { tokio::time::sleep(Duration::from_millis(100)).await; } } @@ -281,4 +288,4 @@ async fn sample_tasks_status( } task_status -} +} \ No newline at end of file From b218937255581a4b5a8131f082f8f5f21a6e4f37 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 17 Oct 2022 22:14:31 +0200 Subject: [PATCH 6/8] Fix latest commit --- ballista/executor/src/execution_loop.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 3ce65b436..65d2dfcb2 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -288,4 +288,4 @@ async fn sample_tasks_status( } task_status -} \ No newline at end of file +} From b729cb2fa0adce0aeed422c62e054c05c966f49d Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 17 Oct 2022 22:20:59 +0200 Subject: [PATCH 7/8] Fix latest commit --- ballista/executor/src/execution_loop.rs | 2 ++ 1 file changed, 2 insertions(+) diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index 4e6264637..f19da7f1b 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -110,6 +110,8 @@ pub async fn poll_loop warn!("Failed to run task: {:?}", e); } } + } else { + active_job = false } } Err(error) => { From 299534466a61c55ced3cb7793060338ac933d680 Mon Sep 17 00:00:00 2001 From: "Heres, Daniel" Date: Mon, 17 Oct 2022 22:21:13 +0200 Subject: [PATCH 8/8] Fix latest commit --- ballista/executor/src/execution_loop.rs | 1 + 1 file changed, 1 insertion(+) diff --git a/ballista/executor/src/execution_loop.rs b/ballista/executor/src/execution_loop.rs index f19da7f1b..e6ad31094 100644 --- a/ballista/executor/src/execution_loop.rs +++ b/ballista/executor/src/execution_loop.rs @@ -108,6 +108,7 @@ pub async fn poll_loop } Err(e) => { warn!("Failed to run task: {:?}", e); + active_job = false; } } } else {