Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Using BufferedStream over NetworkStream for performance improvement. #1041

Merged
merged 10 commits into from
Sep 7, 2017
Original file line number Diff line number Diff line change
Expand Up @@ -7,27 +7,26 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using System.IO;
using System.Text;
using System.Threading.Tasks;

using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions;
using Microsoft.VisualStudio.TestPlatform.Utilities;

/// <summary>
/// A communication channel using a length prefix packet frame for communication.
/// </summary>
public class LengthPrefixCommunicationChannel : ICommunicationChannel
{
private readonly Stream stream;

private readonly BinaryReader reader;

private readonly BinaryWriter writer;

public LengthPrefixCommunicationChannel(Stream stream)
{
this.stream = stream;
this.reader = new BinaryReader(stream, Encoding.UTF8, true);
this.writer = new BinaryWriter(stream, Encoding.UTF8, true);

// Using the Buffered stream while writing, improves the write performance. By reducing the number of writes.
this.writer = new BinaryWriter(new PlatformStream().CreateBufferedStream(stream, SocketConstants.BufferSize), Encoding.UTF8, true);
}

/// <inheritdoc />
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using System.Threading.Tasks;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions;
using Microsoft.VisualStudio.TestPlatform.Utilities;

/// <summary>
Expand All @@ -35,7 +36,7 @@ protected SocketClient(Func<Stream, ICommunicationChannel> channelFactory)
this.cancellation = new CancellationTokenSource();
this.stopped = false;

this.tcpClient = new TcpClient();
this.tcpClient = new TcpClient { NoDelay = true };
Copy link
Contributor

@harshjain2 harshjain2 Sep 6, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Using NoDelay = true can be a bit inefficient.
More info here :
https://msdn.microsoft.com/en-us/library/system.net.sockets.tcpclient.nodelay(v=vs.110).aspx #Closed

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yeah, As it also says we need for immediate/important communications.


In reply to: 137197765 [](ancestors = 137197765)

this.channelFactory = channelFactory;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,9 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions;

/// <summary>
/// Facilitates communication using sockets
Expand Down Expand Up @@ -70,11 +70,6 @@ public class SocketCommunicationManager : ICommunicationManager
/// </summary>
private object sendSyncObject = new object();

/// <summary>
/// Stream to use read timeout
/// </summary>
private NetworkStream stream;

private Socket socket;

/// <summary>
Expand All @@ -100,7 +95,6 @@ internal SocketCommunicationManager(IDataSerializer dataSerializer)
public IPEndPoint HostServer(IPEndPoint endpoint)
{
this.tcpListener = new TcpListener(endpoint);

this.tcpListener.Start();
EqtTrace.Info("Listening on Endpoint : {0}", (IPEndPoint)this.tcpListener.LocalEndpoint);

Expand All @@ -119,13 +113,20 @@ public async Task AcceptClientAsync()

var client = await this.tcpListener.AcceptTcpClientAsync();
this.socket = client.Client;
this.stream = client.GetStream();
this.binaryReader = new BinaryReader(this.stream);
this.binaryWriter = new BinaryWriter(this.stream);
this.socket.NoDelay = true;

this.clientConnectedEvent.Set();
// Using Buffered stream only in case of write, and Network stream in case of read.
var bufferedStream = new PlatformStream().CreateBufferedStream(client.GetStream(), SocketConstants.BufferSize);
var networkStream = client.GetStream();
this.binaryReader = new BinaryReader(networkStream);
this.binaryWriter = new BinaryWriter(bufferedStream);

EqtTrace.Info("Accepted Client request and set the flag");
this.clientConnectedEvent.Set();
if (EqtTrace.IsInfoEnabled)
{
EqtTrace.Info("Using the buffer size of {0} bytes", SocketConstants.BufferSize);
EqtTrace.Info("Accepted Client request and set the flag");
}
}
}

Expand Down Expand Up @@ -165,7 +166,7 @@ public async Task SetupClientAsync(IPEndPoint endpoint)
// for now added a check for validation of this.tcpclient
this.clientConnectionAcceptedEvent.Reset();
EqtTrace.Info("Trying to connect to server on socket : {0} ", endpoint);
this.tcpClient = new TcpClient();
this.tcpClient = new TcpClient { NoDelay = true };
this.socket = this.tcpClient.Client;

Stopwatch watch = new Stopwatch();
Expand All @@ -178,10 +179,18 @@ public async Task SetupClientAsync(IPEndPoint endpoint)

if (this.tcpClient.Connected)
{
this.stream = this.tcpClient.GetStream();
this.binaryReader = new BinaryReader(this.stream);
this.binaryWriter = new BinaryWriter(this.stream);
EqtTrace.Info("Connected to the server successfully ");
// Using Buffered stream only in case of write, and Network stream in case of read.
var bufferedStream = new PlatformStream().CreateBufferedStream(this.tcpClient.GetStream(), SocketConstants.BufferSize);
var networkStream = this.tcpClient.GetStream();
this.binaryReader = new BinaryReader(networkStream);
this.binaryWriter = new BinaryWriter(bufferedStream);

if (EqtTrace.IsInfoEnabled)
{
EqtTrace.Info("Connected to the server successfully ");
EqtTrace.Info("Using the buffer size of {0} bytes", SocketConstants.BufferSize);
}

this.clientConnectionAcceptedEvent.Set();
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
{
public class SocketConstants
{
// Buffer size for the buffered stream we are using.
public const int BufferSize = 16384;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,9 @@ namespace Microsoft.VisualStudio.TestPlatform.CommunicationUtilities
using System.Net.Sockets;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.ObjectModel;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions;
using Microsoft.VisualStudio.TestPlatform.Utilities;

/// <summary>
Expand Down Expand Up @@ -88,10 +88,11 @@ public void Stop()
private void OnClientConnected(TcpClient client)
{
this.tcpClient = client;
this.tcpClient.Client.NoDelay = true;

if (this.ClientConnected != null)
{
this.channel = this.channelFactory(client.GetStream());
this.channel = this.channelFactory(this.tcpClient.GetStream());
this.ClientConnected.SafeInvoke(this, new ConnectedEventArgs(this.channel), "SocketServer: ClientConnected");

// Start the message loop
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces
{
using System.IO;

/// <summary>
/// Helper class to return plaform specific stream.
/// </summary>
public interface IStream
{
/// <summary>
/// Returns platrform specific Buffered Stream with desired buffer size.
/// </summary>
/// <param name="stream">Input Stream</param>
/// <param name="bufferSize">Buffer Size</param>
/// <returns>Buffered Stream</returns>
Stream CreateBufferedStream(Stream stream, int bufferSize);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions
{
using System.IO;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces;

/// <inheritdoc/>
public class PlatformStream : IStream
{
/// <inheritdoc/>
public Stream CreateBufferedStream(Stream stream, int bufferSize)
{
return new BufferedStream(stream, bufferSize);
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions
{
using System;
using System.IO;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces;

/// <inheritdoc/>
public class PlatformStream : IStream
{
/// <inheritdoc/>
public Stream CreateBufferedStream(Stream stream, int bufferSize)
{
throw new NotImplementedException();
}
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT license. See LICENSE file in the project root for full license information.

namespace Microsoft.VisualStudio.TestPlatform.PlatformAbstractions
{
using System.IO;
using Microsoft.VisualStudio.TestPlatform.PlatformAbstractions.Interfaces;

/// <inheritdoc/>
public class PlatformStream : IStream
{
/// <inheritdoc/>
public Stream CreateBufferedStream(Stream stream, int bufferSize)
{
return stream;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,15 @@
namespace Microsoft.TestPlatform.CommunicationUtilities.PlatformTests
{
using System;
using System.Diagnostics;
using System.IO;
using System.Net;
using System.Net.Sockets;
using System.Text;
using System.Threading;
using System.Threading.Tasks;

using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.Interfaces;
using Microsoft.VisualStudio.TestPlatform.CommunicationUtilities.ObjectModel;
using Microsoft.VisualStudio.TestTools.UnitTesting;

Expand Down Expand Up @@ -264,9 +265,55 @@ public async Task ReceiveRawMessageAsyncShouldNotDeserializeThePayload()

Assert.AreEqual(DummyPayload, message);
}

#endregion

[TestMethod]
public void SocketPollShouldNotHangServerClientCommunication()
{
// Measure the throughput with socket communication v1 (SocketCommunicationManager)
// implementation.
var server = new SocketCommunicationManager();
var client = new SocketCommunicationManager();

int port = server.HostServer(new IPEndPoint(IPAddress.Loopback, 0)).Port;
client.SetupClientAsync(new IPEndPoint(IPAddress.Loopback, port)).Wait();
server.AcceptClientAsync().Wait();

server.WaitForClientConnection(1000);
client.WaitForServerConnection(1000);

var clientThread = new Thread(() => SendData(client));
clientThread.Start();

var dataReceived = 0;
while (dataReceived < 2048 * 5)
{
dataReceived += server.ReceiveRawMessageAsync(CancellationToken.None).Result.Length;
Task.Delay(1000).Wait();
}

clientThread.Join();

Assert.IsTrue(true);
}

private static void SendData(ICommunicationManager communicationManager)
{
// Having less than the buffer size in SocketConstants.BUFFERSIZE.
var dataBytes = new byte[2048];
for (int i = 0; i < dataBytes.Length; i++)
{
dataBytes[i] = 0x65;
}

var dataBytesStr = Encoding.UTF8.GetString(dataBytes);

for (int i = 0; i < 5; i++)
{
communicationManager.SendRawMessage(dataBytesStr);
}
}

private int StartServer()
{
this.tcpListener.Start();
Expand Down