Skip to content

Commit

Permalink
[improve] [txn] [PIP-160] Transaction log store enable the batch feat…
Browse files Browse the repository at this point in the history
…ure (#16685)

Master Issue: #15370

### Motivation

see #15370

### Modifications

I will complete proposal #15370 with these pull requests( *current pull request is the step-3* ): 

1. Write the batch transaction log handler: `TxnLogBufferedWriter`
2. Configuration changes and protocol changes.
3. Transaction log store enables the batch feature.
4. Pending ack log store enables the batch feature.
5. Supports dynamic configuration.
6. Append admin API for transaction batch log and docs( admin and configuration doc ).
7. Append metrics support for transaction batch log.
  • Loading branch information
poorbarcode authored Jul 22, 2022
1 parent fd9c418 commit 4d16ad5
Show file tree
Hide file tree
Showing 17 changed files with 939 additions and 95 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.ConcurrentLinkedDeque;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.Semaphore;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
Expand Down Expand Up @@ -67,6 +68,7 @@
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.CoordinatorNotFoundException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.InvalidTxnStatusException;
import org.apache.pulsar.transaction.coordinator.exceptions.CoordinatorException.TransactionMetadataStoreStateException;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.apache.pulsar.transaction.coordinator.proto.TxnStatus;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -200,6 +202,17 @@ public CompletableFuture<Void> handleTcClientConnect(TransactionCoordinatorID tc
}

public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(TransactionCoordinatorID tcId) {
final ScheduledExecutorService transactionLogBufferedWriteAsyncFlushTrigger =
pulsarService.getBrokerService().getTransactionTimer();
final ServiceConfiguration serviceConfiguration = pulsarService.getConfiguration();
final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionLogBatchedWriteEnabled());
txnLogBufferedWriterConfig
.setBatchedWriteMaxRecords(serviceConfiguration.getTransactionLogBatchedWriteMaxRecords());
txnLogBufferedWriterConfig.setBatchedWriteMaxSize(serviceConfiguration.getTransactionLogBatchedWriteMaxSize());
txnLogBufferedWriterConfig
.setBatchedWriteMaxDelayInMillis(serviceConfiguration.getTransactionLogBatchedWriteMaxDelayInMillis());

return pulsarService.getBrokerService()
.getManagedLedgerConfig(getMLTransactionLogName(tcId)).thenCompose(v -> {
TransactionTimeoutTracker timeoutTracker = timeoutTrackerFactory.newTracker(tcId);
Expand All @@ -209,7 +222,8 @@ public CompletableFuture<TransactionMetadataStore> openTransactionMetadataStore(
return transactionMetadataStoreProvider
.openStore(tcId, pulsarService.getManagedLedgerFactory(), v,
timeoutTracker, recoverTracker,
pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator());
pulsarService.getConfig().getMaxActiveTransactionsPerCoordinator(),
txnLogBufferedWriterConfig, transactionLogBufferedWriteAsyncFlushTrigger);
});
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,6 +224,9 @@ public class BrokerService implements Closeable {
@Getter
private final ScheduledExecutorService backlogQuotaChecker;

@Getter
private final ScheduledExecutorService transactionTimer;

protected final AtomicReference<Semaphore> lookupRequestSemaphore;
protected final AtomicReference<Semaphore> topicLoadRequestSemaphore;

Expand Down Expand Up @@ -344,6 +347,8 @@ public BrokerService(PulsarService pulsar, EventLoopGroup eventLoopGroup) throws
this.backlogQuotaManager = new BacklogQuotaManager(pulsar);
this.backlogQuotaChecker = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
this.transactionTimer = Executors
.newSingleThreadScheduledExecutor(new DefaultThreadFactory("pulsar-backlog-quota-checker"));
this.authenticationService = new AuthenticationService(pulsar.getConfiguration());
this.blockedDispatchers =
ConcurrentOpenHashSet.<PersistentDispatcherMultipleConsumers>newBuilder().build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,8 @@

import java.util.List;
import java.util.Map.Entry;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import com.google.common.collect.Sets;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
Expand All @@ -34,6 +36,7 @@
import org.apache.pulsar.common.stats.Metrics;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.testng.Assert;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
Expand Down Expand Up @@ -94,17 +97,24 @@ public void testManagedLedgerMetrics() throws Exception {

@Test
public void testTransactionTopic() throws Exception {
TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
txnLogBufferedWriterConfig.setBatchEnabled(false);
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(2);
admin.tenants().createTenant(NamespaceName.SYSTEM_NAMESPACE.getTenant(),
new TenantInfoImpl(Sets.newHashSet("appid1"), Sets.newHashSet("test")));
admin.namespaces().createNamespace(NamespaceName.SYSTEM_NAMESPACE.toString());
createTransactionCoordinatorAssign();
ManagedLedgerConfig managedLedgerConfig = new ManagedLedgerConfig();
managedLedgerConfig.setMaxEntriesPerLedger(2);
new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
pulsar.getManagedLedgerFactory(), managedLedgerConfig)
.initialize().join();
MLTransactionLogImpl mlTransactionLog = new MLTransactionLogImpl(TransactionCoordinatorID.get(0),
pulsar.getManagedLedgerFactory(), managedLedgerConfig, txnLogBufferedWriterConfig,
scheduledExecutorService);
mlTransactionLog.initialize().get(2, TimeUnit.SECONDS);
ManagedLedgerMetrics metrics = new ManagedLedgerMetrics(pulsar);
metrics.generate();
// cleanup.
mlTransactionLog.closeAsync().get();
scheduledExecutorService.shutdown();
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -130,6 +130,7 @@
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionSequenceIdGenerator;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionMetadataStore;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;
import org.awaitility.Awaitility;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
Expand Down Expand Up @@ -677,6 +678,8 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

@Test
public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);

String topic = NAMESPACE1 + "/testEndTCRecoveringWhenManagerLedgerDisReadable";
admin.topics().createNonPartitionedTopic(topic);

Expand All @@ -699,7 +702,8 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
persistentTopic.getManagedLedger().getConfig().setManagedLedgerInterceptor(mlTransactionSequenceIdGenerator);
MLTransactionLogImpl mlTransactionLog =
new MLTransactionLogImpl(new TransactionCoordinatorID(1), null,
persistentTopic.getManagedLedger().getConfig());
persistentTopic.getManagedLedger().getConfig(), new TxnLogBufferedWriterConfig(),
scheduledExecutorService);
Class<MLTransactionLogImpl> mlTransactionLogClass = MLTransactionLogImpl.class;
Field field = mlTransactionLogClass.getDeclaredField("cursor");
field.setAccessible(true);
Expand Down Expand Up @@ -746,6 +750,9 @@ public void testEndTCRecoveringWhenManagerLedgerDisReadable() throws Exception{
metadataStore3.init(transactionRecoverTracker).get();
Awaitility.await().untilAsserted(() ->
assertEquals(metadataStore3.getCoordinatorStats().state, "Ready"));

// cleanup.
scheduledExecutorService.shutdown();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,10 @@
import com.google.common.annotations.Beta;
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.transaction.coordinator.impl.TxnLogBufferedWriterConfig;

/**
* A provider that provides {@link TransactionMetadataStore}.
Expand Down Expand Up @@ -68,5 +70,6 @@ static TransactionMetadataStoreProvider newProvider(String providerClassName) th
CompletableFuture<TransactionMetadataStore> openStore(
TransactionCoordinatorID transactionCoordinatorId, ManagedLedgerFactory managedLedgerFactory,
ManagedLedgerConfig managedLedgerConfig, TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator);
TransactionRecoverTracker recoverTracker, long maxActiveTransactionsPerCoordinator,
TxnLogBufferedWriterConfig txnLogBufferedWriterConfig, ScheduledExecutorService scheduledExecutorService);
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.pulsar.transaction.coordinator.impl;

import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import org.apache.bookkeeper.mledger.ManagedLedgerConfig;
import org.apache.bookkeeper.mledger.ManagedLedgerFactory;
import org.apache.pulsar.transaction.coordinator.TransactionCoordinatorID;
Expand All @@ -38,7 +39,9 @@ public CompletableFuture<TransactionMetadataStore> openStore(TransactionCoordina
ManagedLedgerConfig managedLedgerConfig,
TransactionTimeoutTracker timeoutTracker,
TransactionRecoverTracker recoverTracker,
long maxActiveTransactionsPerCoordinator) {
long maxActiveTransactionsPerCoordinator,
TxnLogBufferedWriterConfig txnLogBufferedWriterConfig,
ScheduledExecutorService scheduledExecutorService) {
return CompletableFuture.completedFuture(
new InMemTransactionMetadataStore(transactionCoordinatorId));
}
Expand Down
Loading

0 comments on commit 4d16ad5

Please sign in to comment.