Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(proxy): support transaction message #796

Merged
merged 2 commits into from
Dec 6, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,7 @@
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.common.constant.LoggerName;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.logging.org.slf4j.Logger;
Expand Down Expand Up @@ -117,10 +118,12 @@ public BrokerController(BrokerConfig brokerConfig) throws Exception {
metadataStore.setDataStore(dataStore);

LockService lockService = new LockService(brokerConfig.proxy());
MessageServiceImpl messageServiceImpl = new MessageServiceImpl(brokerConfig.proxy(), messageStore, proxyMetadataService, lockService, dlqService);

ProducerManager producerManager = new ProducerManager();
MessageServiceImpl messageServiceImpl = new MessageServiceImpl(brokerConfig.proxy(), messageStore, proxyMetadataService, lockService, dlqService, producerManager);
this.messageService = messageServiceImpl;
this.extendMessageService = messageServiceImpl;
serviceManager = new DefaultServiceManager(brokerConfig, proxyMetadataService, dlqService, messageService, messageStore);
serviceManager = new DefaultServiceManager(brokerConfig, proxyMetadataService, dlqService, messageService, messageStore, producerManager);

messagingProcessor = ExtendMessagingProcessor.createForS3RocketMQ(serviceManager, brokerConfig.proxy());

Expand Down
2 changes: 2 additions & 0 deletions common/src/main/fbs/flat_message.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -56,4 +56,6 @@ table SystemProperties {
prepared_transaction_mark: bool;
// For a pending transaction message, this field is the first check time. If set to zero, the broker will use the default value
orphaned_transaction_recovery_seconds: long;
// For a pending transaction message, this field records the check times.
orphaned_transaction_check_times:int = -1;
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ public class StoreConfig {

private long transactionTimeoutMillis = 6 * 1000;

private long transactionCheckInterval = 30 * 1000;

private int transactionCheckMaxTimes = 15;

public int maxFetchCount() {
return maxFetchCount;
}
Expand Down Expand Up @@ -93,4 +97,12 @@ public void setWorkingThreadQueueCapacity(int workingThreadQueueCapacity) {
public long transactionTimeoutMillis() {
return transactionTimeoutMillis;
}

public long transactionCheckInterval() {
return transactionCheckInterval;
}

public int transactionCheckMaxTimes() {
return transactionCheckMaxTimes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@

public DefaultServiceManager(BrokerConfig config, ProxyMetadataService proxyMetadataService,
DeadLetterService deadLetterService, MessageService messageService,
MessageStore messageStore) {
MessageStore messageStore, ProducerManager producerManager) {
this.metadataService = proxyMetadataService;
this.deadLetterService = deadLetterService;
this.resourceMetadataService = new ResourceMetadataService(proxyMetadataService);
this.messageService = messageService;
this.topicRouteService = new TopicRouteServiceImpl(config, proxyMetadataService);
this.producerManager = new ProducerManager();
this.producerManager = producerManager;

Check warning on line 55 in proxy/src/main/java/com/automq/rocketmq/proxy/service/DefaultServiceManager.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/DefaultServiceManager.java#L55

Added line #L55 was not covered by tests
this.consumerManager = new ConsumerManager(new ConsumerIdsChangeListenerImpl(), config.proxy().channelExpiredTimeout());
this.proxyRelayService = new ProxyRelayServiceImpl();
this.transactionService = new TransactionServiceImpl();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,14 +38,18 @@
import com.automq.rocketmq.store.api.MessageStore;
import com.automq.rocketmq.store.exception.StoreException;
import com.automq.rocketmq.store.model.StoreContext;
import com.automq.rocketmq.store.model.generated.TimerTag;
import com.automq.rocketmq.store.model.message.Filter;
import com.automq.rocketmq.store.model.message.PutResult;
import com.automq.rocketmq.store.model.message.ResetConsumeOffsetResult;
import com.automq.rocketmq.store.model.message.SQLFilter;
import com.automq.rocketmq.store.model.message.TagFilter;
import com.automq.rocketmq.store.model.transaction.TransactionResolution;
import io.netty.channel.Channel;
import io.opentelemetry.api.trace.SpanKind;
import io.opentelemetry.instrumentation.annotations.SpanAttribute;
import io.opentelemetry.instrumentation.annotations.WithSpan;
import java.nio.ByteBuffer;
import java.time.Duration;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -56,10 +60,12 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.PopResult;
Expand All @@ -76,17 +82,23 @@
import org.apache.rocketmq.common.message.MessageClientIDSetter;
import org.apache.rocketmq.common.message.MessageConst;
import org.apache.rocketmq.common.message.MessageDecoder;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.common.sysflag.MessageSysFlag;
import org.apache.rocketmq.common.thread.ThreadPoolMonitor;
import org.apache.rocketmq.proxy.common.ProxyContext;
import org.apache.rocketmq.proxy.common.utils.ExceptionUtils;
import org.apache.rocketmq.proxy.remoting.common.RemotingConverter;
import org.apache.rocketmq.proxy.service.message.MessageService;
import org.apache.rocketmq.proxy.service.route.AddressableMessageQueue;
import org.apache.rocketmq.remoting.protocol.RemotingCommand;
import org.apache.rocketmq.remoting.protocol.RequestCode;
import org.apache.rocketmq.remoting.protocol.ResponseCode;
import org.apache.rocketmq.remoting.protocol.body.LockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.body.UnlockBatchRequestBody;
import org.apache.rocketmq.remoting.protocol.header.AckMessageRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ChangeInvisibleTimeRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.CheckTransactionStateRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.ConsumerSendMsgBackRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;
import org.apache.rocketmq.remoting.protocol.header.GetMaxOffsetRequestHeader;
Expand All @@ -107,18 +119,27 @@
private final LockService lockService;
private final DeadLetterSender deadLetterService;
private final SuspendRequestService suspendRequestService;
private final ProducerManager producerManager;
private final ExecutorService executorService = ThreadPoolMonitor.createAndMonitor(2, 5, 100, TimeUnit.SECONDS,
"Transaction-msg-check-thread", 2000);

public MessageServiceImpl(ProxyConfig config, MessageStore store, ProxyMetadataService metadataService,
LockService lockService, DeadLetterSender deadLetterService) throws StoreException {
LockService lockService, DeadLetterSender deadLetterService,
ProducerManager producerManager) throws StoreException {
this.config = config;
this.store = store;
this.metadataService = metadataService;
this.deadLetterService = deadLetterService;
this.lockService = lockService;
this.suspendRequestService = SuspendRequestService.getInstance();
store.registerTransactionCheckHandler(timerTag -> {
// TODO check transaction status
});
this.producerManager = producerManager;
store.registerTransactionCheckHandler(timerTag -> executorService.execute(() -> {
try {
checkTransactionStatus(timerTag);
} catch (Throwable t) {
LOGGER.error("Error while check transaction status", t);

Check warning on line 140 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L138-L140

Added lines #L138 - L140 were not covered by tests
}
}));

Check warning on line 142 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L142

Added line #L142 was not covered by tests
}

public TopicMessageType getMessageType(SendMessageRequestHeader requestHeader) {
Expand Down Expand Up @@ -305,7 +326,53 @@
@Override
public CompletableFuture<Void> endTransactionOneway(ProxyContext ctx, String brokerName,
EndTransactionRequestHeader requestHeader, long timeoutMillis) {
throw new UnsupportedOperationException();
TransactionResolution resolution;
switch (requestHeader.getCommitOrRollback()) {
case MessageSysFlag.TRANSACTION_COMMIT_TYPE -> resolution = TransactionResolution.COMMIT;

Check warning on line 331 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L331

Added line #L331 was not covered by tests
case MessageSysFlag.TRANSACTION_ROLLBACK_TYPE, MessageSysFlag.TRANSACTION_NOT_TYPE ->
resolution = TransactionResolution.ROLLBACK;

Check warning on line 333 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L333

Added line #L333 was not covered by tests
default -> {
return CompletableFuture.failedFuture(new ProxyException(apache.rocketmq.v2.Code.BAD_REQUEST, "Unknown transaction resolution"));

Check warning on line 335 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L335

Added line #L335 was not covered by tests
}
}

return store.endTransaction(requestHeader.getTransactionId(), resolution);

Check warning on line 339 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L339

Added line #L339 was not covered by tests
}

private void checkTransactionStatus(TimerTag timerTag) {
ByteBuffer payload = timerTag.payloadAsByteBuffer();
FlatMessage message = FlatMessage.getRootAsFlatMessage(payload);

Check warning on line 344 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L343-L344

Added lines #L343 - L344 were not covered by tests
try {
message.systemProperties().mutateOrphanedTransactionCheckTimes(message.systemProperties().orphanedTransactionCheckTimes() + 1);
store.scheduleCheckTransaction(message);

Check warning on line 347 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L346-L347

Added lines #L346 - L347 were not covered by tests

Topic topic = metadataService.topicOf(message.topicId()).join();
Channel channel = producerManager.getAvailableChannel(topic.getName());

Check warning on line 350 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L349-L350

Added lines #L349 - L350 were not covered by tests
if (channel != null) {
CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
requestHeader.setCommitLogOffset(0L);
requestHeader.setOffsetMsgId("");
requestHeader.setTranStateTableOffset(0L);
requestHeader.setMsgId(message.systemProperties().messageId());
requestHeader.setTransactionId(message.systemProperties().messageId());

Check warning on line 357 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L352-L357

Added lines #L352 - L357 were not covered by tests

ByteBuffer buffer = timerTag.identityAsByteBuffer();
byte[] identity = new byte[buffer.remaining()];
buffer.get(identity);
requestHeader.setTransactionId(new String(identity));

Check warning on line 362 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L359-L362

Added lines #L359 - L362 were not covered by tests

RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.CHECK_TRANSACTION_STATE, requestHeader);
FlatMessageExt flatMessageExt = FlatMessageExt.Builder.builder()
.offset(0L)
.message(message)
.build();
MessageExt messageExt = FlatMessageUtil.convertTo(flatMessageExt, topic.getName(), 0, config.hostName(), config.grpcListenPort());
request.setBody(RemotingConverter.getInstance().convertMsgToBytes(messageExt));
channel.writeAndFlush(request);

Check warning on line 371 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L364-L371

Added lines #L364 - L371 were not covered by tests
}
} catch (Exception e) {
LOGGER.error("Error while check transaction: message {}", message.systemProperties().messageId(), e);

Check warning on line 374 in proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/MessageServiceImpl.java#L373-L374

Added lines #L373 - L374 were not covered by tests
}
}

record InnerPopResult(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
@Override
public RelayData<TransactionData, Void> processCheckTransactionState(ProxyContext context, RemotingCommand command,
CheckTransactionStateRequestHeader header, MessageExt messageExt) {
throw new UnsupportedOperationException();
TransactionData transactionData = new TransactionData("", header.getTranStateTableOffset(), header.getCommitLogOffset(), header.getTransactionId(), System.currentTimeMillis(), 15_000L);
return new RelayData<>(transactionData, new CompletableFuture<>());

Check warning on line 51 in proxy/src/main/java/com/automq/rocketmq/proxy/service/ProxyRelayServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/ProxyRelayServiceImpl.java#L50-L51

Added lines #L50 - L51 were not covered by tests
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -23,50 +23,51 @@
import org.apache.rocketmq.proxy.service.transaction.EndTransactionRequestData;
import org.apache.rocketmq.proxy.service.transaction.TransactionData;
import org.apache.rocketmq.proxy.service.transaction.TransactionService;
import org.apache.rocketmq.remoting.protocol.header.EndTransactionRequestHeader;

public class TransactionServiceImpl implements TransactionService {
@Override
public void addTransactionSubscription(ProxyContext ctx, String group, List<String> topicList) {
throw new UnsupportedOperationException();
}

@Override
public void addTransactionSubscription(ProxyContext ctx, String group, String topic) {
throw new UnsupportedOperationException();
}

@Override
public void replaceTransactionSubscription(ProxyContext ctx, String group, List<String> topicList) {
throw new UnsupportedOperationException();

}

@Override
public void unSubscribeAllTransactionTopic(ProxyContext ctx, String group) {
throw new UnsupportedOperationException();
}

@Override
public TransactionData addTransactionDataByBrokerAddr(ProxyContext ctx, String brokerAddr, String producerGroup,
long tranStateTableOffset, long commitLogOffset, String transactionId, Message message) {
throw new UnsupportedOperationException();
return null;

Check warning on line 48 in proxy/src/main/java/com/automq/rocketmq/proxy/service/TransactionServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/TransactionServiceImpl.java#L48

Added line #L48 was not covered by tests
}

@Override
public TransactionData addTransactionDataByBrokerName(ProxyContext ctx, String brokerName, String producerGroup,
long tranStateTableOffset, long commitLogOffset, String transactionId, Message message) {
throw new UnsupportedOperationException();
return null;

Check warning on line 54 in proxy/src/main/java/com/automq/rocketmq/proxy/service/TransactionServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/TransactionServiceImpl.java#L54

Added line #L54 was not covered by tests
}

@Override
public EndTransactionRequestData genEndTransactionRequestHeader(ProxyContext ctx, String producerGroup,
Integer commitOrRollback, boolean fromTransactionCheck, String msgId, String transactionId) {
throw new UnsupportedOperationException();
EndTransactionRequestHeader requestHeader = new EndTransactionRequestHeader();
requestHeader.setProducerGroup(producerGroup);
requestHeader.setCommitOrRollback(commitOrRollback);
requestHeader.setFromTransactionCheck(fromTransactionCheck);
requestHeader.setMsgId(msgId);
requestHeader.setTransactionId(transactionId);
return new EndTransactionRequestData("", requestHeader);

Check warning on line 66 in proxy/src/main/java/com/automq/rocketmq/proxy/service/TransactionServiceImpl.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/service/TransactionServiceImpl.java#L60-L66

Added lines #L60 - L66 were not covered by tests
}

@Override
public void onSendCheckTransactionStateFailed(ProxyContext context, String producerGroup,
TransactionData transactionData) {
throw new UnsupportedOperationException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -254,6 +254,7 @@
String preparedTransactionMark = properties.remove(MessageConst.PROPERTY_TRANSACTION_PREPARED);
if ("true".equals(preparedTransactionMark)) {
systemPropertiesT.setPreparedTransactionMark(true);
systemPropertiesT.setOrphanedTransactionCheckTimes(0);

Check warning on line 257 in proxy/src/main/java/com/automq/rocketmq/proxy/util/FlatMessageUtil.java

View check run for this annotation

Codecov / codecov/patch

proxy/src/main/java/com/automq/rocketmq/proxy/util/FlatMessageUtil.java#L257

Added line #L257 was not covered by tests
}

String orphanedTransactionRecoverySeconds = properties.remove(MessageConst.PROPERTY_CHECK_IMMUNITY_TIME_IN_SECONDS);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
import com.automq.rocketmq.store.model.message.PullResult;
import com.automq.rocketmq.store.model.message.PutResult;
import com.automq.rocketmq.store.model.message.ResetConsumeOffsetResult;
import com.automq.rocketmq.store.model.transaction.TransactionResolution;
import com.automq.rocketmq.store.service.InflightService;
import java.util.ArrayList;
import java.util.HashMap;
Expand Down Expand Up @@ -187,6 +188,20 @@ public CompletableFuture<ClearRetryMessagesResult> clearRetryMessages(long consu
throw new UnsupportedOperationException("Not implemented yet");
}

@Override
public CompletableFuture<Boolean> cancelDelayMessage(String messageId) {
return CompletableFuture.completedFuture(true);
}

@Override
public CompletableFuture<Void> endTransaction(String transactionId, TransactionResolution resolution) {
return CompletableFuture.completedFuture(null);
}

@Override
public void scheduleCheckTransaction(FlatMessage message) throws StoreException {
}

@Override
public void registerMessageArriveListener(MessageArrivalListener listener) {
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import org.apache.rocketmq.broker.client.ProducerManager;
import org.apache.rocketmq.client.consumer.AckResult;
import org.apache.rocketmq.client.consumer.AckStatus;
import org.apache.rocketmq.client.consumer.PopResult;
Expand Down Expand Up @@ -102,7 +103,7 @@ public void setUp() throws StoreException {
ProxyConfig config = new ProxyConfig();
deadLetterSender = Mockito.mock(DeadLetterSender.class);
Mockito.doReturn(CompletableFuture.completedFuture(null)).when(deadLetterSender).send(Mockito.any(), Mockito.anyLong(), Mockito.any());
messageService = new MessageServiceImpl(config, messageStore, metadataService, new LockService(config), deadLetterSender);
messageService = new MessageServiceImpl(config, messageStore, metadataService, new LockService(config), deadLetterSender, new ProducerManager());
}

@Test
Expand Down
Loading
Loading