Skip to content

Commit

Permalink
Include current runner status while getting messages (#2026)
Browse files Browse the repository at this point in the history
* get messages with runner status

* fixed l0 tests

* PR feedback
  • Loading branch information
lokesh755 authored Jul 28, 2022
1 parent 72e2107 commit 813af29
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 10 deletions.
14 changes: 14 additions & 0 deletions src/Runner.Common/JobStatusEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using GitHub.DistributedTask.WebApi;

namespace GitHub.Runner.Common
{
public class JobStatusEventArgs : EventArgs
{
public JobStatusEventArgs(TaskAgentStatus status)
{
this.Status = status;
}
public TaskAgentStatus Status { get; private set; }
}
}
6 changes: 3 additions & 3 deletions src/Runner.Common/RunnerServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface IRunnerServer : IRunnerService
Task<TaskAgentSession> CreateAgentSessionAsync(Int32 poolId, TaskAgentSession session, CancellationToken cancellationToken);
Task DeleteAgentMessageAsync(Int32 poolId, Int64 messageId, Guid sessionId, CancellationToken cancellationToken);
Task DeleteAgentSessionAsync(Int32 poolId, Guid sessionId, CancellationToken cancellationToken);
Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken);
Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken);

// job request
Task<TaskAgentJobRequest> GetAgentRequestAsync(int poolId, long requestId, CancellationToken cancellationToken);
Expand Down Expand Up @@ -298,10 +298,10 @@ public Task DeleteAgentSessionAsync(Int32 poolId, Guid sessionId, CancellationTo
return _messageTaskAgentClient.DeleteAgentSessionAsync(poolId, sessionId, cancellationToken: cancellationToken);
}

public Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken)
public Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken)
{
CheckConnection(RunnerConnectionType.MessageQueue);
return _messageTaskAgentClient.GetMessageAsync(poolId, sessionId, lastMessageId, cancellationToken: cancellationToken);
return _messageTaskAgentClient.GetMessageAsync(poolId, sessionId, lastMessageId, status, cancellationToken: cancellationToken);
}

//-----------------------------------------------------------------
Expand Down
13 changes: 13 additions & 0 deletions src/Runner.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public interface IJobDispatcher : IRunnerService
bool Cancel(JobCancelMessage message);
Task WaitAsync(CancellationToken token);
Task ShutdownAsync();
event EventHandler<JobStatusEventArgs> JobStatus;
}

// This implementation of IJobDispatcher is not thread safe.
Expand Down Expand Up @@ -55,6 +56,8 @@ public sealed class JobDispatcher : RunnerService, IJobDispatcher

private TaskCompletionSource<bool> _runOnceJobCompleted = new TaskCompletionSource<bool>();

public event EventHandler<JobStatusEventArgs> JobStatus;

public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
Expand Down Expand Up @@ -335,6 +338,11 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, string orc
Busy = true;
try
{
if (JobStatus != null)
{
JobStatus(this, new JobStatusEventArgs(TaskAgentStatus.Busy));
}

if (previousJobDispatch != null)
{
Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker.");
Expand Down Expand Up @@ -650,6 +658,11 @@ await processChannel.SendAsync(
finally
{
Busy = false;

if (JobStatus != null)
{
JobStatus(this, new JobStatusEventArgs(TaskAgentStatus.Online));
}
}
}

Expand Down
33 changes: 32 additions & 1 deletion src/Runner.Listener/MessageListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public interface IMessageListener : IRunnerService
Task DeleteSessionAsync();
Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token);
Task DeleteMessageAsync(TaskAgentMessage message);
void OnJobStatus(object sender, JobStatusEventArgs e);
}

public sealed class MessageListener : RunnerService, IMessageListener
Expand All @@ -38,6 +39,8 @@ public sealed class MessageListener : RunnerService, IMessageListener
private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4);
private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30);
private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new Dictionary<string, int>();
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;

public override void Initialize(IHostContext hostContext)
{
Expand Down Expand Up @@ -170,6 +173,23 @@ public async Task DeleteSessionAsync()
}
}

public void OnJobStatus(object sender, JobStatusEventArgs e)
{
if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW")))
{
Trace.Info("Received job status event. JobState: {0}", e.Status);
runnerStatus = e.Status;
try
{
_getMessagesTokenSource?.Cancel();
}
catch (ObjectDisposedException)
{
Trace.Info("_getMessagesTokenSource is already disposed.");
}
}
}

public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
{
Trace.Entering();
Expand All @@ -184,12 +204,14 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
{
token.ThrowIfCancellationRequested();
TaskAgentMessage message = null;
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
try
{
message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId,
_session.SessionId,
_lastMessageId,
token);
runnerStatus,
_getMessagesTokenSource.Token);

// Decrypt the message body if the session is using encryption
message = DecryptMessage(message);
Expand All @@ -206,6 +228,11 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
continuousError = 0;
}
}
catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested)
{
Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status.");
continue;
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
Trace.Info("Get next message has been cancelled.");
Expand Down Expand Up @@ -261,6 +288,10 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
await HostContext.Delay(_getNextMessageRetryInterval, token);
}
}
finally
{
_getMessagesTokenSource.Dispose();
}

if (message == null)
{
Expand Down
3 changes: 3 additions & 0 deletions src/Runner.Listener/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
bool runOnceJobReceived = false;
jobDispatcher = HostContext.CreateService<IJobDispatcher>();

jobDispatcher.JobStatus += _listener.OnJobStatus;

while (!HostContext.RunnerShutdownToken.IsCancellationRequested)
{
TaskAgentMessage message = null;
Expand Down Expand Up @@ -561,6 +563,7 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
{
if (jobDispatcher != null)
{
jobDispatcher.JobStatus -= _listener.OnJobStatus;
await jobDispatcher.ShutdownAsync();
}

Expand Down
2 changes: 1 addition & 1 deletion src/Runner.Sdk/Util/VssUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static VssConnection CreateConnection(Uri serverUri, VssCredentials crede
settings.SendTimeout = TimeSpan.FromSeconds(Math.Min(Math.Max(httpRequestTimeoutSeconds, 100), 1200));
}

if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_ALLOW_REDIRECT")))
if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW")))
{
settings.AllowAutoRedirect = true;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ public virtual Task<TaskAgentMessage> GetMessageAsync(
int poolId,
Guid sessionId,
long? lastMessageId = null,
TaskAgentStatus? status = null,
object userState = null,
CancellationToken cancellationToken = default)
{
Expand All @@ -470,6 +471,10 @@ public virtual Task<TaskAgentMessage> GetMessageAsync(
{
queryParams.Add("lastMessageId", lastMessageId.Value.ToString(CultureInfo.InvariantCulture));
}
if (status != null)
{
queryParams.Add("status", status.Value.ToString());
}

return SendAsync<TaskAgentMessage>(
httpMethod,
Expand Down
3 changes: 3 additions & 0 deletions src/Sdk/DTWebApi/WebApi/TaskAgentStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ public enum TaskAgentStatus

[EnumMember]
Online = 2,

[EnumMember]
Busy = 3,
}
}
10 changes: 5 additions & 5 deletions src/Test/L0/Listener/MessageListenerL0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ public async void GetNextMessage()

_runnerServer
.Setup(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token))
.Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) =>
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()))
.Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken) =>
{
await Task.Yield();
return messages.Dequeue();
Expand All @@ -208,7 +208,7 @@ public async void GetNextMessage()
//Assert
_runnerServer
.Verify(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token), Times.Exactly(arMessages.Length));
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()), Times.Exactly(arMessages.Length));
}
}

Expand Down Expand Up @@ -293,7 +293,7 @@ public async void SkipDeleteSession_WhenGetNextMessageGetTaskAgentAccessTokenExp

_runnerServer
.Setup(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token))
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()))
.Throws(new TaskAgentAccessTokenExpiredException("test"));
try
{
Expand All @@ -311,7 +311,7 @@ public async void SkipDeleteSession_WhenGetNextMessageGetTaskAgentAccessTokenExp
//Assert
_runnerServer
.Verify(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token), Times.Once);
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()), Times.Once);

_runnerServer
.Verify(x => x.DeleteAgentSessionAsync(
Expand Down

0 comments on commit 813af29

Please sign in to comment.