Skip to content

Commit

Permalink
Acquiring a sequence number should be an async method
Browse files Browse the repository at this point in the history
  • Loading branch information
danielmarbach committed Sep 13, 2024
1 parent a6b7e38 commit f036020
Show file tree
Hide file tree
Showing 7 changed files with 44 additions and 47 deletions.
2 changes: 1 addition & 1 deletion projects/RabbitMQ.Client/PublicAPI.Shipped.txt
Original file line number Diff line number Diff line change
Expand Up @@ -440,9 +440,9 @@ RabbitMQ.Client.IChannel.CurrentQueue.get -> string
RabbitMQ.Client.IChannel.DefaultConsumer.get -> RabbitMQ.Client.IAsyncBasicConsumer
RabbitMQ.Client.IChannel.DefaultConsumer.set -> void
RabbitMQ.Client.IChannel.FlowControl -> System.EventHandler<RabbitMQ.Client.Events.FlowControlEventArgs>
RabbitMQ.Client.IChannel.GetNextPublishSequenceNumberAsync(System.Threading.CancellationToken cancellationToken = default(System.Threading.CancellationToken)) -> System.Threading.Tasks.ValueTask<ulong>
RabbitMQ.Client.IChannel.IsClosed.get -> bool
RabbitMQ.Client.IChannel.IsOpen.get -> bool
RabbitMQ.Client.IChannel.NextPublishSeqNo.get -> ulong
RabbitMQ.Client.IChannelExtensions
RabbitMQ.Client.IConnection
RabbitMQ.Client.IConnection.CallbackException -> System.EventHandler<RabbitMQ.Client.Events.CallbackExceptionEventArgs>
Expand Down
10 changes: 5 additions & 5 deletions projects/RabbitMQ.Client/client/api/IChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,11 +95,6 @@ public interface IChannel : IDisposable
/// </summary>
bool IsOpen { get; }

/// <summary>
/// When in confirm mode, return the sequence number of the next message to be published.
/// </summary>
ulong NextPublishSeqNo { get; }

/// <summary>
/// The name of the last queue declared on this channel.
/// </summary>
Expand Down Expand Up @@ -142,6 +137,11 @@ public interface IChannel : IDisposable
/// handler is added to this event, the event handler will be fired immediately.
/// </remarks>
event EventHandler<ShutdownEventArgs> ChannelShutdown;

/// <summary>
/// When in confirm mode, return the sequence number of the next message to be published.
/// </summary>
ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default);

/// <summary>Asynchronously acknknowledges one or more messages.</summary>
/// <param name="deliveryTag">The delivery tag.</param>
Expand Down
4 changes: 2 additions & 2 deletions projects/RabbitMQ.Client/client/impl/AutorecoveringChannel.cs
Original file line number Diff line number Diff line change
Expand Up @@ -144,8 +144,6 @@ public IAsyncBasicConsumer? DefaultConsumer

public bool IsOpen => !_disposed && _innerChannel.IsOpen;

public ulong NextPublishSeqNo => InnerChannel.NextPublishSeqNo;

public string? CurrentQueue => InnerChannel.CurrentQueue;

internal async Task<bool> AutomaticallyRecoverAsync(AutorecoveringConnection conn, bool recoverConsumers,
Expand Down Expand Up @@ -274,6 +272,8 @@ public void Dispose()
_disposed = true;
}

public ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default) => InnerChannel.GetNextPublishSequenceNumberAsync(cancellationToken);

public ValueTask BasicAckAsync(ulong deliveryTag, bool multiple, CancellationToken cancellationToken)
=> InnerChannel.BasicAckAsync(deliveryTag, multiple, cancellationToken);

Expand Down
43 changes: 20 additions & 23 deletions projects/RabbitMQ.Client/client/impl/ChannelBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -181,29 +181,6 @@ public IAsyncBasicConsumer? DefaultConsumer
[MemberNotNullWhen(false, nameof(CloseReason))]
public bool IsOpen => CloseReason is null;

public ulong NextPublishSeqNo
{
get
{
if (ConfirmsAreEnabled)
{
_confirmSemaphore.Wait();
try
{
return _nextPublishSeqNo;
}
finally
{
_confirmSemaphore.Release();
}
}
else
{
return _nextPublishSeqNo;
}
}
}

public string? CurrentQueue { get; private set; }

public ISession Session { get; private set; }
Expand Down Expand Up @@ -805,6 +782,26 @@ protected void HandleConnectionUnblocked()
Session.Connection.HandleConnectionUnblocked();
}

public async ValueTask<ulong> GetNextPublishSequenceNumberAsync(CancellationToken cancellationToken = default)
{
if (ConfirmsAreEnabled)
{
await _confirmSemaphore.WaitAsync(cancellationToken).ConfigureAwait(false);
try
{
return _nextPublishSeqNo;
}
finally
{
_confirmSemaphore.Release();
}
}
else
{
return _nextPublishSeqNo;
}
}

public abstract ValueTask BasicAckAsync(ulong deliveryTag, bool multiple,
CancellationToken cancellationToken);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -173,7 +173,7 @@ void CleanOutstandingConfirms(ulong deliveryTag, bool multiple)
{
string msg = i.ToString();
byte[] body = Encoding.UTF8.GetBytes(msg);
ulong nextPublishSeqNo = channel.NextPublishSeqNo;
ulong nextPublishSeqNo = await channel.GetNextPublishSequenceNumberAsync();
if ((ulong)(i + 1) != nextPublishSeqNo)
{
Console.WriteLine($"{DateTime.Now} [WARNING] i {i + 1} does not equal next sequence number: {nextPublishSeqNo}");
Expand Down
18 changes: 9 additions & 9 deletions projects/Test/Integration/TestConfirmSelect.cs
Original file line number Diff line number Diff line change
Expand Up @@ -53,19 +53,19 @@ ValueTask PublishAsync()
}

await _channel.ConfirmSelectAsync();
Assert.Equal(1ul, _channel.NextPublishSeqNo);
Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync());
await PublishAsync();
Assert.Equal(2ul, _channel.NextPublishSeqNo);
Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync());
await PublishAsync();
Assert.Equal(3ul, _channel.NextPublishSeqNo);
Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync());

await _channel.ConfirmSelectAsync();
await PublishAsync();
Assert.Equal(4ul, _channel.NextPublishSeqNo);
Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync());
await PublishAsync();
Assert.Equal(5ul, _channel.NextPublishSeqNo);
Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync());
await PublishAsync();
Assert.Equal(6ul, _channel.NextPublishSeqNo);
Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync());
}

[Theory]
Expand All @@ -80,7 +80,7 @@ public async Task TestDeliveryTagDiverged_GH1043(ushort correlationIdLength)
await _channel.ConfirmSelectAsync();

var properties = new BasicProperties();
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
mandatory: false, basicProperties: properties, body: body);
await _channel.WaitForConfirmsOrDieAsync();
Expand All @@ -91,7 +91,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
{
CorrelationId = new string('o', correlationIdLength)
};
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
await _channel.WaitForConfirmsOrDieAsync();
}
Expand All @@ -101,7 +101,7 @@ await _channel.BasicPublishAsync(exchange: "sample", routingKey: string.Empty,
}

properties = new BasicProperties();
// _output.WriteLine("Client delivery tag {0}", _channel.NextPublishSeqNo);
// _output.WriteLine("Client delivery tag {0}", await _channel.GetNextPublishSequenceNumberAsync());
await _channel.BasicPublishAsync("sample", string.Empty, false, properties, body);
await _channel.WaitForConfirmsOrDieAsync();
// _output.WriteLine("I'm done...");
Expand Down
12 changes: 6 additions & 6 deletions projects/Test/Integration/TestConfirmSelectAsync.cs
Original file line number Diff line number Diff line change
Expand Up @@ -49,19 +49,19 @@ public TestConfirmSelectAsync(ITestOutputHelper output) : base(output)
public async Task TestConfirmSelectIdempotency()
{
await _channel.ConfirmSelectAsync();
Assert.Equal(1ul, _channel.NextPublishSeqNo);
Assert.Equal(1ul, await _channel.GetNextPublishSequenceNumberAsync());
await Publish();
Assert.Equal(2ul, _channel.NextPublishSeqNo);
Assert.Equal(2ul, await _channel.GetNextPublishSequenceNumberAsync());
await Publish();
Assert.Equal(3ul, _channel.NextPublishSeqNo);
Assert.Equal(3ul, await _channel.GetNextPublishSequenceNumberAsync());

await _channel.ConfirmSelectAsync();
await Publish();
Assert.Equal(4ul, _channel.NextPublishSeqNo);
Assert.Equal(4ul, await _channel.GetNextPublishSequenceNumberAsync());
await Publish();
Assert.Equal(5ul, _channel.NextPublishSeqNo);
Assert.Equal(5ul, await _channel.GetNextPublishSequenceNumberAsync());
await Publish();
Assert.Equal(6ul, _channel.NextPublishSeqNo);
Assert.Equal(6ul, await _channel.GetNextPublishSequenceNumberAsync());
}

private ValueTask Publish()
Expand Down

0 comments on commit f036020

Please sign in to comment.