Skip to content

Commit

Permalink
Better integrate max message size
Browse files Browse the repository at this point in the history
Fixes #1223 for 6.x

Integrate MaxMessageSize into ConnectionFactory

Update APIApproval

CHANGELOG

Handle excessive payload size with hard protocol exception

Add tests for MaxMessageSize

Update API
  • Loading branch information
lukebakken committed Jun 14, 2022
1 parent f69cbbf commit 5930d91
Show file tree
Hide file tree
Showing 11 changed files with 210 additions and 10 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
_site/

###################
## Generated files
###################
Expand Down
23 changes: 23 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,26 @@
## Changes Between 6.3.1 and 6.4.0

This release adds the ability to specify a maximum message size when receiving data. The default
values are:

* RabbitMQ .NET client 7.0.0 and beyond: 128MiB
* RabbitMQ .NET client 6.4.0 up to 7.0.0: no limit by default

Receiving a frame that specifies a content larger than the limit will throw an execption. This is to
help prevent situations as described in [this discussion](https://github.com/rabbitmq/rabbitmq-dotnet-client/discussions/1213).

To set a limit, use the set `MaxMessageSize` on your `ConnectionFactory` before opening connections:

```
// This sets the limit to 512MiB
var cf = new ConnectionFactory();
cf.MaxMessageSize = 536870912;
var conn = cf.CreateConnection()`
```

GitHub milestone: [`6.4.0`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/58?closed=1)
Diff: [link](https://github.com/rabbitmq/rabbitmq-dotnet-client/compare/v6.3.1...v6.4.0)

## Changes Between 6.3.0 and 6.3.1

GitHub milestone: [`6.3.1`](https://github.com/rabbitmq/rabbitmq-dotnet-client/milestone/57?closed=1)
Expand Down
28 changes: 23 additions & 5 deletions projects/RabbitMQ.Client/client/api/AmqpTcpEndpoint.cs
Original file line number Diff line number Diff line change
Expand Up @@ -60,17 +60,32 @@ public class AmqpTcpEndpoint// : ICloneable

private int _port;

private readonly uint _maxMessageSize;

/// <summary>
/// Creates a new instance of the <see cref="AmqpTcpEndpoint"/>.
/// </summary>
/// <param name="hostName">Hostname.</param>
/// <param name="portOrMinusOne"> Port number. If the port number is -1, the default port number will be used.</param>
/// <param name="ssl">Ssl option.</param>
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl)
/// <param name="maxMessageSize">Maximum message size from RabbitMQ. 0 means "unlimited"</param>
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl, uint maxMessageSize)
{
HostName = hostName;
_port = portOrMinusOne;
Ssl = ssl;
_maxMessageSize = maxMessageSize;
}

/// <summary>
/// Creates a new instance of the <see cref="AmqpTcpEndpoint"/>.
/// </summary>
/// <param name="hostName">Hostname.</param>
/// <param name="portOrMinusOne"> Port number. If the port number is -1, the default port number will be used.</param>
/// <param name="ssl">Ssl option.</param>
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, SslOption ssl) :
this(hostName, portOrMinusOne, ssl, 0)
{
}

/// <summary>
Expand Down Expand Up @@ -116,7 +131,7 @@ public AmqpTcpEndpoint(Uri uri) : this(uri.Host, uri.Port)
/// <returns>A copy with the same hostname, port, and TLS settings</returns>
public object Clone()
{
return new AmqpTcpEndpoint(HostName, _port, Ssl);
return new AmqpTcpEndpoint(HostName, _port, Ssl, _maxMessageSize);
}

/// <summary>
Expand All @@ -126,7 +141,7 @@ public object Clone()
/// <returns>A copy with the provided hostname and port/TLS settings of this endpoint</returns>
public AmqpTcpEndpoint CloneWithHostname(string hostname)
{
return new AmqpTcpEndpoint(hostname, _port, Ssl);
return new AmqpTcpEndpoint(hostname, _port, Ssl, _maxMessageSize);
}

/// <summary>
Expand Down Expand Up @@ -176,9 +191,12 @@ public IProtocol Protocol
public SslOption Ssl { get; set; }

/// <summary>
/// Set the maximum size for a message in bytes. The default value is 0 (unlimited)
/// Get the maximum size for a message in bytes. The default value is 0 (unlimited)
/// </summary>
public uint MaxMessageSize { get; set; }
public uint MaxMessageSize
{
get { return _maxMessageSize; }
}

/// <summary>
/// Construct an instance from a protocol and an address in "hostname:port" format.
Expand Down
17 changes: 16 additions & 1 deletion projects/RabbitMQ.Client/client/api/ConnectionFactory.cs
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ namespace RabbitMQ.Client
/// factory.VirtualHost = ConnectionFactory.DefaultVHost;
/// factory.HostName = hostName;
/// factory.Port = AmqpTcpEndpoint.UseDefaultPort;
/// factory.MaxMessageSize = 512 * 1024 * 1024;
/// //
/// IConnection conn = factory.CreateConnection();
/// //
Expand Down Expand Up @@ -103,6 +104,13 @@ public sealed class ConnectionFactory : ConnectionFactoryBase, IAsyncConnectionF
/// </summary>
public const uint DefaultFrameMax = 0;

/// <summary>
/// Default value for the maximum allowed message size, in bytes, from RabbitMQ.
/// Corresponds to the <code>rabbit.max_message_size</code> setting.
/// Note: the default is 0 which means "unlimited".
/// </summary>
public const uint DefaultMaxMessageSize = 0;

/// <summary>
/// Default value for desired heartbeat interval. Default is 60 seconds,
/// TimeSpan.Zero means "heartbeats are disabled".
Expand Down Expand Up @@ -276,12 +284,13 @@ public ConnectionFactory()
/// </summary>
public AmqpTcpEndpoint Endpoint
{
get { return new AmqpTcpEndpoint(HostName, Port, Ssl); }
get { return new AmqpTcpEndpoint(HostName, Port, Ssl, MaxMessageSize); }
set
{
Port = value.Port;
HostName = value.HostName;
Ssl = value.Ssl;
MaxMessageSize = value.MaxMessageSize;
}
}

Expand Down Expand Up @@ -325,6 +334,12 @@ public AmqpTcpEndpoint Endpoint
/// </summary>
public string VirtualHost { get; set; } = DefaultVHost;

/// <summary>
/// Maximum allowed message size, in bytes, from RabbitMQ.
/// Corresponds to the <code>rabbit.max_message_size</code> setting.
/// </summary>
public uint MaxMessageSize { get; set; } = DefaultMaxMessageSize;

/// <summary>
/// The uri to use for the connection.
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,20 @@ namespace RabbitMQ.Client.Exceptions
///requiring a connection.close.</summary>
public abstract class HardProtocolException : ProtocolException
{
protected readonly bool _canShutdownCleanly = true;

protected HardProtocolException(string message) : base(message)
{
}

protected HardProtocolException(string message, bool canShutdownCleanly) : base(message)
{
_canShutdownCleanly= canShutdownCleanly;
}

public bool CanShutdownCleanly
{
get { return _canShutdownCleanly; }
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,11 @@ public MalformedFrameException(string message) : base(message)
{
}

public MalformedFrameException(string message, bool canShutdownCleanly) :
base(message, canShutdownCleanly)
{
}

public override ushort ReplyCode
{
get { return Constants.FrameError; }
Expand Down
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/client/impl/Connection.cs
Original file line number Diff line number Diff line change
Expand Up @@ -479,7 +479,7 @@ public bool HardProtocolExceptionHandler(HardProtocolException hpe)
hpe.ShutdownReason.ReplyCode,
hpe.ShutdownReason.ReplyText));
}
return true;
return hpe.CanShutdownCleanly;
}
catch (IOException ioe)
{
Expand Down
3 changes: 2 additions & 1 deletion projects/RabbitMQ.Client/client/impl/Frame.cs
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,8 @@ internal static InboundFrame ReadFrom(Stream reader, byte[] frameHeaderBuffer, A
int payloadSize = NetworkOrderDeserializer.ReadInt32(new ReadOnlySpan<byte>(frameHeaderBuffer, 2, 4));
if ((maxMessageSize > 0) && (payloadSize > maxMessageSize))
{
throw new MalformedFrameException($"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes");
string msg = $"Frame payload size '{payloadSize}' exceeds maximum of '{maxMessageSize}' bytes";
throw new MalformedFrameException(message: msg, canShutdownCleanly: false);
}

const int EndMarkerLength = 1;
Expand Down
9 changes: 8 additions & 1 deletion projects/Unit/APIApproval.Approve.verified.txt
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,10 @@ namespace RabbitMQ.Client
public AmqpTcpEndpoint(string hostName, int portOrMinusOne = -1) { }
public AmqpTcpEndpoint(System.Uri uri, RabbitMQ.Client.SslOption ssl) { }
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl) { }
public AmqpTcpEndpoint(string hostName, int portOrMinusOne, RabbitMQ.Client.SslOption ssl, uint maxMessageSize) { }
public System.Net.Sockets.AddressFamily AddressFamily { get; set; }
public string HostName { get; set; }
public uint MaxMessageSize { get; set; }
public uint MaxMessageSize { get; }
public int Port { get; set; }
public RabbitMQ.Client.IProtocol Protocol { get; }
public RabbitMQ.Client.SslOption Ssl { get; set; }
Expand Down Expand Up @@ -72,6 +73,7 @@ namespace RabbitMQ.Client
{
public const ushort DefaultChannelMax = 2047;
public const uint DefaultFrameMax = 0u;
public const uint DefaultMaxMessageSize = 0u;
public const string DefaultPass = "guest";
public const string DefaultUser = "guest";
public const string DefaultVHost = "/";
Expand All @@ -91,6 +93,7 @@ namespace RabbitMQ.Client
public System.Func<System.Collections.Generic.IEnumerable<RabbitMQ.Client.AmqpTcpEndpoint>, RabbitMQ.Client.IEndpointResolver> EndpointResolverFactory { get; set; }
public System.TimeSpan HandshakeContinuationTimeout { get; set; }
public string HostName { get; set; }
public uint MaxMessageSize { get; set; }
public System.Buffers.ArrayPool<byte> MemoryPool { get; set; }
public System.TimeSpan NetworkRecoveryInterval { get; set; }
public string Password { get; set; }
Expand Down Expand Up @@ -728,11 +731,15 @@ namespace RabbitMQ.Client.Exceptions
}
public abstract class HardProtocolException : RabbitMQ.Client.Exceptions.ProtocolException
{
protected readonly bool _canShutdownCleanly;
protected HardProtocolException(string message) { }
protected HardProtocolException(string message, bool canShutdownCleanly) { }
public bool CanShutdownCleanly { get; }
}
public class MalformedFrameException : RabbitMQ.Client.Exceptions.HardProtocolException
{
public MalformedFrameException(string message) { }
public MalformedFrameException(string message, bool canShutdownCleanly) { }
public override ushort ReplyCode { get; }
}
[System.Serializable]
Expand Down
92 changes: 92 additions & 0 deletions projects/Unit/TestBasicPublish.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
using System;
using System.Text;
using System.Threading;
using System.Threading.Tasks;
using NUnit.Framework;
Expand Down Expand Up @@ -101,5 +102,96 @@ public void CanNotModifyPayloadAfterPublish()
m.BasicCancel(tag);
}
}

[Test]
public void TestMaxMessageSize()
{
var re = new ManualResetEventSlim();
const ushort maxMsgSize = 1024;

int count = 0;
byte[] msg0 = Encoding.UTF8.GetBytes("hi");

var r = new System.Random();
byte[] msg1 = new byte[maxMsgSize * 2];
r.NextBytes(msg1);

var cf = new ConnectionFactory();
cf.AutomaticRecoveryEnabled = false;
cf.TopologyRecoveryEnabled = false;
cf.MaxMessageSize = maxMsgSize;

bool sawConnectionShutdown = false;
bool sawModelShutdown = false;
bool sawConsumerRegistered = false;
bool sawConsumerCancelled = false;

using (IConnection c = cf.CreateConnection())
{
c.ConnectionShutdown += (o, a) =>
{
sawConnectionShutdown= true;
};

Assert.AreEqual(maxMsgSize, cf.MaxMessageSize);
Assert.AreEqual(maxMsgSize, cf.Endpoint.MaxMessageSize);
Assert.AreEqual(maxMsgSize, c.Endpoint.MaxMessageSize);

using (IModel m = c.CreateModel())
{
m.ModelShutdown += (o, a) =>
{
sawModelShutdown= true;
};

m.CallbackException += (o, a) =>
{
Assert.Fail("Unexpected m.CallbackException");
};

QueueDeclareOk q = m.QueueDeclare();
IBasicProperties bp = m.CreateBasicProperties();

var consumer = new EventingBasicConsumer(m);

consumer.Shutdown += (o, a) =>
{
re.Set();
};

consumer.Registered += (o, a) =>
{
sawConsumerRegistered = true;
};

consumer.Unregistered += (o, a) =>
{
Assert.Fail("Unexpected consumer.Unregistered");
};

consumer.ConsumerCancelled += (o, a) =>
{
sawConsumerCancelled = true;
};

consumer.Received += (o, a) =>
{
Interlocked.Increment(ref count);
};

string tag = m.BasicConsume(q.QueueName, true, consumer);

m.BasicPublish("", q.QueueName, bp, msg0);
m.BasicPublish("", q.QueueName, bp, msg1);
Assert.IsTrue(re.Wait(TimeSpan.FromSeconds(5)));

Assert.AreEqual(1, count);
Assert.IsTrue(sawConnectionShutdown);
Assert.IsTrue(sawModelShutdown);
Assert.IsTrue(sawConsumerRegistered);
Assert.IsTrue(sawConsumerCancelled);
}
}
}
}
}
Loading

0 comments on commit 5930d91

Please sign in to comment.