Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Include current runner status while getting messages #2026

Merged
merged 3 commits into from
Jul 28, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
14 changes: 14 additions & 0 deletions src/Runner.Common/JobStatusEventArgs.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
using System;
using GitHub.DistributedTask.WebApi;

namespace GitHub.Runner.Common
{
public class JobStatusEventArgs : EventArgs
{
public JobStatusEventArgs(TaskAgentStatus status)
{
this.Status = status;
}
public TaskAgentStatus Status { get; private set; }
}
}
6 changes: 3 additions & 3 deletions src/Runner.Common/RunnerServer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,7 @@ public interface IRunnerServer : IRunnerService
Task<TaskAgentSession> CreateAgentSessionAsync(Int32 poolId, TaskAgentSession session, CancellationToken cancellationToken);
Task DeleteAgentMessageAsync(Int32 poolId, Int64 messageId, Guid sessionId, CancellationToken cancellationToken);
Task DeleteAgentSessionAsync(Int32 poolId, Guid sessionId, CancellationToken cancellationToken);
Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken);
Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken);

// job request
Task<TaskAgentJobRequest> GetAgentRequestAsync(int poolId, long requestId, CancellationToken cancellationToken);
Expand Down Expand Up @@ -298,10 +298,10 @@ public Task DeleteAgentSessionAsync(Int32 poolId, Guid sessionId, CancellationTo
return _messageTaskAgentClient.DeleteAgentSessionAsync(poolId, sessionId, cancellationToken: cancellationToken);
}

public Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken)
public Task<TaskAgentMessage> GetAgentMessageAsync(Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken)
{
CheckConnection(RunnerConnectionType.MessageQueue);
return _messageTaskAgentClient.GetMessageAsync(poolId, sessionId, lastMessageId, cancellationToken: cancellationToken);
return _messageTaskAgentClient.GetMessageAsync(poolId, sessionId, lastMessageId, status, cancellationToken: cancellationToken);
}

//-----------------------------------------------------------------
Expand Down
13 changes: 13 additions & 0 deletions src/Runner.Listener/JobDispatcher.cs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ public interface IJobDispatcher : IRunnerService
bool Cancel(JobCancelMessage message);
Task WaitAsync(CancellationToken token);
Task ShutdownAsync();
event EventHandler<JobStatusEventArgs> JobStatus;
}

// This implementation of IJobDispatcher is not thread safe.
Expand Down Expand Up @@ -55,6 +56,8 @@ public sealed class JobDispatcher : RunnerService, IJobDispatcher

private TaskCompletionSource<bool> _runOnceJobCompleted = new TaskCompletionSource<bool>();

public event EventHandler<JobStatusEventArgs> JobStatus;

public override void Initialize(IHostContext hostContext)
{
base.Initialize(hostContext);
Expand Down Expand Up @@ -335,6 +338,11 @@ private async Task RunAsync(Pipelines.AgentJobRequestMessage message, string orc
Busy = true;
try
{
if (JobStatus != null)
{
JobStatus(this, new JobStatusEventArgs(TaskAgentStatus.Busy));
}

if (previousJobDispatch != null)
{
Trace.Verbose($"Make sure the previous job request {previousJobDispatch.JobId} has successfully finished on worker.");
Expand Down Expand Up @@ -650,6 +658,11 @@ await processChannel.SendAsync(
finally
{
Busy = false;

if (JobStatus != null)
{
JobStatus(this, new JobStatusEventArgs(TaskAgentStatus.Online));
}
}
}

Expand Down
33 changes: 32 additions & 1 deletion src/Runner.Listener/MessageListener.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ public interface IMessageListener : IRunnerService
Task DeleteSessionAsync();
Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token);
Task DeleteMessageAsync(TaskAgentMessage message);
void OnJobStatus(object sender, JobStatusEventArgs e);
}

public sealed class MessageListener : RunnerService, IMessageListener
Expand All @@ -38,6 +39,8 @@ public sealed class MessageListener : RunnerService, IMessageListener
private readonly TimeSpan _sessionConflictRetryLimit = TimeSpan.FromMinutes(4);
private readonly TimeSpan _clockSkewRetryLimit = TimeSpan.FromMinutes(30);
private readonly Dictionary<string, int> _sessionCreationExceptionTracker = new Dictionary<string, int>();
private TaskAgentStatus runnerStatus = TaskAgentStatus.Online;
private CancellationTokenSource _getMessagesTokenSource;

public override void Initialize(IHostContext hostContext)
{
Expand Down Expand Up @@ -170,6 +173,23 @@ public async Task DeleteSessionAsync()
}
}

public void OnJobStatus(object sender, JobStatusEventArgs e)
{
if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW")))
{
Trace.Info("Received job status event. JobState: {0}", e.Status);
runnerStatus = e.Status;
try
{
_getMessagesTokenSource?.Cancel();
}
catch (ObjectDisposedException)
{
Trace.Info("_getMessagesTokenSource is already disposed.");
}
}
}

public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
{
Trace.Entering();
Expand All @@ -184,12 +204,14 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
{
token.ThrowIfCancellationRequested();
TaskAgentMessage message = null;
_getMessagesTokenSource = CancellationTokenSource.CreateLinkedTokenSource(token);
try
{
message = await _runnerServer.GetAgentMessageAsync(_settings.PoolId,
_session.SessionId,
_lastMessageId,
token);
runnerStatus,
_getMessagesTokenSource.Token);

// Decrypt the message body if the session is using encryption
message = DecryptMessage(message);
Expand All @@ -206,6 +228,11 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
continuousError = 0;
}
}
catch (OperationCanceledException) when (_getMessagesTokenSource.Token.IsCancellationRequested && !token.IsCancellationRequested)
{
Trace.Info("Get messages has been cancelled using local token source. Continue to get messages with new status.");
continue;
}
catch (OperationCanceledException) when (token.IsCancellationRequested)
{
Trace.Info("Get next message has been cancelled.");
Expand Down Expand Up @@ -261,6 +288,10 @@ public async Task<TaskAgentMessage> GetNextMessageAsync(CancellationToken token)
await HostContext.Delay(_getNextMessageRetryInterval, token);
}
}
finally
{
_getMessagesTokenSource.Dispose();
}

if (message == null)
{
Expand Down
3 changes: 3 additions & 0 deletions src/Runner.Listener/Runner.cs
Original file line number Diff line number Diff line change
Expand Up @@ -360,6 +360,8 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
bool runOnceJobReceived = false;
jobDispatcher = HostContext.CreateService<IJobDispatcher>();

jobDispatcher.JobStatus += _listener.OnJobStatus;

while (!HostContext.RunnerShutdownToken.IsCancellationRequested)
{
TaskAgentMessage message = null;
Expand Down Expand Up @@ -561,6 +563,7 @@ private async Task<int> RunAsync(RunnerSettings settings, bool runOnce = false)
{
if (jobDispatcher != null)
{
jobDispatcher.JobStatus -= _listener.OnJobStatus;
await jobDispatcher.ShutdownAsync();
}

Expand Down
2 changes: 1 addition & 1 deletion src/Runner.Sdk/Util/VssUtil.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public static VssConnection CreateConnection(Uri serverUri, VssCredentials crede
settings.SendTimeout = TimeSpan.FromSeconds(Math.Min(Math.Max(httpRequestTimeoutSeconds, 100), 1200));
}

if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("GITHUB_ACTIONS_RUNNER_ALLOW_REDIRECT")))
TingluoHuang marked this conversation as resolved.
Show resolved Hide resolved
if (StringUtil.ConvertToBoolean(Environment.GetEnvironmentVariable("USE_BROKER_FLOW")))
{
settings.AllowAutoRedirect = true;
}
Expand Down
5 changes: 5 additions & 0 deletions src/Sdk/DTGenerated/Generated/TaskAgentHttpClientBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -457,6 +457,7 @@ public virtual Task<TaskAgentMessage> GetMessageAsync(
int poolId,
Guid sessionId,
long? lastMessageId = null,
TaskAgentStatus? status = null,
object userState = null,
CancellationToken cancellationToken = default)
{
Expand All @@ -470,6 +471,10 @@ public virtual Task<TaskAgentMessage> GetMessageAsync(
{
queryParams.Add("lastMessageId", lastMessageId.Value.ToString(CultureInfo.InvariantCulture));
}
if (status != null)
{
queryParams.Add("status", status.Value.ToString());
}

return SendAsync<TaskAgentMessage>(
httpMethod,
Expand Down
3 changes: 3 additions & 0 deletions src/Sdk/DTWebApi/WebApi/TaskAgentStatus.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,5 +10,8 @@ public enum TaskAgentStatus

[EnumMember]
Online = 2,

[EnumMember]
Busy = 3,
}
}
10 changes: 5 additions & 5 deletions src/Test/L0/Listener/MessageListenerL0.cs
Original file line number Diff line number Diff line change
Expand Up @@ -192,8 +192,8 @@ public async void GetNextMessage()

_runnerServer
.Setup(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token))
.Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, CancellationToken cancellationToken) =>
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()))
.Returns(async (Int32 poolId, Guid sessionId, Int64? lastMessageId, TaskAgentStatus status, CancellationToken cancellationToken) =>
{
await Task.Yield();
return messages.Dequeue();
Expand All @@ -208,7 +208,7 @@ public async void GetNextMessage()
//Assert
_runnerServer
.Verify(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token), Times.Exactly(arMessages.Length));
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()), Times.Exactly(arMessages.Length));
}
}

Expand Down Expand Up @@ -293,7 +293,7 @@ public async void SkipDeleteSession_WhenGetNextMessageGetTaskAgentAccessTokenExp

_runnerServer
.Setup(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token))
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()))
.Throws(new TaskAgentAccessTokenExpiredException("test"));
try
{
Expand All @@ -311,7 +311,7 @@ public async void SkipDeleteSession_WhenGetNextMessageGetTaskAgentAccessTokenExp
//Assert
_runnerServer
.Verify(x => x.GetAgentMessageAsync(
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), tokenSource.Token), Times.Once);
_settings.PoolId, expectedSession.SessionId, It.IsAny<long?>(), TaskAgentStatus.Online, It.IsAny<CancellationToken>()), Times.Once);

_runnerServer
.Verify(x => x.DeleteAgentSessionAsync(
Expand Down