From fcbec7c09a48c09889dcfc49af3216cefd1bb790 Mon Sep 17 00:00:00 2001 From: "tsaitsung-han.tht" Date: Wed, 8 Jan 2025 12:30:45 +0800 Subject: [PATCH 1/2] Add recalling API for C# client --- .../rocketmq-client-csharp/ClientManager.cs | 9 ++++ .../rocketmq-client-csharp/IClientManager.cs | 10 +++++ .../rocketmq-client-csharp/IRecallReceipt.cs | 24 +++++++++++ csharp/rocketmq-client-csharp/IRpcClient.cs | 2 + csharp/rocketmq-client-csharp/ISendReceipt.cs | 1 + csharp/rocketmq-client-csharp/Producer.cs | 27 ++++++++++++ .../rocketmq-client-csharp/RecallReceipt.cs | 34 +++++++++++++++ csharp/rocketmq-client-csharp/RpcClient.cs | 9 ++++ csharp/rocketmq-client-csharp/SendReceipt.cs | 12 ++++-- csharp/tests/ClientManagerTest.cs | 9 ++++ csharp/tests/ProducerTest.cs | 42 +++++++++++++++++++ 11 files changed, 176 insertions(+), 3 deletions(-) create mode 100644 csharp/rocketmq-client-csharp/IRecallReceipt.cs create mode 100644 csharp/rocketmq-client-csharp/RecallReceipt.cs diff --git a/csharp/rocketmq-client-csharp/ClientManager.cs b/csharp/rocketmq-client-csharp/ClientManager.cs index e42a29da2..b061b5c9c 100644 --- a/csharp/rocketmq-client-csharp/ClientManager.cs +++ b/csharp/rocketmq-client-csharp/ClientManager.cs @@ -120,6 +120,15 @@ public async Task Shutdown() request, response, metadata); } + public async Task> + RecallMessage(Endpoints endpoints, Proto.RecallMessageRequest request, TimeSpan timeout) + { + var metadata = _client.Sign(); + var response = await GetRpcClient(endpoints).RecallMessage(metadata, request, timeout); + return new RpcInvocation( + request, response, metadata); + } + public async Task> SendMessage( Endpoints endpoints, Proto::SendMessageRequest request, TimeSpan timeout) { diff --git a/csharp/rocketmq-client-csharp/IClientManager.cs b/csharp/rocketmq-client-csharp/IClientManager.cs index 743df9fe4..ac9108dc7 100644 --- a/csharp/rocketmq-client-csharp/IClientManager.cs +++ b/csharp/rocketmq-client-csharp/IClientManager.cs @@ -61,6 +61,16 @@ Task> Heartbeat(Endpoints end /// Task of response. Task> NotifyClientTermination( Endpoints endpoints, NotifyClientTerminationRequest request, TimeSpan timeout); + + /// + /// Recall messages. + /// + /// The target endpoints. + /// gRPC request of recalling messages. + /// Request max duration. + /// Task of response. + Task> RecallMessage( + Endpoints endpoints, RecallMessageRequest request, TimeSpan timeout); /// /// Send message to remote endpoints. diff --git a/csharp/rocketmq-client-csharp/IRecallReceipt.cs b/csharp/rocketmq-client-csharp/IRecallReceipt.cs new file mode 100644 index 000000000..8291cd660 --- /dev/null +++ b/csharp/rocketmq-client-csharp/IRecallReceipt.cs @@ -0,0 +1,24 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Org.Apache.Rocketmq +{ + public interface IRecallReceipt + { + string MessageId { get; } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/IRpcClient.cs b/csharp/rocketmq-client-csharp/IRpcClient.cs index 8145ea187..eb369c2da 100644 --- a/csharp/rocketmq-client-csharp/IRpcClient.cs +++ b/csharp/rocketmq-client-csharp/IRpcClient.cs @@ -52,6 +52,8 @@ Task ForwardMessageToDeadLetterQueue(Me Task NotifyClientTermination(Metadata metadata, NotifyClientTerminationRequest request, TimeSpan timeout); + Task RecallMessage(Metadata metadata, RecallMessageRequest request, TimeSpan timeout); + Task Shutdown(); } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs b/csharp/rocketmq-client-csharp/ISendReceipt.cs index f1004b5b9..eeba4e036 100644 --- a/csharp/rocketmq-client-csharp/ISendReceipt.cs +++ b/csharp/rocketmq-client-csharp/ISendReceipt.cs @@ -20,5 +20,6 @@ namespace Org.Apache.Rocketmq public interface ISendReceipt { string MessageId { get; } + string RecallHandle { get; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 24f1a0ac1..4f92303d0 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -350,6 +350,33 @@ internal async Task EndTransaction(Endpoints endpoints, string topic, string mes StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId); } + public async Task RecallMessage(string topic, string recallhandle) + { + var recallReceipt = await RecallMessage0(topic, recallhandle); + return recallReceipt; + } + + private async Task RecallMessage0(string topic, string recallhandle) + { + if (State.Running != State) + { + throw new InvalidOperationException("Producer is not running"); + } + if (recallhandle == null) + { + throw new InvalidOperationException("Recall handle is invalid"); + } + var request = new Proto.RecallMessageRequest + { + Topic = new Proto.Resource { ResourceNamespace = ClientConfig.Namespace, Name = topic }, + RecallHandle = recallhandle + }; + var invocation = + await ClientManager.RecallMessage(new Endpoints(ClientConfig.Endpoints), request, ClientConfig.RequestTimeout); + StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId); + return new RecallReceipt(invocation.Response.MessageId); + } + public class Builder { private ClientConfig _clientConfig; diff --git a/csharp/rocketmq-client-csharp/RecallReceipt.cs b/csharp/rocketmq-client-csharp/RecallReceipt.cs new file mode 100644 index 000000000..80cf120ce --- /dev/null +++ b/csharp/rocketmq-client-csharp/RecallReceipt.cs @@ -0,0 +1,34 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +namespace Org.Apache.Rocketmq +{ + public sealed class RecallReceipt : IRecallReceipt + { + public RecallReceipt(string messageId) + { + MessageId = messageId; + } + + public string MessageId { get; } + + public override string ToString() + { + return $"{nameof(MessageId)}: {MessageId}"; + } + } +} \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/RpcClient.cs b/csharp/rocketmq-client-csharp/RpcClient.cs index eeff96e5c..346ead282 100644 --- a/csharp/rocketmq-client-csharp/RpcClient.cs +++ b/csharp/rocketmq-client-csharp/RpcClient.cs @@ -189,5 +189,14 @@ internal static HttpMessageHandler CreateHttpHandler() var call = _stub.NotifyClientTerminationAsync(request, callOptions); return await call.ResponseAsync; } + + public async Task RecallMessage(Metadata metadata, Proto.RecallMessageRequest request, TimeSpan timeout) + { + var deadline = DateTime.UtcNow.Add(timeout); + var callOptions = new CallOptions(metadata, deadline); + + var call = _stub.RecallMessageAsync(request, callOptions); + return await call.ResponseAsync; + } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs index c9fe80149..06567b494 100644 --- a/csharp/rocketmq-client-csharp/SendReceipt.cs +++ b/csharp/rocketmq-client-csharp/SendReceipt.cs @@ -23,14 +23,20 @@ namespace Org.Apache.Rocketmq { public sealed class SendReceipt : ISendReceipt { - private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue) + private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue, long offset, string recallHandle) { MessageId = messageId; TransactionId = transactionId; MessageQueue = messageQueue; + Offset = offset; + RecallHandle = recallHandle; } public string MessageId { get; } + + public string RecallHandle { get; } + + public long Offset { get; } public string TransactionId { get; } @@ -40,7 +46,7 @@ private SendReceipt(string messageId, string transactionId, MessageQueue message public override string ToString() { - return $"{nameof(MessageId)}: {MessageId}"; + return $"{nameof(MessageId)}: {MessageId}, {nameof(RecallHandle)}: {RecallHandle}"; } public static IEnumerable ProcessSendMessageResponse(MessageQueue mq, @@ -58,7 +64,7 @@ public static IEnumerable ProcessSendMessageResponse(MessageQueue m // May throw exception. StatusChecker.Check(status, invocation.Request, invocation.RequestId); - return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList(); + return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq, entry.Offset, entry.RecallHandle)).ToList(); } } } \ No newline at end of file diff --git a/csharp/tests/ClientManagerTest.cs b/csharp/tests/ClientManagerTest.cs index 5e4e7eef6..e1b07fb75 100644 --- a/csharp/tests/ClientManagerTest.cs +++ b/csharp/tests/ClientManagerTest.cs @@ -120,6 +120,15 @@ public void TestNotifyClientTermination() _clientManager.NotifyClientTermination(null, request, TimeSpan.FromSeconds(1)); // Expect no exception thrown. } + + [TestMethod] + public void TestRecallMessage() + { + var request = new RecallMessageRequest(); + _clientManager.RecallMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1)); + _clientManager.RecallMessage(null, request, TimeSpan.FromSeconds(1)); + // Expect no exception thrown. + } private Client CreateTestClient() { diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs index ce0cca15e..bc38208ed 100644 --- a/csharp/tests/ProducerTest.cs +++ b/csharp/tests/ProducerTest.cs @@ -95,6 +95,48 @@ public async Task TestSendFailureWithTopic() mockClientManager.Verify(cm => cm.SendMessage(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(maxAttempts)); } + + [TestMethod] + public async Task TestRecall() + { + var producer = CreateTestClient(); + producer.State = State.Running; + var metadata = producer.Sign(); + var recallReceipt = new RecallReceipt(MessageIdGenerator.GetInstance().Next()); + var recallMessageResponse = new Proto.RecallMessageResponse + { + Status = new Proto.Status + { + Code = Proto.Code.Ok + }, + MessageId = recallReceipt.MessageId + }; + var recallMessageInvocation = new RpcInvocation(null, + recallMessageResponse, metadata); + var mockClientManager = new Mock(); + producer.SetClientManager(mockClientManager.Object); + mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny(), + It.IsAny(), It.IsAny())).Returns(Task.FromResult(recallMessageInvocation)); + await producer.RecallMessage("testTopic", "handle"); + mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny(), + It.IsAny(), It.IsAny()), Times.Once); + } + + [TestMethod] + [ExpectedException(typeof(ArgumentException))] + public async Task TestRecallFailure() + { + var producer = CreateTestClient(); + producer.State = State.Running; + var mockClientManager = new Mock(); + producer.SetClientManager(mockClientManager.Object); + var exception = new ArgumentException(); + mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny(), + It.IsAny(), It.IsAny())).Throws(exception); + await producer.RecallMessage("testTopic", "handle"); + mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny(), + It.IsAny(), It.IsAny()), Times.Once); + } private Producer CreateTestClient() { From 50b664884078ee4b451d423c66ae2982a0172980 Mon Sep 17 00:00:00 2001 From: "tsaitsung-han.tht" Date: Mon, 20 Jan 2025 18:03:03 +0800 Subject: [PATCH 2/2] Update code --- csharp/rocketmq-client-csharp/ISendReceipt.cs | 1 - csharp/rocketmq-client-csharp/Producer.cs | 27 ------------ csharp/rocketmq-client-csharp/SendReceipt.cs | 12 ++---- csharp/tests/ClientManagerTest.cs | 9 ---- csharp/tests/ProducerTest.cs | 42 ------------------- 5 files changed, 3 insertions(+), 88 deletions(-) diff --git a/csharp/rocketmq-client-csharp/ISendReceipt.cs b/csharp/rocketmq-client-csharp/ISendReceipt.cs index eeba4e036..f1004b5b9 100644 --- a/csharp/rocketmq-client-csharp/ISendReceipt.cs +++ b/csharp/rocketmq-client-csharp/ISendReceipt.cs @@ -20,6 +20,5 @@ namespace Org.Apache.Rocketmq public interface ISendReceipt { string MessageId { get; } - string RecallHandle { get; } } } \ No newline at end of file diff --git a/csharp/rocketmq-client-csharp/Producer.cs b/csharp/rocketmq-client-csharp/Producer.cs index 4f92303d0..24f1a0ac1 100644 --- a/csharp/rocketmq-client-csharp/Producer.cs +++ b/csharp/rocketmq-client-csharp/Producer.cs @@ -350,33 +350,6 @@ internal async Task EndTransaction(Endpoints endpoints, string topic, string mes StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId); } - public async Task RecallMessage(string topic, string recallhandle) - { - var recallReceipt = await RecallMessage0(topic, recallhandle); - return recallReceipt; - } - - private async Task RecallMessage0(string topic, string recallhandle) - { - if (State.Running != State) - { - throw new InvalidOperationException("Producer is not running"); - } - if (recallhandle == null) - { - throw new InvalidOperationException("Recall handle is invalid"); - } - var request = new Proto.RecallMessageRequest - { - Topic = new Proto.Resource { ResourceNamespace = ClientConfig.Namespace, Name = topic }, - RecallHandle = recallhandle - }; - var invocation = - await ClientManager.RecallMessage(new Endpoints(ClientConfig.Endpoints), request, ClientConfig.RequestTimeout); - StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId); - return new RecallReceipt(invocation.Response.MessageId); - } - public class Builder { private ClientConfig _clientConfig; diff --git a/csharp/rocketmq-client-csharp/SendReceipt.cs b/csharp/rocketmq-client-csharp/SendReceipt.cs index 06567b494..3b1775805 100644 --- a/csharp/rocketmq-client-csharp/SendReceipt.cs +++ b/csharp/rocketmq-client-csharp/SendReceipt.cs @@ -23,21 +23,15 @@ namespace Org.Apache.Rocketmq { public sealed class SendReceipt : ISendReceipt { - private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue, long offset, string recallHandle) + private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue) { MessageId = messageId; TransactionId = transactionId; MessageQueue = messageQueue; - Offset = offset; - RecallHandle = recallHandle; } public string MessageId { get; } - public string RecallHandle { get; } - - public long Offset { get; } - public string TransactionId { get; } private MessageQueue MessageQueue { get; } @@ -46,7 +40,7 @@ private SendReceipt(string messageId, string transactionId, MessageQueue message public override string ToString() { - return $"{nameof(MessageId)}: {MessageId}, {nameof(RecallHandle)}: {RecallHandle}"; + return $"{nameof(MessageId)}: {MessageId}"; } public static IEnumerable ProcessSendMessageResponse(MessageQueue mq, @@ -64,7 +58,7 @@ public static IEnumerable ProcessSendMessageResponse(MessageQueue m // May throw exception. StatusChecker.Check(status, invocation.Request, invocation.RequestId); - return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq, entry.Offset, entry.RecallHandle)).ToList(); + return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList(); } } } \ No newline at end of file diff --git a/csharp/tests/ClientManagerTest.cs b/csharp/tests/ClientManagerTest.cs index e1b07fb75..5e4e7eef6 100644 --- a/csharp/tests/ClientManagerTest.cs +++ b/csharp/tests/ClientManagerTest.cs @@ -120,15 +120,6 @@ public void TestNotifyClientTermination() _clientManager.NotifyClientTermination(null, request, TimeSpan.FromSeconds(1)); // Expect no exception thrown. } - - [TestMethod] - public void TestRecallMessage() - { - var request = new RecallMessageRequest(); - _clientManager.RecallMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1)); - _clientManager.RecallMessage(null, request, TimeSpan.FromSeconds(1)); - // Expect no exception thrown. - } private Client CreateTestClient() { diff --git a/csharp/tests/ProducerTest.cs b/csharp/tests/ProducerTest.cs index bc38208ed..ce0cca15e 100644 --- a/csharp/tests/ProducerTest.cs +++ b/csharp/tests/ProducerTest.cs @@ -95,48 +95,6 @@ public async Task TestSendFailureWithTopic() mockClientManager.Verify(cm => cm.SendMessage(It.IsAny(), It.IsAny(), It.IsAny()), Times.Exactly(maxAttempts)); } - - [TestMethod] - public async Task TestRecall() - { - var producer = CreateTestClient(); - producer.State = State.Running; - var metadata = producer.Sign(); - var recallReceipt = new RecallReceipt(MessageIdGenerator.GetInstance().Next()); - var recallMessageResponse = new Proto.RecallMessageResponse - { - Status = new Proto.Status - { - Code = Proto.Code.Ok - }, - MessageId = recallReceipt.MessageId - }; - var recallMessageInvocation = new RpcInvocation(null, - recallMessageResponse, metadata); - var mockClientManager = new Mock(); - producer.SetClientManager(mockClientManager.Object); - mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny(), - It.IsAny(), It.IsAny())).Returns(Task.FromResult(recallMessageInvocation)); - await producer.RecallMessage("testTopic", "handle"); - mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny(), - It.IsAny(), It.IsAny()), Times.Once); - } - - [TestMethod] - [ExpectedException(typeof(ArgumentException))] - public async Task TestRecallFailure() - { - var producer = CreateTestClient(); - producer.State = State.Running; - var mockClientManager = new Mock(); - producer.SetClientManager(mockClientManager.Object); - var exception = new ArgumentException(); - mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny(), - It.IsAny(), It.IsAny())).Throws(exception); - await producer.RecallMessage("testTopic", "handle"); - mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny(), - It.IsAny(), It.IsAny()), Times.Once); - } private Producer CreateTestClient() {