Skip to content

Commit

Permalink
support topic level reserved time for tiered storage
Browse files Browse the repository at this point in the history
  • Loading branch information
yuzhou committed May 12, 2024
1 parent 159a603 commit 1e254ff
Show file tree
Hide file tree
Showing 12 changed files with 274 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,7 @@
import org.apache.rocketmq.remoting.protocol.header.ResumeCheckHalfMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SearchOffsetRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SearchOffsetResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.TieredStoreUpdateTopicMetadataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateAclRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateGlobalWhiteAddrsConfigRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateGroupForbiddenRequestHeader;
Expand Down Expand Up @@ -212,6 +213,9 @@
import org.apache.rocketmq.store.timer.TimerCheckpoint;
import org.apache.rocketmq.store.timer.TimerMessageStore;
import org.apache.rocketmq.store.util.LibC;
import org.apache.rocketmq.tieredstore.TieredMessageStore;
import org.apache.rocketmq.tieredstore.metadata.MetadataStore;
import org.apache.rocketmq.tieredstore.metadata.entity.TopicMetadata;

import static org.apache.rocketmq.remoting.protocol.RemotingCommand.buildErrorResponse;

Expand Down Expand Up @@ -379,6 +383,8 @@ public RemotingCommand processRequest(ChannelHandlerContext ctx,
return this.getAcl(ctx, request);
case RequestCode.AUTH_LIST_ACL:
return this.listAcl(ctx, request);
case RequestCode.TIERED_STORE_UPDATE_TOPIC_METADATA:
return this.tieredStoreUpdateTopicMetadata(ctx, request);
default:
return getUnknownCmdResponse(ctx, request);
}
Expand Down Expand Up @@ -3161,4 +3167,43 @@ private boolean validateBlackListConfigExist(Properties properties) {
}
return false;
}

private synchronized RemotingCommand tieredStoreUpdateTopicMetadata(ChannelHandlerContext ctx,
RemotingCommand request) throws RemotingCommandException {
final RemotingCommand response = RemotingCommand.createResponseCommand(null);

if (!(brokerController.getMessageStore() instanceof TieredMessageStore)) {
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark("tiered storage is not enabled");
return response;
}

final TieredStoreUpdateTopicMetadataRequestHeader requestHeader = request.decodeCommandCustomHeader(TieredStoreUpdateTopicMetadataRequestHeader.class);

LOGGER.info("Broker receive request to update tiered storage topic metadata={}, caller address={}",
requestHeader.getTopic(), RemotingHelper.parseChannelRemoteAddr(ctx.channel()));

String topic = requestHeader.getTopic();

TieredMessageStore tieredMessageStore = (TieredMessageStore) brokerController.getMessageStore();
MetadataStore metadataStore = tieredMessageStore.getMetadataStore();
TopicMetadata topicMetadata = metadataStore.getTopic(topic);
if (topicMetadata == null) {
response.setCode(ResponseCode.TOPIC_NOT_EXIST);
response.setRemark("topic[" + requestHeader.getTopic() + "] not exist");
return response;
}

try {
topicMetadata.setReserveTime(requestHeader.getReserveTime());
metadataStore.updateTopic(topicMetadata);
response.setCode(ResponseCode.SUCCESS);
} catch (Exception e) {
LOGGER.error("Update topic metadata failed for [{}]", request, e);
response.setCode(ResponseCode.SYSTEM_ERROR);
response.setRemark(e.getMessage());
}

return response;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -207,6 +207,7 @@
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.SendMessageRequestHeaderV2;
import org.apache.rocketmq.remoting.protocol.header.SendMessageResponseHeader;
import org.apache.rocketmq.remoting.protocol.header.TieredStoreUpdateTopicMetadataRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UnlockBatchMqRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UnregisterClientRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.UpdateAclRequestHeader;
Expand Down Expand Up @@ -3470,4 +3471,21 @@ public List<AclInfo> listAcl(String addr, String subjectFilter, String resourceF
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}

public void tieredStoreUpdateTopicMetadata(String addr, String topic, long reserveTime, long millis) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, InterruptedException, MQBrokerException {
TieredStoreUpdateTopicMetadataRequestHeader requestHeader = new TieredStoreUpdateTopicMetadataRequestHeader();
requestHeader.setTopic(topic);
requestHeader.setReserveTime(reserveTime);
RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.TIERED_STORE_UPDATE_TOPIC_METADATA, requestHeader);
RemotingCommand response = this.remotingClient.invokeSync(addr, request, millis);
assert response != null;
switch (response.getCode()) {
case ResponseCode.SUCCESS: {
return;
}
default:
break;
}
throw new MQBrokerException(response.getCode(), response.getRemark());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -301,4 +301,7 @@ public class RequestCode {
public static final int AUTH_DELETE_ACL = 3008;
public static final int AUTH_GET_ACL = 3009;
public static final int AUTH_LIST_ACL = 3010;

public static final int TIERED_STORE_UPDATE_TOPIC_METADATA = 4000;

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.rocketmq.remoting.protocol.header;

import com.google.common.base.MoreObjects;
import org.apache.rocketmq.common.action.Action;
import org.apache.rocketmq.common.action.RocketMQAction;
import org.apache.rocketmq.common.resource.ResourceType;
import org.apache.rocketmq.common.resource.RocketMQResource;
import org.apache.rocketmq.remoting.annotation.CFNotNull;
import org.apache.rocketmq.remoting.annotation.CFNullable;
import org.apache.rocketmq.remoting.exception.RemotingCommandException;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.rpc.RpcRequestHeader;

@RocketMQAction(value = RequestCode.TIERED_STORE_UPDATE_TOPIC_METADATA, action = Action.UPDATE)
public class TieredStoreUpdateTopicMetadataRequestHeader extends RpcRequestHeader {
@CFNotNull
@RocketMQResource(ResourceType.TOPIC)
private String topic;

@CFNullable
@CFNotNull
private Long reserveTime = -1L;

@Override
public void checkFields() throws RemotingCommandException {
}


public String getTopic() {
return topic;
}

public void setTopic(String topic) {
this.topic = topic;
}

public long getReserveTime() {
return reserveTime;
}

public void setReserveTime(long reserveTime) {
this.reserveTime = reserveTime;
}

@Override
public String toString() {
return MoreObjects.toStringHelper(this)
.add("topic", topic)
.add("reserveTime", reserveTime)
.toString();
}
}
4 changes: 2 additions & 2 deletions tieredstore/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ Tiered storage provides some useful metrics, see [RIP-46](https://github.com/apa

## How to contribute

We need community participation to add more backend service providers for tiered storage. [PosixFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/posix/PosixFileSegment.java), the implementation provided by default is just an example. People who want to contribute can follow it to implement their own providers, such as S3FileSegment, OSSFileSegment, and MinIOFileSegment. Here are some guidelines:
We need community participation to add more backend service providers for tiered storage. [PosixFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/PosixFileSegment.java), the implementation provided by default is just an example. People who want to contribute can follow it to implement their own providers, such as S3FileSegment, OSSFileSegment, and MinIOFileSegment. Here are some guidelines:

1. Extend [TieredFileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredFileSegment.java) and implement the methods of [TieredStoreProvider](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/TieredStoreProvider.java) interface.
1. Extend [FileSegment](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegment.java) and implement the methods of [FileSegmentProvider](https://github.com/apache/rocketmq/blob/develop/tieredstore/src/main/java/org/apache/rocketmq/tieredstore/provider/FileSegmentProvider.java) interface.
2. Record metrics where appropriate. See `rocketmq_tiered_store_provider_rpc_latency`, `rocketmq_tiered_store_provider_upload_bytes`, and `rocketmq_tiered_store_provider_download_bytes`
3. No need to maintain your own cache and avoid polluting the page cache. It is already having the read-ahead cache.
Original file line number Diff line number Diff line change
Expand Up @@ -60,9 +60,9 @@ public boolean load() {
this.flatFileConcurrentMap.clear();
this.recover();
this.executor.commonExecutor.scheduleWithFixedDelay(() -> {
long expiredTimeStamp = System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(storeConfig.getTieredStoreFileReservedTime());
for (FlatMessageFile flatFile : deepCopyFlatFileToList()) {
long expiredTimeStamp = System.currentTimeMillis() -
TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours());
flatFile.destroyExpiredFile(expiredTimeStamp);
if (flatFile.consumeQueue.fileSegmentTable.isEmpty()) {
this.destroyFile(flatFile.getMessageQueue());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -399,4 +399,11 @@ public void destroy() {
fileLock.unlock();
}
}

public long getFileReservedHours() {
if (topicMetadata.getReserveTime() > 0) {
return topicMetadata.getReserveTime();
}
return storeConfig.getTieredStoreFileReservedTime();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;
import org.apache.rocketmq.common.Pair;
import org.apache.rocketmq.common.message.MessageQueue;
Expand Down Expand Up @@ -180,7 +181,7 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild
MessageQueue mq = flatFile.getMessageQueue();
long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId());
long maxTimestamp = next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1);
if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > (long) storeConfig.getTieredStoreFileReservedTime() * 60 * 60 * 1000) {
if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours())) {
continue;
}

Expand Down Expand Up @@ -209,7 +210,7 @@ public static void init(Meter meter, Supplier<AttributesBuilder> attributesBuild
MessageQueue mq = flatFile.getMessageQueue();
long maxOffset = next.getMaxOffsetInQueue(mq.getTopic(), mq.getQueueId());
long maxTimestamp = next.getMessageStoreTimeStamp(mq.getTopic(), mq.getQueueId(), maxOffset - 1);
if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > (long) storeConfig.getTieredStoreFileReservedTime() * 60 * 60 * 1000) {
if (maxTimestamp > 0 && System.currentTimeMillis() - maxTimestamp > TimeUnit.HOURS.toMillis(flatFile.getFileReservedHours())) {
continue;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -935,4 +935,9 @@ public AclInfo getAcl(String brokerAddr, String subject) throws RemotingConnectE
public List<AclInfo> listAcl(String brokerAddr, String subjectFilter, String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return defaultMQAdminExtImpl.listAcl(brokerAddr, subjectFilter, resourceFilter);
}

@Override
public void tieredStoreUpdateTopicMetadata(String brokerAddr, String topic, long reserveTime) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
defaultMQAdminExtImpl.tieredStoreUpdateTopicMetadata(brokerAddr, topic, reserveTime);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -1991,4 +1991,10 @@ public AclInfo getAcl(String brokerAddr, String subject) throws RemotingConnectE
public List<AclInfo> listAcl(String brokerAddr, String subjectFilter, String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
return this.mqClientInstance.getMQClientAPIImpl().listAcl(brokerAddr, subjectFilter, resourceFilter, timeoutMillis);
}

@Override
public void tieredStoreUpdateTopicMetadata(String brokerAddr, String topic, long reserveTime) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException {
this.mqClientInstance.getMQClientAPIImpl().tieredStoreUpdateTopicMetadata(brokerAddr, topic, reserveTime, timeoutMillis);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -503,4 +503,6 @@ String setCommitLogReadAheadMode(final String brokerAddr, String mode)
AclInfo getAcl(String brokerAddr, String subject) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException;

List<AclInfo> listAcl(String brokerAddr, String subjectFilter, String resourceFilter) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException;

void tieredStoreUpdateTopicMetadata(String brokerAddr, String topic, long reservedTime) throws RemotingConnectException, RemotingSendRequestException, RemotingTimeoutException, MQBrokerException, InterruptedException;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,112 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.rocketmq.tools.command.tieredstore;

import java.util.Set;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.OptionGroup;
import org.apache.commons.cli.Options;
import org.apache.rocketmq.remoting.RPCHook;
import org.apache.rocketmq.srvutil.ServerUtil;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
import org.apache.rocketmq.tools.command.CommandUtil;
import org.apache.rocketmq.tools.command.SubCommand;
import org.apache.rocketmq.tools.command.SubCommandException;

public class TieredStoreUpdateTopicMetadataCommand implements SubCommand {

@Override
public String commandName() {
return "updateTieredTopic";
}

@Override
public String commandDesc() {
return "Update tiered storage topic metadata.";
}

@Override
public Options buildCommandlineOptions(Options options) {
OptionGroup optionGroup = new OptionGroup();

Option opt = new Option("b", "brokerAddr", true, "create topic to which broker");
optionGroup.addOption(opt);

opt = new Option("c", "clusterName", true, "create topic to which cluster");
optionGroup.addOption(opt);

optionGroup.setRequired(true);
options.addOptionGroup(optionGroup);

opt = new Option("t", "topic", true, "topic name");
opt.setRequired(true);
options.addOption(opt);

opt = new Option("r", "reserveTime", true, "set reserve time in hours");
opt.setRequired(false);
options.addOption(opt);

return options;
}

@Override
public void execute(final CommandLine commandLine, final Options options,
RPCHook rpcHook) throws SubCommandException {
DefaultMQAdminExt defaultMQAdminExt = new DefaultMQAdminExt(rpcHook);
defaultMQAdminExt.setInstanceName(Long.toString(System.currentTimeMillis()));

try {
String topic = commandLine.getOptionValue('t').trim();
long reserveTime = -1;
if (commandLine.hasOption('r')) {
reserveTime = Long.parseLong(commandLine.getOptionValue('r').trim());
}

if (commandLine.hasOption('b')) {
String addr = commandLine.getOptionValue('b').trim();

defaultMQAdminExt.start();
defaultMQAdminExt.tieredStoreUpdateTopicMetadata(addr, topic, reserveTime);

System.out.printf("update tiered topic metadata to %s success.%n", addr);
return;

} else if (commandLine.hasOption('c')) {
String clusterName = commandLine.getOptionValue('c').trim();

defaultMQAdminExt.start();

Set<String> masterSet =
CommandUtil.fetchMasterAddrByClusterName(defaultMQAdminExt, clusterName);
for (String addr : masterSet) {
defaultMQAdminExt.tieredStoreUpdateTopicMetadata(addr, topic, reserveTime);
System.out.printf("update tiered topic metadata to %s success.%n", addr);
}

return;
}

ServerUtil.printCommandLineHelp("mqadmin " + this.commandName(), options);
} catch (Exception e) {
throw new SubCommandException(this.getClass().getSimpleName() + " command failed", e);
} finally {
defaultMQAdminExt.shutdown();
}
}
}

0 comments on commit 1e254ff

Please sign in to comment.