Skip to content

Commit

Permalink
fix(proxy): add PROPERTY_PRODUCER_GROUP property for transaction mess…
Browse files Browse the repository at this point in the history
…age (#803)

Signed-off-by: SSpirits <[email protected]>
Signed-off-by: wangxye <[email protected]>
Co-authored-by: wangxye <[email protected]>
  • Loading branch information
ShadowySpirits and wangxye authored Dec 8, 2023
1 parent 0a7e33c commit e284f99
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 1 deletion.
2 changes: 2 additions & 0 deletions common/src/main/fbs/flat_message.fbs
Original file line number Diff line number Diff line change
Expand Up @@ -58,4 +58,6 @@ table SystemProperties {
orphaned_transaction_recovery_seconds: long;
// For a pending transaction message, this field records the check times.
orphaned_transaction_check_times:int = -1;
// For a pending transaction message, this field records the producer group name.
orphaned_transaction_producer:string;
}
3 changes: 3 additions & 0 deletions distribution/helm/deploy/helm_sample_values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ broker:
accessKey: "foot"
secretKey: "bar"

transactionTimeoutMillis: 100
transactionCheckInterval: 1000

mysql:
enabled: false

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,11 @@ private void checkTransactionStatus(TimerTag timerTag) {
store.scheduleCheckTransaction(message);

Topic topic = metadataService.topicOf(message.topicId()).join();
Channel channel = producerManager.getAvailableChannel(topic.getName());
String producerGroup = message.systemProperties().orphanedTransactionProducer();
if (StringUtils.isBlank(producerGroup)) {
producerGroup = topic.getName();
}
Channel channel = producerManager.getAvailableChannel(producerGroup);
if (channel != null) {
CheckTransactionStateRequestHeader requestHeader = new CheckTransactionStateRequestHeader();
requestHeader.setCommitLogOffset(0L);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,11 @@ private static SystemPropertiesT splitSystemProperties(FlatMessageT flatMessageT
}
}

String orphanedTransactionProducer = properties.remove(MessageConst.PROPERTY_PRODUCER_GROUP);
if (!Strings.isNullOrEmpty(orphanedTransactionProducer)) {
systemPropertiesT.setOrphanedTransactionProducer(orphanedTransactionProducer);
}

// Remove all system properties
for (String systemPropertyKey : MessageConst.STRING_HASH_SET) {
properties.remove(systemPropertyKey);
Expand Down Expand Up @@ -302,5 +307,8 @@ private static void fillSystemProperties(MessageExt messageExt, FlatMessageExt f
if (systemProperties.dlqOriginalMessageId() != null) {
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_DLQ_ORIGIN_MESSAGE_ID, String.valueOf(systemProperties.dlqOriginalMessageId()));
}
if (systemProperties.orphanedTransactionProducer() != null) {
MessageAccessor.putProperty(messageExt, MessageConst.PROPERTY_PRODUCER_GROUP, systemProperties.orphanedTransactionProducer());
}
}
}

0 comments on commit e284f99

Please sign in to comment.