Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[fix][txn] Implement compatibility for transaction buffer segmented snapshot feature upgrade #20235

Merged
merged 7 commits into from
May 16, 2023
Merged
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.systopic.SystemTopicClient;
import org.apache.pulsar.broker.transaction.buffer.AbortedTxnProcessor;
import org.apache.pulsar.broker.transaction.buffer.metadata.TransactionBufferSnapshot;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndex;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
Expand Down Expand Up @@ -265,7 +266,7 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
PositionImpl finalStartReadCursorPosition = startReadCursorPosition;
TransactionBufferSnapshotIndexes finalPersistentSnapshotIndexes = persistentSnapshotIndexes;
if (persistentSnapshotIndexes == null) {
return CompletableFuture.completedFuture(null);
return recoverOldSnapshot();
} else {
this.unsealedTxnIds = convertTypeToTxnID(persistentSnapshotIndexes
.getSnapshot().getAborts());
Expand Down Expand Up @@ -378,6 +379,59 @@ public void openReadOnlyManagedLedgerFailed(ManagedLedgerException exception, Ob
.getExecutor(this));
}

// This method will be deprecated and removed in version 4.x.0
private CompletableFuture<PositionImpl> recoverOldSnapshot() {
return topic.getBrokerService().getPulsar().getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotService()
.createReader(TopicName.get(topic.getName())).thenComposeAsync(snapshotReader -> {
PositionImpl startReadCursorPositionInOldSnapshot = null;
try {
while (snapshotReader.hasMoreEvents()) {
Message<TransactionBufferSnapshot> message = snapshotReader.readNextAsync()
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshot transactionBufferSnapshot =
message.getValue();
if (transactionBufferSnapshot != null) {
handleOldSnapshot(transactionBufferSnapshot);
startReadCursorPositionInOldSnapshot = PositionImpl.get(
transactionBufferSnapshot.getMaxReadPositionLedgerId(),
transactionBufferSnapshot.getMaxReadPositionEntryId());
}
}
}
} catch (TimeoutException ex) {
Throwable t = FutureUtil.unwrapCompletionException(ex);
String errorMessage = String.format("[%s] Transaction buffer recover fail by "
+ "read transactionBufferSnapshot timeout!", topic.getName());
log.error(errorMessage, t);
return FutureUtil.failedFuture(new BrokerServiceException
.ServiceUnitNotReadyException(errorMessage, t));
} catch (Exception ex) {
log.error("[{}] Transaction buffer recover fail when read "
+ "transactionBufferSnapshot!", topic.getName(), ex);
return FutureUtil.failedFuture(ex);
} finally {
assert snapshotReader != null;
closeReader(snapshotReader);
}
return CompletableFuture.completedFuture(startReadCursorPositionInOldSnapshot);
});
}

// This method will be deprecated and removed in version 4.x.0
private void handleOldSnapshot(TransactionBufferSnapshot snapshot) {
if (snapshot.getAborts() != null) {
snapshot.getAborts().forEach(abortTxnMetadata -> {
TxnID txnID = new TxnID(abortTxnMetadata.getTxnIdMostBits(),
abortTxnMetadata.getTxnIdLeastBits());
aborts.put(txnID, txnID);
//The old data will be written into the first segment.
unsealedTxnIds.add(txnID);
});
}
}

@Override
public CompletableFuture<Void> clearAbortedTxnSnapshot() {
return persistentWorker.appendTask(PersistentWorker.OperationType.Clear,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.transaction;

import static org.junit.Assert.assertEquals;
import static org.testng.Assert.assertTrue;
import static org.testng.Assert.fail;

Expand All @@ -44,10 +45,15 @@
import org.apache.pulsar.broker.transaction.buffer.impl.SnapshotSegmentAbortedTxnProcessorImpl;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexes;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.transaction.Transaction;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.events.EventType;
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TopicStats;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
Expand Down Expand Up @@ -247,8 +253,8 @@ public void testClearSnapshotSegments() throws Exception {
private void verifySnapshotSegmentsSize(String topic, int size) throws Exception {
SystemTopicClient.Reader<TransactionBufferSnapshotSegment> reader =
pulsarService.getTransactionBufferSnapshotServiceFactory()
.getTxnBufferSnapshotSegmentService()
.createReader(TopicName.get(topic)).get();
.getTxnBufferSnapshotSegmentService()
.createReader(TopicName.get(topic)).get();
int segmentCount = 0;
while (reader.hasMoreEvents()) {
Message<TransactionBufferSnapshotSegment> message = reader.readNextAsync()
Expand Down Expand Up @@ -286,4 +292,79 @@ private void doCompaction(TopicName topic) throws Exception {
CompletableFuture<Long> compactionFuture = (CompletableFuture<Long>) field.get(snapshotTopic);
org.awaitility.Awaitility.await().untilAsserted(() -> assertTrue(compactionFuture.isDone()));
}

/**
* This test verifies the compatibility of the transaction buffer segmented snapshot feature
* when enabled on an existing topic.
* It performs the following steps:
* 1. Creates a topic with segmented snapshot disabled.
* 2. Sends 10 messages without using transactions.
* 3. Sends 10 messages using transactions and aborts them.
* 4. Verifies that only the non-transactional messages are received.
* 5. Enables the segmented snapshot feature and sets the snapshot segment size.
* 6. Unloads the topic and re-verifies that only the non-transactional messages are received.
* 7. Sends a new message, and checks if the topic has exactly one segment.
*/
@Test
public void testSnapshotProcessorUpdate() throws Exception {
this.pulsarService = getPulsarServiceList().get(0);
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(false);

// Create a topic, send 10 messages without using transactions, and send 10 messages using transactions.
// Abort these transactions and verify the data.
final String topicName = "persistent://" + NAMESPACE1 + "/testSnapshotProcessorUpdate";
Producer<byte[]> producer = pulsarClient.newProducer().topic(topicName).create();
Consumer<byte[]> consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();

// Send 10 messages without using transactions
for (int i = 0; i < 10; i++) {
producer.send(("test-message-" + i).getBytes());
}

// Send 10 messages using transactions and abort them
for (int i = 0; i < 10; i++) {
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
producer.newMessage(txn).value(("test-txn-message-" + i).getBytes()).sendAsync();
txn.abort().get();
}

// Verify the data
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
assertEquals("test-message-" + i, new String(msg.getData()));
consumer.acknowledge(msg);
}

// Enable segmented snapshot
this.pulsarService.getConfig().setTransactionBufferSegmentedSnapshotEnabled(true);
this.pulsarService.getConfig().setTransactionBufferSnapshotSegmentSize(8 + PROCESSOR_TOPIC.length() +
SEGMENT_SIZE * 3);

// Unload the topic and re-verify the data
admin.topics().unload(topicName);
consumer.close();
consumer = pulsarClient.newConsumer().topic(topicName).subscriptionName("test-sub").subscribe();
for (int i = 0; i < 10; i++) {
Message<byte[]> msg = consumer.receive(5, TimeUnit.SECONDS);
assertEquals("test-message-" + i, new String(msg.getData()));
consumer.acknowledge(msg);
}

// Send a message, unload the topic, and verify that the topic has exactly one segment
Transaction txn = pulsarClient.newTransaction()
.withTransactionTimeout(5, TimeUnit.SECONDS)
.build().get();
producer.newMessage(txn).value("test-message-new".getBytes()).send();
txn.abort().get();

// Check if the topic has only one segment
Awaitility.await().untilAsserted(() -> {
String segmentTopic = "persistent://" + NAMESPACE1 + "/" +
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS;
TopicStats topicStats = admin.topics().getStats(segmentTopic);
assertEquals(1, topicStats.getMsgInCounter());
});
}
}