Skip to content

Commit

Permalink
Update code
Browse files Browse the repository at this point in the history
  • Loading branch information
tsaitsung-han.tht committed Jan 20, 2025
1 parent fcbec7c commit 50b6648
Show file tree
Hide file tree
Showing 5 changed files with 3 additions and 88 deletions.
1 change: 0 additions & 1 deletion csharp/rocketmq-client-csharp/ISendReceipt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,5 @@ namespace Org.Apache.Rocketmq
public interface ISendReceipt
{
string MessageId { get; }
string RecallHandle { get; }
}
}
27 changes: 0 additions & 27 deletions csharp/rocketmq-client-csharp/Producer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -350,33 +350,6 @@ internal async Task EndTransaction(Endpoints endpoints, string topic, string mes
StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
}

public async Task<IRecallReceipt> RecallMessage(string topic, string recallhandle)
{
var recallReceipt = await RecallMessage0(topic, recallhandle);
return recallReceipt;
}

private async Task<RecallReceipt> RecallMessage0(string topic, string recallhandle)
{
if (State.Running != State)
{
throw new InvalidOperationException("Producer is not running");
}
if (recallhandle == null)
{
throw new InvalidOperationException("Recall handle is invalid");
}
var request = new Proto.RecallMessageRequest
{
Topic = new Proto.Resource { ResourceNamespace = ClientConfig.Namespace, Name = topic },
RecallHandle = recallhandle
};
var invocation =
await ClientManager.RecallMessage(new Endpoints(ClientConfig.Endpoints), request, ClientConfig.RequestTimeout);
StatusChecker.Check(invocation.Response.Status, request, invocation.RequestId);
return new RecallReceipt(invocation.Response.MessageId);
}

public class Builder
{
private ClientConfig _clientConfig;
Expand Down
12 changes: 3 additions & 9 deletions csharp/rocketmq-client-csharp/SendReceipt.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,21 +23,15 @@ namespace Org.Apache.Rocketmq
{
public sealed class SendReceipt : ISendReceipt
{
private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue, long offset, string recallHandle)
private SendReceipt(string messageId, string transactionId, MessageQueue messageQueue)
{
MessageId = messageId;
TransactionId = transactionId;
MessageQueue = messageQueue;
Offset = offset;
RecallHandle = recallHandle;
}

public string MessageId { get; }

public string RecallHandle { get; }

public long Offset { get; }

public string TransactionId { get; }

private MessageQueue MessageQueue { get; }
Expand All @@ -46,7 +40,7 @@ private SendReceipt(string messageId, string transactionId, MessageQueue message

public override string ToString()
{
return $"{nameof(MessageId)}: {MessageId}, {nameof(RecallHandle)}: {RecallHandle}";
return $"{nameof(MessageId)}: {MessageId}";
}

public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue mq,
Expand All @@ -64,7 +58,7 @@ public static IEnumerable<SendReceipt> ProcessSendMessageResponse(MessageQueue m

// May throw exception.
StatusChecker.Check(status, invocation.Request, invocation.RequestId);
return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq, entry.Offset, entry.RecallHandle)).ToList();
return invocation.Response.Entries.Select(entry => new SendReceipt(entry.MessageId, entry.TransactionId, mq)).ToList();
}
}
}
9 changes: 0 additions & 9 deletions csharp/tests/ClientManagerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -120,15 +120,6 @@ public void TestNotifyClientTermination()
_clientManager.NotifyClientTermination(null, request, TimeSpan.FromSeconds(1));
// Expect no exception thrown.
}

[TestMethod]
public void TestRecallMessage()
{
var request = new RecallMessageRequest();
_clientManager.RecallMessage(FakeEndpoints, request, TimeSpan.FromSeconds(1));
_clientManager.RecallMessage(null, request, TimeSpan.FromSeconds(1));
// Expect no exception thrown.
}

private Client CreateTestClient()
{
Expand Down
42 changes: 0 additions & 42 deletions csharp/tests/ProducerTest.cs
Original file line number Diff line number Diff line change
Expand Up @@ -95,48 +95,6 @@ public async Task TestSendFailureWithTopic()
mockClientManager.Verify(cm => cm.SendMessage(It.IsAny<Endpoints>(),
It.IsAny<Proto.SendMessageRequest>(), It.IsAny<TimeSpan>()), Times.Exactly(maxAttempts));
}

[TestMethod]
public async Task TestRecall()
{
var producer = CreateTestClient();
producer.State = State.Running;
var metadata = producer.Sign();
var recallReceipt = new RecallReceipt(MessageIdGenerator.GetInstance().Next());
var recallMessageResponse = new Proto.RecallMessageResponse
{
Status = new Proto.Status
{
Code = Proto.Code.Ok
},
MessageId = recallReceipt.MessageId
};
var recallMessageInvocation = new RpcInvocation<Proto.RecallMessageRequest, Proto.RecallMessageResponse>(null,
recallMessageResponse, metadata);
var mockClientManager = new Mock<IClientManager>();
producer.SetClientManager(mockClientManager.Object);
mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>())).Returns(Task.FromResult(recallMessageInvocation));
await producer.RecallMessage("testTopic", "handle");
mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once);
}

[TestMethod]
[ExpectedException(typeof(ArgumentException))]
public async Task TestRecallFailure()
{
var producer = CreateTestClient();
producer.State = State.Running;
var mockClientManager = new Mock<IClientManager>();
producer.SetClientManager(mockClientManager.Object);
var exception = new ArgumentException();
mockClientManager.Setup(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>())).Throws(exception);
await producer.RecallMessage("testTopic", "handle");
mockClientManager.Verify(cm => cm.RecallMessage(It.IsAny<Endpoints>(),
It.IsAny<Proto.RecallMessageRequest>(), It.IsAny<TimeSpan>()), Times.Once);
}

private Producer CreateTestClient()
{
Expand Down

0 comments on commit 50b6648

Please sign in to comment.