From 0b9cf707820dca83fe5c6567300ab5a5e9db9f3e Mon Sep 17 00:00:00 2001 From: Lokesh Gopu Date: Wed, 27 Jul 2022 12:32:13 +0000 Subject: [PATCH 1/3] get messages with runner status --- src/Runner.Common/JobStatusEventArgs.cs | 14 ++++++++ src/Runner.Common/RunnerServer.cs | 6 ++-- src/Runner.Listener/JobDispatcher.cs | 13 ++++++++ src/Runner.Listener/MessageListener.cs | 33 ++++++++++++++++++- src/Runner.Listener/Runner.cs | 3 ++ src/Runner.Sdk/Util/VssUtil.cs | 2 +- .../Generated/TaskAgentHttpClientBase.cs | 5 +++ src/Sdk/DTWebApi/WebApi/TaskAgentStatus.cs | 3 ++ src/Test/L0/Listener/MessageListenerL0.cs | 10 +++--- 9 files changed, 79 insertions(+), 10 deletions(-) create mode 100644 src/Runner.Common/JobStatusEventArgs.cs diff --git a/src/Runner.Common/JobStatusEventArgs.cs b/src/Runner.Common/JobStatusEventArgs.cs new file mode 100644 index 00000000000..3e59fbf003f --- /dev/null +++ b/src/Runner.Common/JobStatusEventArgs.cs @@ -0,0 +1,14 @@ +using System; +using GitHub.DistributedTask.WebApi; + +namespace GitHub.Runner.Common +{ + public class JobStatusEventArgs : EventArgs + { + public JobStatusEventArgs(TaskAgentStatus status) + { + this.Status = status; + } + public TaskAgentStatus Status { get; private set; } + } +} diff --git a/src/Runner.Common/RunnerServer.cs b/src/Runner.Common/RunnerServer.cs index f377622c6bd..0ab7a628503 100644 --- a/src/Runner.Common/RunnerServer.cs +++ b/src/Runner.Common/RunnerServer.cs @@ -39,7 +39,7 @@ public interface IRunnerServer : IRunnerService Task CreateAgentSessionAsync(Int32 poolId, TaskAgentSession session, CancellationToken cancellationToken); Task DeleteAgentMessageAsync(Int32 poolId, Int64 messageId, Guid sessionId, CancellationToken cancellationToken); Task DeleteAgentSessionAsync(Int32 poolId, Guid sessionId, CancellationToken cancellationToken); - Task GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken); + Task GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken); // job request Task GetAgentRequestAsync(int poolId, long requestId, CancellationToken cancellationToken); @@ -298,10 +298,10 @@ public Task DeleteAgentSessionAsync(Int32 poolId, Guid sessionId, CancellationTo return _messageTaskAgentClient.DeleteAgentSessionAsync(poolId, sessionId, cancellationToken: cancellationToken); } - public Task GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) + public Task GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken) { CheckConnection(RunnerConnectionType.MessageQueue); - return _messageTaskAgentClient.GetMessageAsync(poolId, sessionId, lastMessageId, cancellationToken: cancellationToken); + return _messageTaskAgentClient.GetMessageAsync(poolId, sessionId, lastMessageId, status, cancellationToken: cancellationToken); } //----------------------------------------------------------------- diff --git a/src/Runner.Listener/JobDispatcher.cs b/src/Runner.Listener/JobDispatcher.cs index 7913c92289a..9b4b2d6f477 100644 --- a/src/Runner.Listener/JobDispatcher.cs +++ b/src/Runner.Listener/JobDispatcher.cs @@ -27,6 +27,7 @@ public interface IJobDispatcher : IRunnerService bool Cancel(JobCancelMessage message); Task WaitAsync(CancellationToken token); Task ShutdownAsync(); + event EventHandler JobStatus; } // This implementation of IJobDispatcher is not thread safe. @@ -55,6 +56,8 @@ public sealed class JobDispatcher : RunnerService, IJobDispatcher private TaskCompletionSource _runOnceJobCompleted = new TaskCompletionSource(); + public event EventHandler JobStatus; + public override void Initialize(IHostContext hostContext) { base.Initialize(hostContext); @@ -335,6 +338,11 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, string orc Busy = true; try { + if (JobStatus != null) + { + JobStatus(this, new JobStatusEventArgs(TaskAgentStatus.Busy)); + } + if (previousJobDispatch != null) { Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker."); @@ -650,6 +658,11 @@ await processChannel.SendAsync( finally { Busy = false; + + if (JobStatus != null) + { + JobStatus(this, new JobStatusEventArgs(TaskAgentStatus.Online)); + } } } diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs index 81afd737d3b..19e888a5304 100644 --- a/src/Runner.Listener/MessageListener.cs +++ b/src/Runner.Listener/MessageListener.cs @@ -23,6 +23,7 @@ public interface IMessageListener : IRunnerService Task DeleteSessionAsync(); Task GetNextMessageAsync(CancellationToken token); Task DeleteMessageAsync(TaskAgentMessage message); + void OnJobStatus(object sender, JobStatusEventArgs e); } public sealed class MessageListener : RunnerService, IMessageListener @@ -38,6 +39,8 @@ public sealed class MessageListener : RunnerService, IMessageListener private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4); private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30); private readonly Dictionary _sessionCreationExceptionTracker = new Dictionary(); + private TaskAgentStatus runnerStatus = TaskAgentStatus.Online; + private CancellationTokenSource _getMessagesTokenSource; public override void Initialize(IHostContext hostContext) { @@ -170,6 +173,23 @@ public async Task DeleteSessionAsync() } } + public void OnJobStatus(object sender, JobStatusEventArgs e) + { + if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW"))) + { + Trace.Info("Received job status event. JobState: {0}", e.Status); + runnerStatus = e.Status; + try + { + _getMessagesTokenSource?.Cancel(); + } + catch (ObjectDisposedException) + { + Trace.Info("_getMessagesTokenSource is already disposed."); + } + } + } + public async Task GetNextMessageAsync(CancellationToken token) { Trace.Entering(); @@ -184,12 +204,14 @@ public async Task GetNextMessageAsync(CancellationToken token) { token.ThrowIfCancellationRequested(); TaskAgentMessage message = null; + _getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token); try { message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId, _session.SessionId, _lastMessageId, - token); + runnerStatus, + _getMessagesTokenSource.Token); // Decrypt the message body if the session is using encryption message = DecryptMessage(message); @@ -206,6 +228,11 @@ public async Task GetNextMessageAsync(CancellationToken token) continuousError = 0; } } + catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested) + { + Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status."); + continue; + } catch (OperationCanceledException) when (token.IsCancellationRequested) { Trace.Info("Get next message has been cancelled."); @@ -261,6 +288,10 @@ public async Task GetNextMessageAsync(CancellationToken token) await HostContext.Delay(_getNextMessageRetryInterval, token); } } + finally + { + _getMessagesTokenSource.Dispose(); + } if (message == null) { diff --git a/src/Runner.Listener/Runner.cs b/src/Runner.Listener/Runner.cs index 2e590163f41..dae439595a7 100644 --- a/src/Runner.Listener/Runner.cs +++ b/src/Runner.Listener/Runner.cs @@ -360,6 +360,8 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) bool runOnceJobReceived = false; jobDispatcher = HostContext.CreateService(); + jobDispatcher.JobStatus += _listener.OnJobStatus; + while (!HostContext.RunnerShutdownToken.IsCancellationRequested) { TaskAgentMessage message = null; @@ -561,6 +563,7 @@ private async Task RunAsync(RunnerSettings settings, bool runOnce = false) { if (jobDispatcher != null) { + jobDispatcher.JobStatus -= _listener.OnJobStatus; await jobDispatcher.ShutdownAsync(); } diff --git a/src/Runner.Sdk/Util/VssUtil.cs b/src/Runner.Sdk/Util/VssUtil.cs index e4a7cd0bc92..70cdf769c48 100644 --- a/src/Runner.Sdk/Util/VssUtil.cs +++ b/src/Runner.Sdk/Util/VssUtil.cs @@ -57,7 +57,7 @@ public static VssConnection CreateConnection(Uri serverUri, VssCredentials crede settings.SendTimeout = TimeSpan.FromSeconds(Math.Min(Math.Max(httpRequestTimeoutSeconds, 100), 1200)); } - if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_ALLOW_REDIRECT"))) + if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW"))) { settings.AllowAutoRedirect = true; } diff --git a/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs b/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs index 29fe07c0d3f..439fd61c4bf 100644 --- a/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs +++ b/src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs @@ -457,6 +457,7 @@ public virtual Task GetMessageAsync( int poolId, Guid sessionId, long? lastMessageId = null, + TaskAgentStatus? status = null, object userState = null, CancellationToken cancellationToken = default) { @@ -470,6 +471,10 @@ public virtual Task GetMessageAsync( { queryParams.Add("lastMessageId", lastMessageId.Value.ToString(CultureInfo.InvariantCulture)); } + if (status != null) + { + queryParams.Add("status", status.Value.ToString()); + } return SendAsync( httpMethod, diff --git a/src/Sdk/DTWebApi/WebApi/TaskAgentStatus.cs b/src/Sdk/DTWebApi/WebApi/TaskAgentStatus.cs index 205ee387180..35869ceb320 100644 --- a/src/Sdk/DTWebApi/WebApi/TaskAgentStatus.cs +++ b/src/Sdk/DTWebApi/WebApi/TaskAgentStatus.cs @@ -10,5 +10,8 @@ public enum TaskAgentStatus [EnumMember] Online = 2, + + [EnumMember] + Busy = 3, } } diff --git a/src/Test/L0/Listener/MessageListenerL0.cs b/src/Test/L0/Listener/MessageListenerL0.cs index 05d5d14d17b..1766e2c4567 100644 --- a/src/Test/L0/Listener/MessageListenerL0.cs +++ b/src/Test/L0/Listener/MessageListenerL0.cs @@ -192,8 +192,8 @@ public async void GetNextMessage() _runnerServer .Setup(x => x.GetAgentMessageAsync( - _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token)) - .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) => + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, tokenSource.Token)) + .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken) => { await Task.Yield(); return messages.Dequeue(); @@ -208,7 +208,7 @@ public async void GetNextMessage() //Assert _runnerServer .Verify(x => x.GetAgentMessageAsync( - _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token), Times.Exactly(arMessages.Length)); + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, tokenSource.Token), Times.Exactly(arMessages.Length)); } } @@ -293,7 +293,7 @@ public async void SkipDeleteSession_WhenGetNextMessageGetTaskAgentAccessTokenExp _runnerServer .Setup(x => x.GetAgentMessageAsync( - _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token)) + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, tokenSource.Token)) .Throws(new TaskAgentAccessTokenExpiredException("test")); try { @@ -311,7 +311,7 @@ public async void SkipDeleteSession_WhenGetNextMessageGetTaskAgentAccessTokenExp //Assert _runnerServer .Verify(x => x.GetAgentMessageAsync( - _settings.PoolId, expectedSession.SessionId, It.IsAny(), tokenSource.Token), Times.Once); + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, tokenSource.Token), Times.Once); _runnerServer .Verify(x => x.DeleteAgentSessionAsync( From 3e42edd19808a9769ef62b780e1f209b0efc8f82 Mon Sep 17 00:00:00 2001 From: Lokesh Gopu Date: Thu, 28 Jul 2022 15:44:34 -0400 Subject: [PATCH 2/3] fixed l0 tests --- src/Test/L0/Listener/MessageListenerL0.cs | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/src/Test/L0/Listener/MessageListenerL0.cs b/src/Test/L0/Listener/MessageListenerL0.cs index 1766e2c4567..09725a1a3f2 100644 --- a/src/Test/L0/Listener/MessageListenerL0.cs +++ b/src/Test/L0/Listener/MessageListenerL0.cs @@ -192,7 +192,7 @@ public async void GetNextMessage() _runnerServer .Setup(x => x.GetAgentMessageAsync( - _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, tokenSource.Token)) + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, It.IsAny())) .Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken) => { await Task.Yield(); @@ -208,7 +208,7 @@ public async void GetNextMessage() //Assert _runnerServer .Verify(x => x.GetAgentMessageAsync( - _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, tokenSource.Token), Times.Exactly(arMessages.Length)); + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, It.IsAny()), Times.Exactly(arMessages.Length)); } } @@ -293,7 +293,7 @@ public async void SkipDeleteSession_WhenGetNextMessageGetTaskAgentAccessTokenExp _runnerServer .Setup(x => x.GetAgentMessageAsync( - _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, tokenSource.Token)) + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, It.IsAny())) .Throws(new TaskAgentAccessTokenExpiredException("test")); try { @@ -311,7 +311,7 @@ public async void SkipDeleteSession_WhenGetNextMessageGetTaskAgentAccessTokenExp //Assert _runnerServer .Verify(x => x.GetAgentMessageAsync( - _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, tokenSource.Token), Times.Once); + _settings.PoolId, expectedSession.SessionId, It.IsAny(), TaskAgentStatus.Online, It.IsAny()), Times.Once); _runnerServer .Verify(x => x.DeleteAgentSessionAsync( From 392dc7abd23d1ad779cc68971b21f76a8150fa0d Mon Sep 17 00:00:00 2001 From: Lokesh Gopu Date: Thu, 28 Jul 2022 16:09:26 -0400 Subject: [PATCH 3/3] PR feedback --- src/Runner.Listener/MessageListener.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Runner.Listener/MessageListener.cs b/src/Runner.Listener/MessageListener.cs index 19e888a5304..acea728c7c5 100644 --- a/src/Runner.Listener/MessageListener.cs +++ b/src/Runner.Listener/MessageListener.cs @@ -228,7 +228,7 @@ public async Task GetNextMessageAsync(CancellationToken token) continuousError = 0; } } - catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested) + catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested) { Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status."); continue;