Skip to content

Commit

Permalink
[improve][txn] PIP-160: Pending ack log store enables the batch featu…
Browse files Browse the repository at this point in the history
…re (apache#16707)

Master Issue: apache#15370

### Motivation

see apache#15370

### Modifications

I will complete proposal apache#15370 with these pull requests( *current pull request is the step-4* ): 

1. Write the batch transaction log handler: `TxnLogBufferedWriter`
2. Configuration changes and protocol changes.
3. Transaction log store enables the batch feature.
4. Pending ack log store enables the batch feature.
5. Supports dynamic configuration.
6. Append admin API for transaction batch log and docs( admin and configuration doc ).
7. Append metrics support for transaction batch log.
  • Loading branch information
poorbarcode authored and Gleiphir2769 committed Aug 4, 2022
1 parent 9711735 commit 59c7261
Show file tree
Hide file tree
Showing 7 changed files with 506 additions and 44 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ public int compareTo(PositionImpl that) {
return 0;
}

public int compareTo(long ledgerId, long entryId) {
if (this.ledgerId != ledgerId) {
return (this.ledgerId < ledgerId ? -1 : 1);
}

if (this.entryId != entryId) {
return (this.entryId < entryId ? -1 : 1);
}

return 0;
}

@Override
public int hashCode() {
int result = (int) (ledgerId ^ (ledgerId >>> 32));
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -18,19 +18,23 @@
*/
package org.apache.pulsar.broker.transaction.pendingack.impl;

import io.netty.util.Timer;
import java.util.concurrent.CompletableFuture;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.exception.pendingack.TransactionPendingAckException;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;


/**
Expand All @@ -49,7 +53,25 @@ public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscript
.TransactionPendingAckStoreProviderException("The subscription is null."));
return pendingAckStoreFuture;
}

PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic();
PulsarService pulsarService = originPersistentTopic.getBrokerService().getPulsar();

final Timer brokerClientSharedTimer =
pulsarService.getBrokerClientSharedTimer();
final ServiceConfiguration serviceConfiguration = pulsarService.getConfiguration();
final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionPendingAckBatchedWriteEnabled());
txnLogBufferedWriterConfig.setBatchedWriteMaxRecords(
serviceConfiguration.getTransactionPendingAckBatchedWriteMaxRecords()
);
txnLogBufferedWriterConfig.setBatchedWriteMaxSize(
serviceConfiguration.getTransactionPendingAckBatchedWriteMaxSize()
);
txnLogBufferedWriterConfig.setBatchedWriteMaxDelayInMillis(
serviceConfiguration.getTransactionPendingAckBatchedWriteMaxDelayInMillis()
);

String pendingAckTopicName = MLPendingAckStore
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName());
originPersistentTopic.getBrokerService().getManagedLedgerFactory()
Expand Down Expand Up @@ -81,7 +103,9 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
.getBrokerService()
.getPulsar()
.getConfiguration()
.getTransactionPendingAckLogIndexMinLag()));
.getTransactionPendingAckLogIndexMinLag(),
txnLogBufferedWriterConfig,
brokerClientSharedTimer));
if (log.isDebugEnabled()) {
log.debug("{},{} open MLPendingAckStore cursor success",
originPersistentTopic.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -612,6 +612,10 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{

@Test
public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
1, TimeUnit.MILLISECONDS);

String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable";
admin.topics().createNonPartitionedTopic(topic);
@Cleanup
Expand Down Expand Up @@ -643,7 +647,8 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
doReturn(CompletableFuture.completedFuture(
new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null, 500)))
new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null,
500, bufferedWriterConfig, transactionTimer)))
.when(pendingAckStoreProvider).newPendingAckStore(any());
doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any());

Expand Down Expand Up @@ -676,6 +681,9 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle3.getStats(false).state, "Ready"));

// cleanup
transactionTimer.stop();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,9 @@
*/
package org.apache.pulsar.broker.transaction.pendingack;

import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -31,6 +34,7 @@
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
Expand All @@ -48,6 +52,10 @@ public PendingAckMetadataTest() {

@Test
public void testPendingAckManageLedgerWriteFailState() throws Exception {
TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
1, TimeUnit.MILLISECONDS);

ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

Expand All @@ -72,7 +80,8 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
ManagedCursor cursor = completableFuture.get().openCursor("test");
ManagedCursor subCursor = completableFuture.get().openCursor("test");
MLPendingAckStore pendingAckStore =
new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500);
new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500,
bufferedWriterConfig, transactionTimer);

Field field = MLPendingAckStore.class.getDeclaredField("managedLedger");
field.setAccessible(true);
Expand All @@ -90,9 +99,12 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
pendingAckStore.appendAbortMark(new TxnID(1, 1), CommandAck.AckType.Cumulative).get();

// cleanup.
pendingAckStore.closeAsync();
completableFuture.get().close();
cursor.close();
subCursor.close();
transactionTimer.stop();
}

}
Loading

0 comments on commit 59c7261

Please sign in to comment.