Skip to content

Commit

Permalink
Added support for timeouts in Ask operations
Browse files Browse the repository at this point in the history
  • Loading branch information
andresgutierrez committed Oct 17, 2023
1 parent b8d6cf8 commit 727a739
Show file tree
Hide file tree
Showing 9 changed files with 163 additions and 11 deletions.
10 changes: 3 additions & 7 deletions Nixie.Tests/Actors/PingPongActor.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,9 @@ public PongActor(IActorContext<PongActor, string, string> _)

}

public async Task<string> Receive(string message)
{
await Task.Delay(1);

return message;

//return Task.FromResult(message);
public Task<string> Receive(string message)
{
return Task.FromResult(message);
}
}

Expand Down
37 changes: 37 additions & 0 deletions Nixie.Tests/Actors/ReplySlowActor.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@

namespace Nixie.Tests.Actors;

public sealed class ReplySlowActor : IActor<string, string>
{
private readonly Dictionary<string, int> receivedMessages = new();

public ReplySlowActor(IActorContext<ReplySlowActor, string, string> _)
{

}

public int GetMessages(string id)
{
if (receivedMessages.TryGetValue(id, out int number))
return number;

return 0;
}

public void IncrMessage(string id)
{
if (!receivedMessages.ContainsKey(id))
receivedMessages.Add(id, 1);
else
receivedMessages[id]++;
}

public async Task<string> Receive(string message)
{
IncrMessage(message);

await Task.Delay(2000);

return message;
}
}
85 changes: 85 additions & 0 deletions Nixie.Tests/TestAskReplies.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,20 @@ public async Task TestAskMessageToSingleActor()
Assert.NotNull(reply);
Assert.Equal("TestSendMessageToSingleActor", reply);

Assert.Equal(1, ((ReplyActor)actor.Runner.Actor!).GetMessages("TestSendMessageToSingleActor"));
}

[Fact]
public async Task TestAskMessageToSingleActorWithTimeout()
{
using ActorSystem asx = new();

IActorRef<ReplyActor, string, string> actor = asx.Spawn<ReplyActor, string, string>();

string? reply = await actor.Ask("TestSendMessageToSingleActor", TimeSpan.FromSeconds(5));
Assert.NotNull(reply);
Assert.Equal("TestSendMessageToSingleActor", reply);

Assert.Equal(1, ((ReplyActor)actor.Runner.Actor!).GetMessages("TestSendMessageToSingleActor"));
}

Expand All @@ -44,6 +58,29 @@ public async Task TestCreateMultipleActorsAndAskOneMessage()
Assert.Equal(1, ((ReplyActor)actorRefs[i].Runner.Actor!).GetMessages("TestCreateMultipleActorsAndAskOneMessage"));
}

[Fact]
public async Task TestCreateMultipleActorsAndAskOneMessageWithTimeout()
{
using ActorSystem asx = new();

IActorRef<ReplyActor, string, string>[] actorRefs = new IActorRef<ReplyActor, string, string>[10];

for (int i = 0; i < 10; i++)
actorRefs[i] = asx.Spawn<ReplyActor, string, string>();

for (int i = 0; i < 10; i++)
{
string? response = await actorRefs[i].Ask("TestCreateMultipleActorsAndAskOneMessage", TimeSpan.FromSeconds(5));
Assert.NotNull(response);
Assert.Equal("TestCreateMultipleActorsAndAskOneMessage", response);
}

await asx.Wait();

for (int i = 0; i < 10; i++)
Assert.Equal(1, ((ReplyActor)actorRefs[i].Runner.Actor!).GetMessages("TestCreateMultipleActorsAndAskOneMessage"));
}

[Fact]
public async Task TestCreateMultipleActorsAndAskOneMessage2()
{
Expand Down Expand Up @@ -187,4 +224,52 @@ public async Task TestCreateMultiplePingPingAndAskMultipleRace()
for (int i = 0; i < 100; i++)
Assert.Equal(50, ((PingActor)actorRefs[i].Runner.Actor!).GetMessages());
}

[Fact]
public async Task TestAskMessageToSlowActor()
{
using ActorSystem asx = new();

IActorRef<ReplySlowActor, string, string> actor = asx.Spawn<ReplySlowActor, string, string>();

string? reply = await actor.Ask("TestAskMessageToSlowActor");
Assert.NotNull(reply);
Assert.Equal("TestAskMessageToSlowActor", reply);

Assert.Equal(1, ((ReplySlowActor)actor.Runner.Actor!).GetMessages("TestAskMessageToSlowActor"));
}

[Fact]
public async Task TestAskMessageToSlowActorWithTimeout()
{
using ActorSystem asx = new();

IActorRef<ReplySlowActor, string, string> actor = asx.Spawn<ReplySlowActor, string, string>();

string? reply = await actor.Ask("TestAskMessageToSlowActorWithTimeout", TimeSpan.FromSeconds(5));
Assert.NotNull(reply);
Assert.Equal("TestAskMessageToSlowActorWithTimeout", reply);

Assert.Equal(1, ((ReplySlowActor)actor.Runner.Actor!).GetMessages("TestAskMessageToSlowActorWithTimeout"));
}

[Fact]
public async Task TestAskMessageToSlowActorTriggerTimeout()
{
using ActorSystem asx = new();

IActorRef<ReplySlowActor, string, string> actor = asx.Spawn<ReplySlowActor, string, string>();

AskTimeoutException exception = await Assert.ThrowsAsync<AskTimeoutException>(async () => await TriggerTimeout(actor));
Assert.Equal("Timeout after 00:00:01 waiting for a reply", exception.Message);

await asx.Wait();

Assert.Equal(1, ((ReplySlowActor)actor.Runner.Actor!).GetMessages("TestAskMessageToSlowActorTriggerTimeout"));
}

private async Task TriggerTimeout(IActorRef<ReplySlowActor, string, string> actor)
{
await actor.Ask("TestAskMessageToSlowActorTriggerTimeout", TimeSpan.FromSeconds(1));
}
}
2 changes: 1 addition & 1 deletion Nixie/ActorMessageReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@ public ActorMessageReply(TRequest request, IGenericActorRef? sender)
/// <param name="response"></param>
public void SetCompleted(TResponse? response)
{
Response = response;
Interlocked.Exchange(ref completed, 0);
Response = response;
}
}

18 changes: 17 additions & 1 deletion Nixie/ActorRefReply.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@

using System.Diagnostics;

namespace Nixie;

/// <summary>
Expand Down Expand Up @@ -68,11 +70,18 @@ public void Send(TRequest message, IGenericActorRef sender)
/// <param name="timeout"></param>
/// <returns></returns>
public async Task<TResponse?> Ask(TRequest message, TimeSpan timeout)
{
{
ActorMessageReply<TRequest, TResponse> promise = runner.SendAndTryDeliver(message, null);

Stopwatch stopwatch = Stopwatch.StartNew();

while (!promise.IsCompleted)
{
if (stopwatch.Elapsed >= timeout)
throw new AskTimeoutException($"Timeout after {timeout} waiting for a reply");

await Task.Yield();
}

return promise.Response;
}
Expand Down Expand Up @@ -105,8 +114,15 @@ public void Send(TRequest message, IGenericActorRef sender)
{
ActorMessageReply<TRequest, TResponse> promise = runner.SendAndTryDeliver(message, sender);

Stopwatch stopwatch = Stopwatch.StartNew();

while (!promise.IsCompleted)
{
if (stopwatch.Elapsed >= timeout)
throw new AskTimeoutException($"Timeout after {timeout} waiting for a reply");

await Task.Yield();
}

return promise.Response;
}
Expand Down
9 changes: 9 additions & 0 deletions Nixie/AskTimeoutException.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@

namespace Nixie;

public class AskTimeoutException : NixieException
{
public AskTimeoutException(string message) : base(message)
{
}
}
9 changes: 9 additions & 0 deletions Nixie/IActorRefReply.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,13 @@ public interface IActorRef<TActor, TRequest, TResponse> where TActor : IActor<TR
/// <param name="sender"></param>
/// <returns></returns>
public Task<TResponse?> Ask(TRequest message, IGenericActorRef sender);

/// <summary>
/// Sends a message to the actor and expects a response
/// An exception will be thrown if the timeout limit is reached
/// </summary>
/// <param name="message"></param>
/// <param name="sender"></param>
/// <returns></returns>
public Task<TResponse?> Ask(TRequest message, IGenericActorRef sender, TimeSpan timeout);
}
2 changes: 1 addition & 1 deletion Nixie/Nixie.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
<ImplicitUsings>enable</ImplicitUsings>
<Nullable>enable</Nullable>
<PackageId>Nixie</PackageId>
<Version>0.0.4-alpha</Version>
<Version>0.0.5-alpha</Version>
<Description>A Lightweight Actor Model Implementation for C#/.NET</Description>
<Authors>Andres Gutierrez</Authors>
<Company>Andres Gutierrez</Company>
Expand Down
2 changes: 1 addition & 1 deletion Nixie/NixieException.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@

namespace Nixie;

public sealed class NixieException : Exception
public class NixieException : Exception
{
public NixieException(string message) : base(message)
{
Expand Down

0 comments on commit 727a739

Please sign in to comment.