Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Tcp bus #110

Merged
merged 2 commits into from
May 23, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
242 changes: 190 additions & 52 deletions src/ReactiveDomain.Transport.Tests/TcpBusClientSideTests.cs
Original file line number Diff line number Diff line change
@@ -1,70 +1,208 @@
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Net;
using System.Threading;
using System.Threading.Tasks;
using Newtonsoft.Json;
using ReactiveDomain.Messaging;
using ReactiveDomain.Messaging.Bus;
using ReactiveDomain.Transport.Serialization;
using Xunit;

namespace ReactiveDomain.Transport.Tests
{
public class TcpBusClientSideTests
[Collection("TCP bus tests")]
public class TcpBusClientSideTests : IDisposable
{
private IPAddress _hostAddress;
private IDispatcher _commandBus;
private IPAddress _clientAddress;
private MockTcpConnection _clientTcpConnection;
private const int CommandPort = 10660;
private const string ShortProp = "abc";
// 16kb is large enough to cause the transport to split up the frame.
// It would be better if we did the splitting manually so we were sure it really happened.
// Would require mocking more things.
private readonly string _longProp = string.Join("", Enumerable.Repeat("a", 16 * 1024));

private readonly Dispatcher _localBus = new Dispatcher("local");
private readonly IList<IDisposable> _subscriptions = new List<IDisposable>();
private readonly TcpBusServerSide _tcpBusServerSide;
private readonly TcpBusClientSide _tcpBusClientSide;
private readonly TaskCompletionSource<IMessage> _tcs;

public TcpBusClientSideTests()
{
_commandBus = new Dispatcher("TestBus");
_hostAddress = IPAddress.Loopback;
_clientAddress = IPAddress.Loopback;
_clientTcpConnection = MockTcpConnection.CreateConnectingTcpConnection(Guid.NewGuid(),
new IPEndPoint(_hostAddress, CommandPort),
new TcpClientConnector(),
TimeSpan.FromSeconds(120),
conn =>
{
},
(conn, err) =>
{
},
verbose: true);
var hostAddress = IPAddress.Loopback;
var port = 10000;
_tcs = new TaskCompletionSource<IMessage>();

// server side
var serverInbound = new QueuedHandler(
new AdHocHandler<IMessage>(m => { if (m is Command cmd) _localBus.TrySend(cmd, out _); }),
"InboundMessageServerHandler",
true,
TimeSpan.FromMilliseconds(1000));

_tcpBusServerSide = new TcpBusServerSide(
hostAddress,
port,
inboundNondiscardingMessageTypes: new[] { typeof(WoftamCommand) },
inboundNondiscardingMessageQueuedHandler: serverInbound);

_localBus.SubscribeToAll(_tcpBusServerSide);

serverInbound.Start();

// client side
var clientInbound = new QueuedHandler(
new AdHocHandler<IMessage>(_tcs.SetResult),
"InboundMessageQueuedHandler",
true,
TimeSpan.FromMilliseconds(1000));

_tcpBusClientSide = new TcpBusClientSide(
hostAddress,
port,
inboundNondiscardingMessageTypes: new[] { typeof(CommandResponse) },
inboundNondiscardingMessageQueuedHandler: clientInbound,
messageSerializers: new Dictionary<Type, IMessageSerializer>
{ { typeof(WoftamCommandResponse), new WoftamCommandResponse.Serializer() } });

clientInbound.Start();

// wait for tcp connection to be established (maybe an api to detect this would be nice)
Thread.Sleep(TimeSpan.FromMilliseconds(200));
}

[Fact]
public void can_send_command()
{
var handler = new WoftamCommandHandler(_longProp);
_subscriptions.Add(_localBus.Subscribe(handler));

// First send the command to server so it knows where to send the response.
_tcpBusClientSide.Handle(MessageBuilder.New(() => new WoftamCommand(ShortProp)));

// expect to receive it on the client side
var gotMessage = _tcs.Task.Wait(TimeSpan.FromMilliseconds(1000));
Assert.True(gotMessage);
Assert.IsType<Success>(_tcs.Task.Result);
}

[Fact]
public void can_handle_split_frames() // Also tests custom deserializer
{
var handler = new WoftamCommandHandler(_longProp) { ReturnCustomResponse = true };
_subscriptions.Add(_localBus.Subscribe(handler));

// First send the command to server so it knows where to send the response.
// We don't need this properties to be large since we're only testing message
// splitting from server to client.
_tcpBusClientSide.Handle(MessageBuilder.New(() => new WoftamCommand(ShortProp)));

// expect to receive it on the client side
var gotMessage = _tcs.Task.Wait(TimeSpan.FromMilliseconds(1000));
Assert.True(gotMessage);
var response = Assert.IsType<WoftamCommandResponse>(_tcs.Task.Result);
Assert.Equal(_longProp, response.PropertyA);
}

public void Dispose()
{
foreach (var subscription in _subscriptions)
{
subscription.Dispose();
}
_localBus?.Dispose();
_tcpBusClientSide.Dispose();
_tcpBusServerSide.Dispose();
}
}

public class WoftamCommand : Command
{
public readonly string Property1;

public WoftamCommand(string property1)
{
Property1 = property1;
}
}

public class WoftamCommandResponse : Success
{
public readonly string PropertyA;

public WoftamCommandResponse(WoftamCommand source, string propertyA)
: base(source)
{
PropertyA = propertyA;
}
~TcpBusClientSideTests()

public class Serializer : IMessageSerializer
{
_clientTcpConnection.Close("I'm done.");
public IMessage DeserializeMessage(string json, Type messageType)
{
var reader = new JsonTextReader(new StringReader(json));
var propA = "";
var correlationId = Guid.Empty;
var causationId = Guid.Empty;
WoftamCommand sourceCommand = null;
while (reader.Read())
{
if (reader.TokenType == JsonToken.PropertyName)
{
if (reader.Value.ToString() == nameof(PropertyA))
{
reader.Read();
propA = reader.Value.ToString();
}
else if (reader.Value.ToString() == "CorrelationId")
{
reader.Read();
correlationId = Guid.Parse(reader.Value.ToString());
}
else if (reader.Value.ToString() == "CausationId")
{
reader.Read();
causationId = Guid.Parse(reader.Value.ToString());
}
else if (reader.Value.ToString() == "SourceCommand")
{
reader.Read();
var serializer = new JsonSerializer();
sourceCommand = serializer.Deserialize<WoftamCommand>(reader);
break;
}
}
}
if (sourceCommand is null)
throw new JsonSerializationException("Could not deserialize WoftamCommandResponse.");
var response = new WoftamCommandResponse(sourceCommand, propA);
if (correlationId != Guid.Empty)
response.CorrelationId = correlationId;
if (causationId != Guid.Empty)
response.CausationId = causationId;
return response;
}

public string SerializeMessage(IMessage message)
{
return JsonConvert.SerializeObject(message, Json.JsonSettings);
}
}
}

// Sigh... at this point, there are no commands defined in ReactiveDomain, so I have nothing with
// which to test.

//[Fact]
//public void handle_command_test()
//{
// // Set up the TcpBusClientSide that I will test, and also hook up the LengthPrefixMessageFramer
// var tcpBusClientSide = new TcpBusClientSide(_hostAddress, _commandBus, _clientAddress, 10000, _clientTcpConnection);
// tcpBusClientSide._framer.RegisterMessageArrivedCallback(tcpBusClientSide.TcpMessageArrived);
// Action<ITcpConnection, IEnumerable<ArraySegment<byte>>> callback = null;
// callback = (x, data) =>
// {
// tcpBusClientSide._framer.UnFrameData(data);
// _clientTcpConnection.ReceiveAsync(callback);
// };
// _clientTcpConnection.ReceiveAsync(callback);

// _clientTcpConnection.SentData = null;
// var cmd = new ImageProcess.Decolorize(true, Guid.NewGuid(), null);

// var cmdResponse = tcpBusClientSide.Handle(cmd);

// var expectedSentData = tcpBusClientSide._framer.FrameData((new TcpMessage(cmd).AsArraySegment()));
// var expectedCmdResponse = cmd.Succeed();
// Assert.NotNull(_clientTcpConnection.SentData);
// Assert.Equal(expectedSentData.ToArray(), _clientTcpConnection.SentData.ToArray());
// Assert.NotNull(cmdResponse);
// Assert.Equal(expectedCmdResponse.Succeeded, cmdResponse.Succeeded);
// Assert.Equal(expectedCmdResponse.Error, cmdResponse.Error);
//}
public class WoftamCommandHandler : IHandleCommand<WoftamCommand>
{
private readonly string _prop;
public bool ReturnCustomResponse { get; set; }

public WoftamCommandHandler(string prop)
{
_prop = prop;
}
public CommandResponse Handle(WoftamCommand command)
{
return ReturnCustomResponse ? new WoftamCommandResponse(command, _prop) : command.Succeed();
}
}
}
65 changes: 52 additions & 13 deletions src/ReactiveDomain.Transport.Tests/TcpBusServerSideTests.cs
Original file line number Diff line number Diff line change
@@ -1,5 +1,4 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Net;
using System.Threading;
Expand All @@ -10,37 +9,39 @@

namespace ReactiveDomain.Transport.Tests
{
[Collection("TCP bus tests")]
public class TcpBusServerSideTests
{
private readonly IPAddress _hostAddress = IPAddress.Loopback;
private int port = 10000;
private readonly TaskCompletionSource<IMessage> _tcs = new TaskCompletionSource<IMessage>();

[Fact]
public void can_handle_split_frames()
{
// 16kb large enough to cause the transport to split up the frame.
// it would be better if we did the splitting manually so we were sure it really happened.
// would require mocking more things.
var hostAddress = IPAddress.Loopback;
var prop1 = "prop1";
var prop2 = string.Join("", Enumerable.Repeat("a", 16 * 1024));
var port = 10000;
var tcs = new TaskCompletionSource<IMessage>();

// server side
var serverInbound = new QueuedHandler(
new AdHocHandler<IMessage>(tcs.SetResult),
new AdHocHandler<IMessage>(_tcs.SetResult),
"InboundMessageQueuedHandler",
true,
TimeSpan.FromMilliseconds(1000));

var tcpBusServerSide = new TcpBusServerSide(hostAddress, port, null)
{
InboundMessageQueuedHandler = serverInbound,
InboundSpamMessageTypes = new List<Type>(),
};
var tcpBusServerSide = new TcpBusServerSide(
_hostAddress,
port,
inboundNondiscardingMessageTypes: new[] { typeof(WoftamEvent) },
inboundNondiscardingMessageQueuedHandler: serverInbound);

serverInbound.Start();

// client side
var tcpBusClientSide = new TcpBusClientSide(null, hostAddress, port);
var tcpBusClientSide = new TcpBusClientSide(_hostAddress, port);

// wait for tcp connection to be established (maybe an api to detect this would be nice)
Thread.Sleep(TimeSpan.FromMilliseconds(200));
Expand All @@ -49,11 +50,49 @@ public void can_handle_split_frames()
tcpBusClientSide.Handle(new WoftamEvent(prop1, prop2));

// expect to receive it in the server
var gotMessage = tcs.Task.Wait(TimeSpan.FromMilliseconds(1000));
var gotMessage = _tcs.Task.Wait(TimeSpan.FromMilliseconds(1000));
Assert.True(gotMessage);
var evt = Assert.IsType<WoftamEvent>(tcs.Task.Result);
var evt = Assert.IsType<WoftamEvent>(_tcs.Task.Result);
Assert.Equal(prop1, evt.Property1);
Assert.Equal(prop2, evt.Property2);

tcpBusClientSide.Dispose();
tcpBusServerSide.Dispose();
}

[Fact]
public void can_filter_out_message_types()
{
// server side
var serverInbound = new QueuedHandler(
new AdHocHandler<IMessage>(_tcs.SetResult),
"InboundMessageQueuedHandler",
true,
TimeSpan.FromMilliseconds(1000));

var tcpBusServerSide = new TcpBusServerSide(
_hostAddress,
port,
inboundNondiscardingMessageTypes: new[] { typeof(WoftamEvent) },
inboundNondiscardingMessageQueuedHandler: serverInbound);

serverInbound.Start();

// client side
var tcpBusClientSide = new TcpBusClientSide(_hostAddress, port);

// wait for tcp connection to be established (maybe an api to detect this would be nice)
Thread.Sleep(TimeSpan.FromMilliseconds(200));

// put disallowed message into client
tcpBusClientSide.Handle(new WoftamCommand("abc"));

// expect to receive it in the server but drop it on the floor
var gotMessage = _tcs.Task.Wait(TimeSpan.FromMilliseconds(1000));
Assert.False(gotMessage);

tcpBusClientSide.Dispose();
tcpBusServerSide.Dispose();
}
}
}
Loading