diff --git a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java index dedb7beb0ce0b7..b70836f433d239 100644 --- a/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java +++ b/managed-ledger/src/main/java/org/apache/bookkeeper/mledger/impl/PositionImpl.java @@ -115,6 +115,18 @@ public int compareTo(PositionImpl that) { return 0; } + public int compareTo(long ledgerId, long entryId) { + if (this.ledgerId != ledgerId) { + return (this.ledgerId < ledgerId ? -1 : 1); + } + + if (this.entryId != entryId) { + return (this.entryId < entryId ? -1 : 1); + } + + return 0; + } + @Override public int hashCode() { int result = (int) (ledgerId ^ (ledgerId >>> 32)); diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java index dd58fe774a8fd8..649016349d7823 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStore.java @@ -18,8 +18,11 @@ */ package org.apache.pulsar.broker.transaction.pendingack.impl; +import static org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter.BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER; +import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ComparisonChain; import io.netty.buffer.ByteBuf; +import io.netty.util.concurrent.FastThreadLocal; import java.util.ArrayList; import java.util.Collections; import java.util.List; @@ -27,6 +30,7 @@ import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentSkipListMap; import java.util.concurrent.ExecutorService; +import java.util.concurrent.ScheduledExecutorService; import 
java.util.concurrent.atomic.AtomicLong; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.Entry; @@ -34,11 +38,13 @@ import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; import org.apache.bookkeeper.mledger.Position; +import org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl; import org.apache.bookkeeper.mledger.impl.PositionImpl; import org.apache.commons.lang3.tuple.MutablePair; import org.apache.pulsar.broker.service.BrokerServiceException.PersistenceException; import org.apache.pulsar.broker.transaction.pendingack.PendingAckReplyCallBack; import org.apache.pulsar.broker.transaction.pendingack.PendingAckStore; +import org.apache.pulsar.broker.transaction.pendingack.proto.BatchedPendingAckMetadataEntry; import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadata; import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckMetadataEntry; import org.apache.pulsar.broker.transaction.pendingack.proto.PendingAckOp; @@ -48,6 +54,10 @@ import org.apache.pulsar.common.api.proto.CommandAck.AckType; import org.apache.pulsar.common.naming.SystemTopicNames; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.common.util.collections.BitSetRecyclable; +import org.apache.pulsar.transaction.coordinator.impl.TxnBatchedPositionImpl; +import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriter; +import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; import org.jctools.queues.MessagePassingQueue; import org.jctools.queues.SpscArrayQueue; import org.slf4j.Logger; @@ -76,6 +86,8 @@ public class MLPendingAckStore implements PendingAckStore { protected PositionImpl maxAckPosition = PositionImpl.EARLIEST; private final LogIndexLagBackoff logIndexBackoff; + private final ArrayList batchedPendingAckLogsWaitingForHandle; + /** * The map is for pending ack store clear useless data. *

@@ -89,12 +101,16 @@ public class MLPendingAckStore implements PendingAckStore { * If the max position (key) is smaller than the subCursor mark delete position, * the log cursor will mark delete the position before log position (value). */ - private final ConcurrentSkipListMap pendingAckLogIndex; + final ConcurrentSkipListMap pendingAckLogIndex; private final ManagedCursor subManagedCursor; + private TxnLogBufferedWriter bufferedWriter; + public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor, - ManagedCursor subManagedCursor, long transactionPendingAckLogIndexMinLag) { + ManagedCursor subManagedCursor, long transactionPendingAckLogIndexMinLag, + TxnLogBufferedWriterConfig bufferedWriterConfig, + ScheduledExecutorService scheduledExecutorService) { this.managedLedger = managedLedger; this.cursor = cursor; this.currentLoadPosition = (PositionImpl) this.cursor.getMarkDeletedPosition(); @@ -104,6 +120,11 @@ public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor, this.subManagedCursor = subManagedCursor; this.logIndexBackoff = new LogIndexLagBackoff(transactionPendingAckLogIndexMinLag, Long.MAX_VALUE, 1); this.maxIndexLag = logIndexBackoff.next(0); + this.bufferedWriter = new TxnLogBufferedWriter(managedLedger, ((ManagedLedgerImpl) managedLedger).getExecutor(), + scheduledExecutorService, PendingAckLogSerializer.INSTANCE, + bufferedWriterConfig.getBatchedWriteMaxRecords(), bufferedWriterConfig.getBatchedWriteMaxSize(), + bufferedWriterConfig.getBatchedWriteMaxDelayInMillis(), bufferedWriterConfig.isBatchEnabled()); + this.batchedPendingAckLogsWaitingForHandle = new ArrayList<>(bufferedWriterConfig.getBatchedWriteMaxRecords()); } @Override @@ -131,6 +152,7 @@ public void closeComplete(Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] MLPendingAckStore closed successfully!", managedLedger.getName(), ctx); } + bufferedWriter.close(); completableFuture.complete(null); } @@ -213,22 +235,26 @@ private CompletableFuture 
appendCommon(PendingAckMetadataEntry pendingAckM CompletableFuture completableFuture = new CompletableFuture<>(); pendingAckMetadataEntry.setTxnidLeastBits(txnID.getLeastSigBits()); pendingAckMetadataEntry.setTxnidMostBits(txnID.getMostSigBits()); - int transactionMetadataEntrySize = pendingAckMetadataEntry.getSerializedSize(); - ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(transactionMetadataEntrySize, transactionMetadataEntrySize); - pendingAckMetadataEntry.writeTo(buf); - managedLedger.asyncAddEntry(buf, new AsyncCallbacks.AddEntryCallback() { + bufferedWriter.asyncAddData(pendingAckMetadataEntry, new TxnLogBufferedWriter.AddDataCallback() { @Override - public void addComplete(Position position, ByteBuf entryData, Object ctx) { + public void addComplete(Position position, Object ctx) { if (log.isDebugEnabled()) { log.debug("[{}][{}] MLPendingAckStore message append success at {} txnId: {}, operation : {}", managedLedger.getName(), ctx, position, txnID, pendingAckMetadataEntry.getPendingAckOp()); } currentIndexLag.incrementAndGet(); - handleMetadataEntry((PositionImpl) position, pendingAckMetadataEntry); - buf.release(); - completableFuture.complete(null); + if (position instanceof TxnBatchedPositionImpl batchedPosition){ + batchedPendingAckLogsWaitingForHandle.add(pendingAckMetadataEntry); + if (batchedPosition.getBatchIndex() == batchedPosition.getBatchSize() - 1){ + handleMetadataEntry((PositionImpl) position, batchedPendingAckLogsWaitingForHandle); + batchedPendingAckLogsWaitingForHandle.clear(); + } + } else { + handleMetadataEntry((PositionImpl) position, pendingAckMetadataEntry); + } + completableFuture.complete(null); clearUselessLogData(); } @@ -240,13 +266,48 @@ public void addFailed(ManagedLedgerException exception, Object ctx) { if (exception instanceof ManagedLedgerException.ManagedLedgerAlreadyClosedException) { managedLedger.readyToCreateNewLedger(); } - buf.release(); completableFuture.completeExceptionally(new 
PersistenceException(exception)); } }, null); return completableFuture; } + /** + * Build the index mapping of Transaction pending ack log (aka t-log) and Topic message log (aka m-log). + * When m-log has been ack, t-log which holds m-log is no longer useful, this method builds the mapping of them. + * + * If a Ledger Entry has many t-log, we only need to care about the record that carries the largest acknowledgement + * info. Because all Commit/Abort log after this record describes behavior acknowledgement, if the behavior + * acknowledgement has been handled correctly, these Commit/Abort logs are no longer useful. + * @param logPosition The position of batch log Entry. + * @param logList Pending ack log records in a batch log Entry. + */ + private void handleMetadataEntry(PositionImpl logPosition, List<PendingAckMetadataEntry> logList) { + // Skip Commit/Abort records; pass the record carrying the largest ack info to the single-log overload. + PendingAckMetadataEntry pendingAckLogHasMaxAckPosition = null; + PositionImpl maxAcknowledgementPosition = this.maxAckPosition; + for (int i = logList.size() - 1; i >= 0; i--){ + PendingAckMetadataEntry pendingAckLog = logList.get(i); + if (pendingAckLog.getPendingAckOp() == PendingAckOp.ABORT + || pendingAckLog.getPendingAckOp() == PendingAckOp.COMMIT) { + continue; + } + if (pendingAckLog.getPendingAckMetadatasList().isEmpty()){ + continue; + } + for (PendingAckMetadata ack : pendingAckLog.getPendingAckMetadatasList()){ + if (maxAcknowledgementPosition.compareTo(ack.getLedgerId(), ack.getEntryId()) < 0){ + maxAcknowledgementPosition = PositionImpl.get(ack.getLedgerId(), ack.getEntryId()); + pendingAckLogHasMaxAckPosition = pendingAckLog; + } + } + } + + if (pendingAckLogHasMaxAckPosition != null) { + handleMetadataEntry(logPosition, pendingAckLogHasMaxAckPosition); + } + } + private void handleMetadataEntry(PositionImpl logPosition, PendingAckMetadataEntry pendingAckMetadataEntry) { // store the persistent position in to memory // store the max
position of this entry retain @@ -273,7 +334,8 @@ private void handleMetadataEntry(PositionImpl logPosition, PendingAckMetadataEnt } } - private void clearUselessLogData() { + @VisibleForTesting + void clearUselessLogData() { if (!pendingAckLogIndex.isEmpty()) { PositionImpl deletePosition = null; while (!pendingAckLogIndex.isEmpty() @@ -332,14 +394,43 @@ public void run() { while (lastConfirmedEntry.compareTo(currentLoadPosition) > 0 && fillEntryQueueCallback.fillQueue()) { Entry entry = entryQueue.poll(); if (entry != null) { - ByteBuf buffer = entry.getDataBuffer(); currentLoadPosition = PositionImpl.get(entry.getLedgerId(), entry.getEntryId()); - PendingAckMetadataEntry pendingAckMetadataEntry = new PendingAckMetadataEntry(); - pendingAckMetadataEntry.parseFrom(buffer, buffer.readableBytes()); - currentIndexLag.incrementAndGet(); - handleMetadataEntry(new PositionImpl(entry.getLedgerId(), entry.getEntryId()), - pendingAckMetadataEntry); - pendingAckReplyCallBack.handleMetadataEntry(pendingAckMetadataEntry); + List<PendingAckMetadataEntry> logs = deserializeEntry(entry); + if (logs.isEmpty()){ + // Nothing to replay, but the entry must still be released or its buffer leaks. + entry.release(); + continue; + } else if (logs.size() == 1){ + currentIndexLag.incrementAndGet(); + PendingAckMetadataEntry log = logs.get(0); + handleMetadataEntry(new PositionImpl(entry.getLedgerId(), entry.getEntryId()), log); + pendingAckReplyCallBack.handleMetadataEntry(log); + } else { + /** + * 1. Query batch index of current entry from cursor. + * 2. Filter the data which has already ack. + * 3. Build batched position and handle valid data.
+ */ + long[] ackSetAlreadyAck = cursor.getDeletedBatchIndexesAsLongArray( + PositionImpl.get(entry.getLedgerId(), entry.getEntryId())); + BitSetRecyclable bitSetAlreadyAck = null; + if (ackSetAlreadyAck != null){ + bitSetAlreadyAck = BitSetRecyclable.valueOf(ackSetAlreadyAck); + } + int batchSize = logs.size(); + for (int batchIndex = 0; batchIndex < batchSize; batchIndex++){ + if (bitSetAlreadyAck != null && !bitSetAlreadyAck.get(batchIndex)){ + continue; + } + PendingAckMetadataEntry log = logs.get(batchIndex); + pendingAckReplyCallBack.handleMetadataEntry(log); + } + currentIndexLag.addAndGet(batchSize); + handleMetadataEntry(new PositionImpl(entry.getLedgerId(), entry.getEntryId()), logs); + if (ackSetAlreadyAck != null){ + bitSetAlreadyAck.recycle(); + } + } entry.release(); clearUselessLogData(); } else { @@ -362,6 +451,26 @@ public void run() { } } + private List<PendingAckMetadataEntry> deserializeEntry(Entry entry){ + ByteBuf buffer = entry.getDataBuffer(); + // Peek at the first two bytes (reader index is restored) to check whether it is a batched Entry. + buffer.markReaderIndex(); + short magicNum = buffer.readShort(); + buffer.resetReaderIndex(); + if (magicNum == BATCHED_ENTRY_DATA_PREFIX_MAGIC_NUMBER){ + // Skip the 2-byte magic number and the 2 bytes that follow it (version). + buffer.skipBytes(4); + BatchedPendingAckMetadataEntry batchedPendingAckMetadataEntry = localBatchedPendingAckLogCache.get(); + batchedPendingAckMetadataEntry.clear(); + batchedPendingAckMetadataEntry.parseFrom(buffer, buffer.readableBytes()); + return batchedPendingAckMetadataEntry.getPendingAckLogsList(); + } else { + PendingAckMetadataEntry pendingAckMetadataEntry = new PendingAckMetadataEntry(); + pendingAckMetadataEntry.parseFrom(buffer, buffer.readableBytes()); + return Collections.singletonList(pendingAckMetadataEntry); + } + } + class FillEntryQueueCallback implements AsyncCallbacks.ReadEntriesCallback { private volatile boolean isReadable = true; @@ -421,4 +530,43 @@ public static String getTransactionPendingAckStoreCursorName() { } private static final Logger log = 
LoggerFactory.getLogger(MLPendingAckStore.class); + + private static final FastThreadLocal localBatchedPendingAckLogCache = + new FastThreadLocal<>() { + @Override + protected BatchedPendingAckMetadataEntry initialValue() throws Exception { + return new BatchedPendingAckMetadataEntry(); + } + }; + + private static class PendingAckLogSerializer + implements TxnLogBufferedWriter.DataSerializer{ + + private static final PendingAckLogSerializer INSTANCE = new PendingAckLogSerializer(); + + @Override + public int getSerializedSize(PendingAckMetadataEntry data) { + return data.getSerializedSize(); + } + + @Override + public ByteBuf serialize(PendingAckMetadataEntry data) { + int batchSize = data.getSerializedSize(); + ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(batchSize, batchSize); + data.writeTo(buf); + return buf; + } + + @Override + public ByteBuf serialize(ArrayList dataArray) { + // Since all writes are in the same thread, so we can use threadLocal variables here. + BatchedPendingAckMetadataEntry batch = localBatchedPendingAckLogCache.get(); + batch.clear(); + batch.addAllPendingAckLogs(dataArray); + int batchSize = batch.getSerializedSize(); + ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(batchSize, batchSize); + batch.writeTo(buf); + return buf; + } + } } \ No newline at end of file diff --git a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java index 6b84d6e329a3ec..818223f7a67f76 100644 --- a/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java +++ b/pulsar-broker/src/main/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreProvider.java @@ -19,11 +19,14 @@ package org.apache.pulsar.broker.transaction.pendingack.impl; import java.util.concurrent.CompletableFuture; +import 
java.util.concurrent.ScheduledExecutorService; import lombok.extern.slf4j.Slf4j; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; import org.apache.bookkeeper.mledger.ManagedLedger; import org.apache.bookkeeper.mledger.ManagedLedgerException; +import org.apache.pulsar.broker.PulsarService; +import org.apache.pulsar.broker.ServiceConfiguration; import org.apache.pulsar.broker.service.persistent.PersistentSubscription; import org.apache.pulsar.broker.service.persistent.PersistentTopic; import org.apache.pulsar.broker.transaction.exception.pendingack.TransactionPendingAckException; @@ -31,6 +34,7 @@ import org.apache.pulsar.broker.transaction.pendingack.TransactionPendingAckStoreProvider; import org.apache.pulsar.common.api.proto.CommandSubscribe.InitialPosition; import org.apache.pulsar.common.naming.TopicName; +import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; /** @@ -49,7 +53,25 @@ public CompletableFuture newPendingAckStore(PersistentSubscript .TransactionPendingAckStoreProviderException("The subscription is null.")); return pendingAckStoreFuture; } + PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic(); + PulsarService pulsarService = originPersistentTopic.getBrokerService().getPulsar(); + + final ScheduledExecutorService transactionLogExecutor = + pulsarService.getBrokerService().getTransactionLogBufferedWriteAsyncFlushTrigger(); + final ServiceConfiguration serviceConfiguration = pulsarService.getConfiguration(); + final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig(); + txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionPendingAckBatchedWriteEnabled()); + txnLogBufferedWriterConfig.setBatchedWriteMaxRecords( + serviceConfiguration.getTransactionPendingAckBatchedWriteMaxRecords() + ); + txnLogBufferedWriterConfig.setBatchedWriteMaxSize( + 
serviceConfiguration.getTransactionPendingAckBatchedWriteMaxSize() + ); + txnLogBufferedWriterConfig.setBatchedWriteMaxDelayInMillis( + serviceConfiguration.getTransactionPendingAckBatchedWriteMaxDelayInMillis() + ); + String pendingAckTopicName = MLPendingAckStore .getTransactionPendingAckStoreSuffix(originPersistentTopic.getName(), subscription.getName()); originPersistentTopic.getBrokerService().getManagedLedgerFactory() @@ -81,7 +103,9 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) { .getBrokerService() .getPulsar() .getConfiguration() - .getTransactionPendingAckLogIndexMinLag())); + .getTransactionPendingAckLogIndexMinLag(), + txnLogBufferedWriterConfig, + transactionLogExecutor)); if (log.isDebugEnabled()) { log.debug("{},{} open MLPendingAckStore cursor success", originPersistentTopic.getName(), diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java index 30eded8eb25a3c..02b85e38510145 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/TransactionTest.java @@ -130,6 +130,7 @@ import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator; import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore; +import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; import org.awaitility.Awaitility; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -609,6 +610,10 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{ @Test public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{ + TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig(); + 
bufferedWriterConfig.setBatchEnabled(true); + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable"; admin.topics().createNonPartitionedTopic(topic); @Cleanup @@ -640,7 +645,8 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{ TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class); doReturn(CompletableFuture.completedFuture( - new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null, 500))) + new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null, + 500, bufferedWriterConfig, scheduledExecutorService))) .when(pendingAckStoreProvider).newPendingAckStore(any()); doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any()); diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java index 14dbcdb8897f68..0ae0ab79212362 100644 --- a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/PendingAckMetadataTest.java @@ -18,6 +18,8 @@ */ package org.apache.pulsar.broker.transaction.pendingack; +import java.util.concurrent.Executors; +import java.util.concurrent.ScheduledExecutorService; import lombok.Cleanup; import org.apache.bookkeeper.mledger.AsyncCallbacks; import org.apache.bookkeeper.mledger.ManagedCursor; @@ -31,6 +33,7 @@ import org.apache.pulsar.broker.transaction.pendingack.impl.MLPendingAckStore; import org.apache.pulsar.client.api.transaction.TxnID; import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; 
import org.testng.annotations.Test; import java.lang.reflect.Field; import java.util.concurrent.CompletableFuture; @@ -48,6 +51,9 @@ public PendingAckMetadataTest() { @Test public void testPendingAckManageLedgerWriteFailState() throws Exception { + TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig(); + ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1); + ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig(); factoryConf.setMaxCacheSize(0); @@ -72,7 +78,8 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { ManagedCursor cursor = completableFuture.get().openCursor("test"); ManagedCursor subCursor = completableFuture.get().openCursor("test"); MLPendingAckStore pendingAckStore = - new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500); + new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500, + bufferedWriterConfig, scheduledExecutorService); Field field = MLPendingAckStore.class.getDeclaredField("managedLedger"); field.setAccessible(true); @@ -90,9 +97,12 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) { } pendingAckStore.appendAbortMark(new TxnID(1, 1), CommandAck.AckType.Cumulative).get(); + // cleanup. 
+ pendingAckStore.closeAsync(); completableFuture.get().close(); cursor.close(); subCursor.close(); + scheduledExecutorService.shutdown(); } } diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java new file mode 100644 index 00000000000000..702b5769d7c29a --- /dev/null +++ b/pulsar-broker/src/test/java/org/apache/pulsar/broker/transaction/pendingack/impl/MLPendingAckStoreTest.java @@ -0,0 +1,227 @@ +package org.apache.pulsar.broker.transaction.pendingack.impl; + +import java.util.ArrayList; +import java.util.Iterator; +import java.util.LinkedHashSet; +import java.util.List; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.stream.Collectors; +import lombok.extern.slf4j.Slf4j; +import org.apache.bookkeeper.mledger.ManagedCursor; +import org.apache.bookkeeper.mledger.impl.PositionImpl; +import org.apache.pulsar.broker.ServiceConfiguration; +import org.apache.pulsar.broker.service.Subscription; +import org.apache.pulsar.broker.service.persistent.PersistentSubscription; +import org.apache.pulsar.broker.service.persistent.PersistentTopic; +import org.apache.pulsar.broker.transaction.TransactionTestBase; +import org.apache.pulsar.broker.transaction.util.LogIndexLagBackoff; +import org.apache.pulsar.client.api.transaction.TxnID; +import org.apache.pulsar.common.api.proto.CommandAck; +import org.apache.pulsar.common.api.proto.CommandSubscribe; +import org.apache.pulsar.common.util.FutureUtil; +import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig; +import static org.mockito.Mockito.*; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; +import 
org.testcontainers.shaded.org.awaitility.Awaitility; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.DataProvider; +import org.testng.annotations.Test; + +@Slf4j +@Test(groups = "broker") +public class MLPendingAckStoreTest extends TransactionTestBase { + + private PersistentSubscription persistentSubscriptionMock; + + private ManagedCursor managedCursorMock; + + private ExecutorService internalPinnedExecutor; + + private int pendingAckLogIndexMinLag = 1; + + @BeforeMethod + @Override + protected void setup() throws Exception { + setUpBase(1, 1, NAMESPACE1 + "/test", 0); + String topic = NAMESPACE1 + "/test-txn-topic"; + admin.topics().createNonPartitionedTopic(topic); + PersistentTopic persistentTopic = (PersistentTopic) getPulsarServiceList().get(0).getBrokerService() + .getTopic(topic, false).get().get(); + getPulsarServiceList().get(0).getConfig().setTransactionPendingAckLogIndexMinLag(pendingAckLogIndexMinLag); + CompletableFuture subscriptionFuture = persistentTopic .createSubscription("test", + CommandSubscribe.InitialPosition.Earliest, false, null); + PersistentSubscription subscription = (PersistentSubscription) subscriptionFuture.get(); + ManagedCursor managedCursor = subscription.getCursor(); + this.managedCursorMock = spy(managedCursor); + this.persistentSubscriptionMock = spy(subscription); + when(this.persistentSubscriptionMock.getCursor()).thenReturn(managedCursorMock); + this.internalPinnedExecutor = this.persistentSubscriptionMock + .getTopic() + .getBrokerService() + .getPulsar() + .getTransactionExecutorProvider() + .getExecutor(this); + } + + @AfterMethod + public void cleanup(){ + super.internalCleanup(); + } + + private MLPendingAckStore createPendingAckStore(TxnLogBufferedWriterConfig txnLogBufferedWriterConfig) + throws Exception { + MLPendingAckStoreProvider mlPendingAckStoreProvider = new MLPendingAckStoreProvider(); + ServiceConfiguration 
serviceConfiguration = + persistentSubscriptionMock.getTopic().getBrokerService().getPulsar().getConfiguration(); + serviceConfiguration.setTransactionPendingAckBatchedWriteMaxRecords( + txnLogBufferedWriterConfig.getBatchedWriteMaxRecords() + ); + serviceConfiguration.setTransactionPendingAckBatchedWriteMaxSize( + txnLogBufferedWriterConfig.getBatchedWriteMaxSize() + ); + serviceConfiguration.setTransactionPendingAckBatchedWriteMaxDelayInMillis( + txnLogBufferedWriterConfig.getBatchedWriteMaxDelayInMillis() + ); + serviceConfiguration.setTransactionPendingAckBatchedWriteEnabled(txnLogBufferedWriterConfig.isBatchEnabled()); + return (MLPendingAckStore) mlPendingAckStoreProvider.newPendingAckStore(persistentSubscriptionMock).get(); + } + + @DataProvider(name = "mainProcessArgs") + public Object[][] mainProcessArgsProvider(){ + Object[][] args = new Object[4][]; + args[0] = new Object[]{true, true}; + args[1] = new Object[]{false, false}; + args[2] = new Object[]{true, false}; + args[3] = new Object[]{false, true}; + return args; + } + + @Test(dataProvider = "mainProcessArgs") + public void test1(boolean writeWithBatch, boolean readWithBatch) throws Exception { + // Write some data. + TxnLogBufferedWriterConfig configForWrite = new TxnLogBufferedWriterConfig(); + configForWrite.setBatchEnabled(writeWithBatch); + configForWrite.setBatchedWriteMaxRecords(2); + // Denied scheduled flush. 
+ configForWrite.setBatchedWriteMaxDelayInMillis(1000 * 3600); + MLPendingAckStore mlPendingAckStoreForWrite = createPendingAckStore(configForWrite); + List> futureList = new ArrayList<>(); + for (int i = 0; i < 20; i++){ + TxnID txnID = new TxnID(i, i); + PositionImpl position = PositionImpl.get(i, i); + futureList.add(mlPendingAckStoreForWrite.appendCumulativeAck(txnID, position)); + } + for (int i = 0; i < 10; i++){ + TxnID txnID = new TxnID(i, i); + futureList.add(mlPendingAckStoreForWrite.appendCommitMark(txnID, CommandAck.AckType.Cumulative)); + } + for (int i = 10; i < 20; i++){ + TxnID txnID = new TxnID(i, i); + futureList.add(mlPendingAckStoreForWrite.appendAbortMark(txnID, CommandAck.AckType.Cumulative)); + } + for (int i = 40; i < 50; i++){ + TxnID txnID = new TxnID(i, i); + PositionImpl position = PositionImpl.get(i, i); + futureList.add(mlPendingAckStoreForWrite.appendCumulativeAck(txnID, position)); + } + FutureUtil.waitForAll(futureList).get(); + // Verify build sparse indexes correct after add many cmd-ack. + ArrayList positionList = new ArrayList<>(); + for (long i = 0; i < 50; i++){ + positionList.add(i); + } + // The indexes not contains the data which is commit or abort. + LinkedHashSet skipSet = new LinkedHashSet<>(); + for (long i = 20; i < 40; i++){ + skipSet.add(i); + } + if (writeWithBatch) { + for (long i = 0; i < 50; i++){ + if (i % 2 == 0){ + // The indexes contains only the last position in the batch. + skipSet.add(i); + } + } + } + LinkedHashSet expectedPositions = calculatePendingAckIndexes(positionList, skipSet); + Assert.assertEquals(mlPendingAckStoreForWrite.pendingAckLogIndex.keySet().stream().map(PositionImpl::getEntryId).collect( + Collectors.toList()), new ArrayList<>(expectedPositions)); + // Replay. + TxnLogBufferedWriterConfig configForReplay = new TxnLogBufferedWriterConfig(); + configForReplay.setBatchEnabled(readWithBatch); + configForReplay.setBatchedWriteMaxRecords(2); + // Denied scheduled flush. 
+ configForReplay.setBatchedWriteMaxDelayInMillis(1000 * 3600); + MLPendingAckStore mlPendingAckStoreForRead = createPendingAckStore(configForReplay); + PendingAckHandleImpl pendingAckHandle = mock(PendingAckHandleImpl.class); + when(pendingAckHandle.getInternalPinnedExecutor()).thenReturn(internalPinnedExecutor); + when(pendingAckHandle.changeToReadyState()).thenReturn(true); + // Process controller, mark the replay task already finish. + final AtomicInteger processController = new AtomicInteger(); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + processController.incrementAndGet(); + return null; + } + }).when(pendingAckHandle).completeHandleFuture(); + mlPendingAckStoreForRead.replayAsync(pendingAckHandle, internalPinnedExecutor); + Awaitility.await().atMost(200, TimeUnit.SECONDS).until(() -> processController.get() == 1); + // Verify build sparse indexes correct after replay. + Assert.assertEquals(mlPendingAckStoreForRead.pendingAckLogIndex.size(), + mlPendingAckStoreForWrite.pendingAckLogIndex.size()); + Iterator> iteratorReplay = + mlPendingAckStoreForRead.pendingAckLogIndex.entrySet().iterator(); + Iterator> iteratorWrite = + mlPendingAckStoreForWrite.pendingAckLogIndex.entrySet().iterator(); + while (iteratorReplay.hasNext()){ + Map.Entry replayEntry = iteratorReplay.next(); + Map.Entry writeEntry = iteratorWrite.next(); + Assert.assertEquals(replayEntry.getKey(), writeEntry.getKey()); + Assert.assertEquals(replayEntry.getValue().getLedgerId(), writeEntry.getValue().getLedgerId()); + Assert.assertEquals(replayEntry.getValue().getEntryId(), writeEntry.getValue().getEntryId()); + } + // Verify delete correct. 
+ when(managedCursorMock.getPersistentMarkDeletedPosition()).thenReturn(PositionImpl.get(19, 19)); + mlPendingAckStoreForWrite.clearUselessLogData(); + mlPendingAckStoreForRead.clearUselessLogData(); + Assert.assertTrue(mlPendingAckStoreForWrite.pendingAckLogIndex.keySet().iterator().next().getEntryId() > 19); + Assert.assertTrue(mlPendingAckStoreForRead.pendingAckLogIndex.keySet().iterator().next().getEntryId() > 19); + + // cleanup. + mlPendingAckStoreForWrite.closeAsync().get(); + mlPendingAckStoreForRead.closeAsync().get(); + } + + /** + * Build a sparse index from the {@param positionList}, the logic same as {@link MLPendingAckStore}. + * @param positionList the position add to pending ack log/ + * @param skipSet the position which should increment the count but not marked to indexes. aka: commit & abort. + */ + private LinkedHashSet calculatePendingAckIndexes(List positionList, LinkedHashSet skipSet){ + LogIndexLagBackoff logIndexLagBackoff = new LogIndexLagBackoff(pendingAckLogIndexMinLag, Long.MAX_VALUE, 1); + long nextCount = logIndexLagBackoff.next(0); + long recordCountInCurrentLoop = 0; + LinkedHashSet indexes = new LinkedHashSet<>(); + for (int i = 0; i < positionList.size(); i++){ + recordCountInCurrentLoop ++; + long value = positionList.get(i); + if (skipSet.contains(value)){ + continue; + } + if (recordCountInCurrentLoop >= nextCount){ + indexes.add(value); + nextCount = logIndexLagBackoff.next(indexes.size()); + recordCountInCurrentLoop = 0; + } + } + return indexes; + } +} \ No newline at end of file diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java index b3e4a7a8641901..061a0ac66154ca 100644 --- a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java +++ 
b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriter.java @@ -535,7 +535,7 @@ public void recycle(){ - interface AddDataCallback { + public interface AddDataCallback { void addComplete(Position position, Object context); diff --git a/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterConfig.java b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterConfig.java new file mode 100644 index 00000000000000..a056fd1bb9cc80 --- /dev/null +++ b/pulsar-transaction/coordinator/src/main/java/org/apache/pulsar/transaction/coordinator/impl/TxnLogBufferedWriterConfig.java @@ -0,0 +1,30 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.pulsar.transaction.coordinator.impl; + +import lombok.Data; + +@Data +public class TxnLogBufferedWriterConfig { + + private int batchedWriteMaxRecords = 512; + private int batchedWriteMaxSize = 1024 * 1024 * 4; + private int batchedWriteMaxDelayInMillis = 1; + private boolean batchEnabled = false; +}