Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][txn] PIP-160: Pending ack log store enables the batch feature #16707

Merged
merged 9 commits into from
Jul 27, 2022
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ public int compareTo(PositionImpl that) {
return 0;
}

public int compareTo(long ledgerId, long entryId) {
if (this.ledgerId != ledgerId) {
return (this.ledgerId < ledgerId ? -1 : 1);
}

if (this.entryId != entryId) {
return (this.entryId < entryId ? -1 : 1);
}

return 0;
}

@Override
public int hashCode() {
int result = (int) (ledgerId ^ (ledgerId >>> 32));
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,23 @@
*/
package org.apache.pulsar.broker.transaction.pendingack.impl;

import io.netty.util.Timer;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.exception.pendingack.TransactionPendingAckException;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;


/**
Expand All @@ -49,7 +53,25 @@ public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscript
.TransactionPendingAckStoreProviderException("The subscription is null."));
return pendingAckStoreFuture;
}

PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic();
PulsarService pulsarService = originPersistentTopic.getBrokerService().getPulsar();

final Timer brokerClientSharedTimer =
pulsarService.getBrokerClientSharedTimer();
final ServiceConfiguration serviceConfiguration = pulsarService.getConfiguration();
final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionPendingAckBatchedWriteEnabled());
txnLogBufferedWriterConfig.setBatchedWriteMaxRecords(
serviceConfiguration.getTransactionPendingAckBatchedWriteMaxRecords()
);
txnLogBufferedWriterConfig.setBatchedWriteMaxSize(
serviceConfiguration.getTransactionPendingAckBatchedWriteMaxSize()
);
txnLogBufferedWriterConfig.setBatchedWriteMaxDelayInMillis(
serviceConfiguration.getTransactionPendingAckBatchedWriteMaxDelayInMillis()
);

String pendingAckTopicName = MLPendingAckStore
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName());
originPersistentTopic.getBrokerService().getManagedLedgerFactory()
Expand Down Expand Up @@ -81,7 +103,9 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
.getBrokerService()
.getPulsar()
.getConfiguration()
.getTransactionPendingAckLogIndexMinLag()));
.getTransactionPendingAckLogIndexMinLag(),
txnLogBufferedWriterConfig,
brokerClientSharedTimer));
if (log.isDebugEnabled()) {
log.debug("{},{} open MLPendingAckStore cursor success",
originPersistentTopic.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,10 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{

@Test
public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
1, TimeUnit.MILLISECONDS);

String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable";
admin.topics().createNonPartitionedTopic(topic);
@Cleanup
Expand Down Expand Up @@ -643,7 +647,8 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
doReturn(CompletableFuture.completedFuture(
new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null, 500)))
new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null,
500, bufferedWriterConfig, transactionTimer)))
.when(pendingAckStoreProvider).newPendingAckStore(any());
doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any());

Expand Down Expand Up @@ -676,6 +681,9 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle3.getStats(false).state, "Ready"));

// cleanup
transactionTimer.stop();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.transaction.pendingack;

import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -31,6 +34,7 @@
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
Expand All @@ -48,6 +52,10 @@ public PendingAckMetadataTest() {

@Test
public void testPendingAckManageLedgerWriteFailState() throws Exception {
TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
1, TimeUnit.MILLISECONDS);

ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

Expand All @@ -72,7 +80,8 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
ManagedCursor cursor = completableFuture.get().openCursor("test");
ManagedCursor subCursor = completableFuture.get().openCursor("test");
MLPendingAckStore pendingAckStore =
new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500);
new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500,
bufferedWriterConfig, transactionTimer);

Field field = MLPendingAckStore.class.getDeclaredField("managedLedger");
field.setAccessible(true);
Expand All @@ -90,9 +99,12 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
pendingAckStore.appendAbortMark(new TxnID(1, 1), CommandAck.AckType.Cumulative).get();

// cleanup.
pendingAckStore.closeAsync();
completableFuture.get().close();
cursor.close();
subCursor.close();
transactionTimer.stop();
}

}
Loading