Skip to content

Commit

Permalink
Instead scheduler executor to io.netty.timer
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Jul 26, 2022
1 parent f48e17e commit 346dd5d
Show file tree
Hide file tree
Showing 4 changed files with 20 additions and 14 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ComparisonChain;
import io.netty.buffer.ByteBuf;
import io.netty.util.Timer;
import io.netty.util.concurrent.FastThreadLocal;
import java.util.ArrayList;
import java.util.Collections;
Expand All @@ -30,7 +31,6 @@
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Predicate;
import java.util.stream.Stream;
Expand Down Expand Up @@ -117,7 +117,7 @@ public class MLPendingAckStore implements PendingAckStore {
public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor,
ManagedCursor subManagedCursor, long transactionPendingAckLogIndexMinLag,
TxnLogBufferedWriterConfig bufferedWriterConfig,
ScheduledExecutorService scheduledExecutorService) {
Timer timer) {
this.managedLedger = managedLedger;
this.cursor = cursor;
this.currentLoadPosition = (PositionImpl) this.cursor.getMarkDeletedPosition();
Expand All @@ -128,7 +128,7 @@ public MLPendingAckStore(ManagedLedger managedLedger, ManagedCursor cursor,
this.logIndexBackoff = new LogIndexLagBackoff(transactionPendingAckLogIndexMinLag, Long.MAX_VALUE, 1);
this.maxIndexLag = logIndexBackoff.next(0);
this.bufferedWriter = new TxnLogBufferedWriter(managedLedger, ((ManagedLedgerImpl) managedLedger).getExecutor(),
scheduledExecutorService, PendingAckLogSerializer.INSTANCE,
timer, PendingAckLogSerializer.INSTANCE,
bufferedWriterConfig.getBatchedWriteMaxRecords(), bufferedWriterConfig.getBatchedWriteMaxSize(),
bufferedWriterConfig.getBatchedWriteMaxDelayInMillis(), bufferedWriterConfig.isBatchEnabled());
this.batchedPendingAckLogsWaitingForHandle = new ArrayList<>();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,8 @@
*/
package org.apache.pulsar.broker.transaction.pendingack.impl;

import io.netty.util.Timer;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import lombok.extern.slf4j.Slf4j;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -57,8 +57,8 @@ public CompletableFuture<PendingAckStore> newPendingAckStore(PersistentSubscript
PersistentTopic originPersistentTopic = (PersistentTopic) subscription.getTopic();
PulsarService pulsarService = originPersistentTopic.getBrokerService().getPulsar();

final ScheduledExecutorService transactionLogScheduledExecutor =
pulsarService.getBrokerService().getTransactionTimer();
final Timer brokerClientSharedTimer =
pulsarService.getBrokerClientSharedTimer();
final ServiceConfiguration serviceConfiguration = pulsarService.getConfiguration();
final TxnLogBufferedWriterConfig txnLogBufferedWriterConfig = new TxnLogBufferedWriterConfig();
txnLogBufferedWriterConfig.setBatchEnabled(serviceConfiguration.isTransactionPendingAckBatchedWriteEnabled());
Expand Down Expand Up @@ -105,7 +105,7 @@ public void openCursorComplete(ManagedCursor cursor, Object ctx) {
.getConfiguration()
.getTransactionPendingAckLogIndexMinLag(),
txnLogBufferedWriterConfig,
transactionLogScheduledExecutor));
brokerClientSharedTimer));
if (log.isDebugEnabled()) {
log.debug("{},{} open MLPendingAckStore cursor success",
originPersistentTopic.getName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,8 @@ public void testEndTBRecoveringWhenManagerLedgerDisReadable() throws Exception{
@Test
public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
1, TimeUnit.MILLISECONDS);

String topic = NAMESPACE1 + "/testEndTPRecoveringWhenManagerLedgerDisReadable";
admin.topics().createNonPartitionedTopic(topic);
Expand Down Expand Up @@ -647,7 +648,7 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{
TransactionPendingAckStoreProvider pendingAckStoreProvider = mock(TransactionPendingAckStoreProvider.class);
doReturn(CompletableFuture.completedFuture(
new MLPendingAckStore(persistentTopic.getManagedLedger(), managedCursor, null,
500, bufferedWriterConfig, scheduledExecutorService)))
500, bufferedWriterConfig, transactionTimer)))
.when(pendingAckStoreProvider).newPendingAckStore(any());
doReturn(CompletableFuture.completedFuture(true)).when(pendingAckStoreProvider).checkInitializedBefore(any());

Expand Down Expand Up @@ -680,6 +681,9 @@ public void testEndTPRecoveringWhenManagerLedgerDisReadable() throws Exception{

Awaitility.await().untilAsserted(() ->
assertEquals(pendingAckHandle3.getStats(false).state, "Ready"));

// cleanup
transactionTimer.stop();
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,8 +18,9 @@
*/
package org.apache.pulsar.broker.transaction.pendingack;

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import io.netty.util.HashedWheelTimer;
import io.netty.util.concurrent.DefaultThreadFactory;
import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -52,7 +53,8 @@ public PendingAckMetadataTest() {
@Test
public void testPendingAckManageLedgerWriteFailState() throws Exception {
TxnLogBufferedWriterConfig bufferedWriterConfig = new TxnLogBufferedWriterConfig();
ScheduledExecutorService scheduledExecutorService = Executors.newScheduledThreadPool(1);
HashedWheelTimer transactionTimer = new HashedWheelTimer(new DefaultThreadFactory("transaction-timer"),
1, TimeUnit.MILLISECONDS);

ManagedLedgerFactoryConfig factoryConf = new ManagedLedgerFactoryConfig();
factoryConf.setMaxCacheSize(0);
Expand All @@ -79,7 +81,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
ManagedCursor subCursor = completableFuture.get().openCursor("test");
MLPendingAckStore pendingAckStore =
new MLPendingAckStore(completableFuture.get(), cursor, subCursor, 500,
bufferedWriterConfig, scheduledExecutorService);
bufferedWriterConfig, transactionTimer);

Field field = MLPendingAckStore.class.getDeclaredField("managedLedger");
field.setAccessible(true);
Expand All @@ -102,7 +104,7 @@ public void openLedgerFailed(ManagedLedgerException exception, Object ctx) {
completableFuture.get().close();
cursor.close();
subCursor.close();
scheduledExecutorService.shutdown();
transactionTimer.stop();
}

}

0 comments on commit 346dd5d

Please sign in to comment.