Skip to content

Commit

Permalink
feat: expose create topic API to ProxyMetadataService (#503)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Oct 30, 2023
1 parent 1f7e671 commit 8ebaf69
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package com.automq.rocketmq.metadata;

import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.MessageQueueAssignment;
import apache.rocketmq.controller.v1.Topic;
import com.automq.rocketmq.controller.metadata.MetadataStore;
Expand All @@ -38,6 +39,12 @@ public DefaultProxyMetadataService(MetadataStore metadataStore) {
this.metadataStore = metadataStore;
}

@Override
public CompletableFuture<Topic> createTopic(CreateTopicRequest request) {
return metadataStore.createTopic(request)
.thenComposeAsync(this::topicOf);
}

@Override
public CompletableFuture<Topic> topicOf(String topicName) {
return topicOf(null, topicName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,12 +18,16 @@
package com.automq.rocketmq.metadata.api;

import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.MessageQueueAssignment;
import apache.rocketmq.controller.v1.Topic;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public interface ProxyMetadataService {

CompletableFuture<Topic> createTopic(CreateTopicRequest request);

/**
* Query the topic metadata of a given topic name.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

import apache.rocketmq.controller.v1.AcceptTypes;
import apache.rocketmq.controller.v1.ConsumerGroup;
import apache.rocketmq.controller.v1.CreateTopicRequest;
import apache.rocketmq.controller.v1.MessageQueue;
import apache.rocketmq.controller.v1.MessageQueueAssignment;
import apache.rocketmq.controller.v1.MessageType;
Expand All @@ -32,6 +33,11 @@
public class MockProxyMetadataService implements ProxyMetadataService {
Map<Long, Long> offsetMap = new HashMap<>();

@Override
public CompletableFuture<Topic> createTopic(CreateTopicRequest request) {
return topicOf(request.getTopic());
}

@Override
public CompletableFuture<Topic> topicOf(String topicName) {
// Return a dummy topic
Expand Down

0 comments on commit 8ebaf69

Please sign in to comment.