Skip to content

Commit

Permalink
fix: handle error when open streams (#736)
Browse files Browse the repository at this point in the history
Signed-off-by: Li Zhanhui <[email protected]>
  • Loading branch information
lizhanhui authored Nov 27, 2023
1 parent 3524f0f commit 86ed529
Show file tree
Hide file tree
Showing 3 changed files with 44 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -112,22 +112,24 @@ public CompletableFuture<Long> createGroup(CreateGroupRequest request) {
groupCache.apply(List.of(group));
future.complete(group.getId());
}
return future;
} else {
Optional<String> leaderAddress = metadataStore.electionService().leaderAddress();
if (leaderAddress.isEmpty()) {
return CompletableFuture.failedFuture(new ControllerException(Code.NO_LEADER_VALUE, "No leader is elected yet"));
}
metadataStore.controllerClient().createGroup(leaderAddress.get(), request).whenComplete((reply, e) -> {
if (null != e) {
future.completeExceptionally(e);
} else {
future.complete(reply.getGroupId());
}
});
return metadataStore.controllerClient()
.createGroup(leaderAddress.get(), request)
.thenApply(reply -> {
if (reply.getStatus().getCode() == Code.OK) {
return reply.getGroupId();
} else {
throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(),
reply.getStatus().getMessage()));
}
});
}
break;
}
return future;
}

private void completeDescription(@Nonnull ConsumerGroup group) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,9 +22,7 @@
import apache.rocketmq.controller.v1.DescribeStreamReply;
import apache.rocketmq.controller.v1.DescribeStreamRequest;
import apache.rocketmq.controller.v1.GroupStatus;
import apache.rocketmq.controller.v1.ListOpenStreamsReply;
import apache.rocketmq.controller.v1.ListOpenStreamsRequest;
import apache.rocketmq.controller.v1.OpenStreamReply;
import apache.rocketmq.controller.v1.OpenStreamRequest;
import apache.rocketmq.controller.v1.Status;
import apache.rocketmq.controller.v1.StreamMetadata;
Expand Down Expand Up @@ -333,9 +331,17 @@ public CompletableFuture<StreamMetadata> openStream(long streamId, long epoch, i
OpenStreamRequest request = OpenStreamRequest.newBuilder()
.setStreamId(streamId)
.setStreamEpoch(epoch)
.setBrokerId(nodeId)
.build();
return metadataStore.controllerClient().openStream(leaderAddress.get(), request)
.thenApply(OpenStreamReply::getStreamMetadata);
return metadataStore.controllerClient()
.openStream(leaderAddress.get(), request)
.thenApply(reply -> {
if (reply.getStatus().getCode() == Code.OK) {
return reply.getStreamMetadata();
}
throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(),
reply.getStatus().getMessage()));
});
}
}
}
Expand Down Expand Up @@ -412,14 +418,15 @@ public CompletableFuture<Void> closeStream(long streamId, long streamEpoch, int
.setStreamId(streamId)
.setStreamEpoch(streamEpoch)
.build();
metadataStore.controllerClient().closeStream(leaderAddress.get(), request).whenComplete(((reply, e) -> {
if (null != e) {
future.completeExceptionally(e);
} else {
future.complete(null);
}
}));
break;
return metadataStore.controllerClient()
.closeStream(leaderAddress.get(), request)
.thenApply(reply -> {
if (reply.getStatus().getCode() == Code.OK) {
return null;
}
throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(),
reply.getStatus().getMessage()));
});
}
}
return future;
Expand Down Expand Up @@ -465,8 +472,15 @@ public CompletableFuture<List<StreamMetadata>> listOpenStreams(int nodeId) {
ListOpenStreamsRequest request = ListOpenStreamsRequest.newBuilder()
.setBrokerId(nodeId)
.build();
return metadataStore.controllerClient().listOpenStreams(leaderAddress.get(), request)
.thenApply((ListOpenStreamsReply::getStreamMetadataList));
return metadataStore.controllerClient()
.listOpenStreams(leaderAddress.get(), request)
.thenApply(reply -> {
if (reply.getStatus().getCode() == Code.OK) {
return reply.getStreamMetadataList();
}
throw new CompletionException(new ControllerException(reply.getStatus().getCode().getNumber(),
reply.getStatus().getMessage()));
});
}
}
return future;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,9 +33,13 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.tuple.Pair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class DefaultStoreMetadataService implements StoreMetadataService {

private static final Logger LOGGER = LoggerFactory.getLogger(DefaultStoreMetadataService.class);

private final MetadataStore metadataStore;

private final S3MetadataService s3MetadataService;
Expand Down Expand Up @@ -133,6 +137,7 @@ public CompletableFuture<StreamMetadata> openStream(long streamId, long streamEp
.thenApply(res -> {
loop.set(false);
s3MetadataService.onStreamOpen(streamId);
LOGGER.info("Open Stream[stream-id={}, epoch={}] returns metadata: {}", streamId, streamEpoch, res);
return res;
}), MoreExecutors.directExecutor());
}
Expand Down

0 comments on commit 86ed529

Please sign in to comment.