Skip to content

Commit

Permalink
[improve][txn] PIP-160: Pending ack log store enables the batch feature
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Jul 20, 2022
1 parent 74bafe2 commit 3630888
Show file tree
Hide file tree
Showing 8 changed files with 480 additions and 23 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,18 @@ public int compareTo(PositionImpl that) {
return 0;
}

public int compareTo(long ledgerId, long entryId) {
if (this.ledgerId != ledgerId) {
return (this.ledgerId < ledgerId ? -1 : 1);
}

if (this.entryId != entryId) {
return (this.entryId < entryId ? -1 : 1);
}

return 0;
}

@Override
public int hashCode() {
int result = (int) (ledgerId ^ (ledgerId >>> 32));
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -19,18 +19,22 @@
package org.apache.pulsar.broker.transaction.pendingack.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.ManagedLedger;
import org.apache.bookkeeper.mledger.ManagedLedgerException;
import org.apache.pulsar.broker.PulsarService;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.exception.pendingack.TransactionPendingAckException;
import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore;
import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider;
import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;


/**
Expand All @@ -49,7 +53,25 @@ public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscript
.TransactionPendingAckStoreProviderException("The subscription is null."));
return pendingAckStoreFuture;
}

PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic();
PulsarService pulsarService = originPersistentTopic.getBrokerService().getPulsar();

final ScheduledExecutorService transactionLogExecutor =
pulsarService.getBrokerService().getTransactionLogBufferedWriteAsyncFlushTrigger();
final ServiceConfiguration serviceConfiguration = pulsarService.getConfiguration();
final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionPendingAckBatchedWriteEnabled());
txnLogBufferedWriterConfig.setBatchedWriteMaxRecords(
serviceConfiguration.getTransactionPendingAckBatchedWriteMaxRecords()
);
txnLogBufferedWriterConfig.setBatchedWriteMaxSize(
serviceConfiguration.getTransactionPendingAckBatchedWriteMaxSize()
);
txnLogBufferedWriterConfig.setBatchedWriteMaxDelayInMillis(
serviceConfiguration.getTransactionPendingAckBatchedWriteMaxDelayInMillis()
);

String pendingAckTopicName = MLPendingAckStore
.getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName());
originPersistentTopic.getBrokerService().getManagedLedgerFactory()
Expand Down Expand Up @@ -81,7 +103,9 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
.getBrokerService()
.getPulsar()
.getConfiguration()
.getTransactionPendingAckLogIndexMinLag()));
.getTransactionPendingAckLogIndexMinLag(),
txnLogBufferedWriterConfig,
transactionLogExecutor));
if (log.isDebugEnabled()) {
log.debug("{},{} open MLPendingAckStore cursor success",
originPersistentTopic.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.awaitility.Awaitility;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -609,6 +610,10 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{

@Test
public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
bufferedWriterConfig.setBatchEnabled(true);
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable";
admin.topics().createNonPartitionedTopic(topic);
@Cleanup
Expand Down Expand Up @@ -640,7 +645,8 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
doReturn(CompletableFuture.completedFuture(
new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null, 500)))
new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null,
500, bufferedWriterConfig, scheduledExecutorService)))
.when(pendingAckStoreProvider).newPendingAckStore(any());
doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any());

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
*/
package org.apache.pulsar.broker.transaction.pendingack;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand All @@ -31,6 +33,7 @@
import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.testng.annotations.Test;
import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
Expand All @@ -48,6 +51,9 @@ public PendingAckMetadataTest() {

@Test
public void testPendingAckManageLedgerWriteFailState() throws Exception {
TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);

Expand All @@ -72,7 +78,8 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
ManagedCursor cursor = completableFuture.get().openCursor("test");
ManagedCursor subCursor = completableFuture.get().openCursor("test");
MLPendingAckStore pendingAckStore =
new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500);
new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500,
bufferedWriterConfig, scheduledExecutorService);

Field field = MLPendingAckStore.class.getDeclaredField("managedLedger");
field.setAccessible(true);
Expand All @@ -90,9 +97,12 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
}
pendingAckStore.appendAbortMark(new TxnID(1, 1), CommandAck.AckType.Cumulative).get();

// cleanup.
pendingAckStore.closeAsync();
completableFuture.get().close();
cursor.close();
subCursor.close();
scheduledExecutorService.shutdown();
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
package org.apache.pulsar.broker.transaction.pendingack.impl;

import java.util.ArrayList;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.ManagedCursor;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.service.Subscription;
import org.apache.pulsar.broker.service.persistent.PersistentSubscription;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.TransactionTestBase;
import org.apache.pulsar.broker.transaction.util.LogIndexLagBackoff;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.api.proto.CommandAck;
import org.apache.pulsar.common.api.proto.CommandSubscribe;
import org.apache.pulsar.common.util.FutureUtil;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import static org.mockito.Mockito.*;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.testcontainers.shaded.org.awaitility.Awaitility;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
import org.testng.annotations.DataProvider;
import org.testng.annotations.Test;

@Slf4j
@Test(groups = "broker")
public class MLPendingAckStoreTest extends TransactionTestBase {

private PersistentSubscription persistentSubscriptionMock;

private ManagedCursor managedCursorMock;

private ExecutorService internalPinnedExecutor;

private int pendingAckLogIndexMinLag = 1;

@BeforeMethod
@Override
protected void setup() throws Exception {
setUpBase(1, 1, NAMESPACE1 + "/test", 0);
String topic = NAMESPACE1 + "/test-txn-topic";
admin.topics().createNonPartitionedTopic(topic);
PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService()
.getTopic(topic, false).get().get();
getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(pendingAckLogIndexMinLag);
CompletableFuture<Subscription> subscriptionFuture = persistentTopic .createSubscription("test",
CommandSubscribe.InitialPosition.Earliest, false, null);
PersistentSubscription subscription = (PersistentSubscription) subscriptionFuture.get();
ManagedCursor managedCursor = subscription.getCursor();
this.managedCursorMock = spy(managedCursor);
this.persistentSubscriptionMock = spy(subscription);
when(this.persistentSubscriptionMock.getCursor()).thenReturn(managedCursorMock);
this.internalPinnedExecutor = this.persistentSubscriptionMock
.getTopic()
.getBrokerService()
.getPulsar()
.getTransactionExecutorProvider()
.getExecutor(this);
}

@AfterMethod
public void cleanup(){
super.internalCleanup();
}

private MLPendingAckStore createPendingAckStore(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig)
throws Exception {
MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider();
ServiceConfiguration serviceConfiguration =
persistentSubscriptionMock.getTopic().getBrokerService().getPulsar().getConfiguration();
serviceConfiguration.setTransactionPendingAckBatchedWriteMaxRecords(
txnLogBufferedWriterConfig.getBatchedWriteMaxRecords()
);
serviceConfiguration.setTransactionPendingAckBatchedWriteMaxSize(
txnLogBufferedWriterConfig.getBatchedWriteMaxSize()
);
serviceConfiguration.setTransactionPendingAckBatchedWriteMaxDelayInMillis(
txnLogBufferedWriterConfig.getBatchedWriteMaxDelayInMillis()
);
serviceConfiguration.setTransactionPendingAckBatchedWriteEnabled(txnLogBufferedWriterConfig.isBatchEnabled());
return (MLPendingAckStore) mlPendingAckStoreProvider.newPendingAckStore(persistentSubscriptionMock).get();
}

@DataProvider(name = "mainProcessArgs")
public Object[][] mainProcessArgsProvider(){
Object[][] args = new Object[4][];
args[0] = new Object[]{true, true};
args[1] = new Object[]{false, false};
args[2] = new Object[]{true, false};
args[3] = new Object[]{false, true};
return args;
}

@Test(dataProvider = "mainProcessArgs")
public void test1(boolean writeWithBatch, boolean readWithBatch) throws Exception {
// Write some data.
TxnLogBufferedWriterConfig configForWrite = new TxnLogBufferedWriterConfig();
configForWrite.setBatchEnabled(writeWithBatch);
configForWrite.setBatchedWriteMaxRecords(2);
// Denied scheduled flush.
configForWrite.setBatchedWriteMaxDelayInMillis(1000 * 3600);
MLPendingAckStore mlPendingAckStoreForWrite = createPendingAckStore(configForWrite);
List<CompletableFuture<Void>> futureList = new ArrayList<>();
for (int i = 0; i < 20; i++){
TxnID txnID = new TxnID(i, i);
PositionImpl position = PositionImpl.get(i, i);
futureList.add(mlPendingAckStoreForWrite.appendCumulativeAck(txnID, position));
}
for (int i = 0; i < 10; i++){
TxnID txnID = new TxnID(i, i);
futureList.add(mlPendingAckStoreForWrite.appendCommitMark(txnID, CommandAck.AckType.Cumulative));
}
for (int i = 10; i < 20; i++){
TxnID txnID = new TxnID(i, i);
futureList.add(mlPendingAckStoreForWrite.appendAbortMark(txnID, CommandAck.AckType.Cumulative));
}
for (int i = 40; i < 50; i++){
TxnID txnID = new TxnID(i, i);
PositionImpl position = PositionImpl.get(i, i);
futureList.add(mlPendingAckStoreForWrite.appendCumulativeAck(txnID, position));
}
FutureUtil.waitForAll(futureList).get();
// Verify build sparse indexes correct after add many cmd-ack.
ArrayList<Long> positionList = new ArrayList<>();
for (long i = 0; i < 50; i++){
positionList.add(i);
}
// The indexes not contains the data which is commit or abort.
LinkedHashSet<Long> skipSet = new LinkedHashSet<>();
for (long i = 20; i < 40; i++){
skipSet.add(i);
}
if (writeWithBatch) {
for (long i = 0; i < 50; i++){
if (i % 2 == 0){
// The indexes contains only the last position in the batch.
skipSet.add(i);
}
}
}
LinkedHashSet<Long> expectedPositions = calculatePendingAckIndexes(positionList, skipSet);
Assert.assertEquals(mlPendingAckStoreForWrite.pendingAckLogIndex.keySet().stream().map(PositionImpl::getEntryId).collect(
Collectors.toList()), new ArrayList<>(expectedPositions));
// Replay.
TxnLogBufferedWriterConfig configForReplay = new TxnLogBufferedWriterConfig();
configForReplay.setBatchEnabled(readWithBatch);
configForReplay.setBatchedWriteMaxRecords(2);
// Denied scheduled flush.
configForReplay.setBatchedWriteMaxDelayInMillis(1000 * 3600);
MLPendingAckStore mlPendingAckStoreForRead = createPendingAckStore(configForReplay);
PendingAckHandleImpl pendingAckHandle = mock(PendingAckHandleImpl.class);
when(pendingAckHandle.getInternalPinnedExecutor()).thenReturn(internalPinnedExecutor);
when(pendingAckHandle.changeToReadyState()).thenReturn(true);
// Process controller, mark the replay task already finish.
final AtomicInteger processController = new AtomicInteger();
doAnswer(new Answer() {
@Override
public Object answer(InvocationOnMock invocation) throws Throwable {
processController.incrementAndGet();
return null;
}
}).when(pendingAckHandle).completeHandleFuture();
mlPendingAckStoreForRead.replayAsync(pendingAckHandle, internalPinnedExecutor);
Awaitility.await().atMost(200, TimeUnit.SECONDS).until(() -> processController.get() == 1);
// Verify build sparse indexes correct after replay.
Assert.assertEquals(mlPendingAckStoreForRead.pendingAckLogIndex.size(),
mlPendingAckStoreForWrite.pendingAckLogIndex.size());
Iterator<Map.Entry<PositionImpl, PositionImpl>> iteratorReplay =
mlPendingAckStoreForRead.pendingAckLogIndex.entrySet().iterator();
Iterator<Map.Entry<PositionImpl, PositionImpl>> iteratorWrite =
mlPendingAckStoreForWrite.pendingAckLogIndex.entrySet().iterator();
while (iteratorReplay.hasNext()){
Map.Entry<PositionImpl, PositionImpl> replayEntry = iteratorReplay.next();
Map.Entry<PositionImpl, PositionImpl> writeEntry = iteratorWrite.next();
Assert.assertEquals(replayEntry.getKey(), writeEntry.getKey());
Assert.assertEquals(replayEntry.getValue().getLedgerId(), writeEntry.getValue().getLedgerId());
Assert.assertEquals(replayEntry.getValue().getEntryId(), writeEntry.getValue().getEntryId());
}
// Verify delete correct.
when(managedCursorMock.getPersistentMarkDeletedPosition()).thenReturn(PositionImpl.get(19, 19));
mlPendingAckStoreForWrite.clearUselessLogData();
mlPendingAckStoreForRead.clearUselessLogData();
Assert.assertTrue(mlPendingAckStoreForWrite.pendingAckLogIndex.keySet().iterator().next().getEntryId() > 19);
Assert.assertTrue(mlPendingAckStoreForRead.pendingAckLogIndex.keySet().iterator().next().getEntryId() > 19);

// cleanup.
mlPendingAckStoreForWrite.closeAsync().get();
mlPendingAckStoreForRead.closeAsync().get();
}

/**
* Build a sparse index from the {@param positionList}, the logic same as {@link MLPendingAckStore}.
* @param positionList the position add to pending ack log/
* @param skipSet the position which should increment the count but not marked to indexes. aka: commit & abort.
*/
private LinkedHashSet<Long> calculatePendingAckIndexes(List<Long> positionList, LinkedHashSet<Long> skipSet){
LogIndexLagBackoff logIndexLagBackoff = new LogIndexLagBackoff(pendingAckLogIndexMinLag, Long.MAX_VALUE, 1);
long nextCount = logIndexLagBackoff.next(0);
long recordCountInCurrentLoop = 0;
LinkedHashSet<Long> indexes = new LinkedHashSet<>();
for (int i = 0; i < positionList.size(); i++){
recordCountInCurrentLoop ++;
long value = positionList.get(i);
if (skipSet.contains(value)){
continue;
}
if (recordCountInCurrentLoop >= nextCount){
indexes.add(value);
nextCount = logIndexLagBackoff.next(indexes.size());
recordCountInCurrentLoop = 0;
}
}
return indexes;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -535,7 +535,7 @@ public void recycle(){



interface AddDataCallback {
public interface AddDataCallback {

void addComplete(Position position, Object context);

Expand Down
Loading

0 comments on commit 3630888

Please sign in to comment.