Skip to content

Commit

Permalink
[improve][txn] PIP-160 admin api coordinatorStats append batch info
Browse files Browse the repository at this point in the history
  • Loading branch information
poorbarcode committed Jul 22, 2022
1 parent 0fe4731 commit ca52cde
Show file tree
Hide file tree
Showing 7 changed files with 68 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.pulsar.common.policies.data.TransactionInBufferStats;
import org.apache.pulsar.common.policies.data.TransactionInPendingAckStats;
import org.apache.pulsar.common.policies.data.TransactionMetadata;
import org.apache.pulsar.common.policies.data.TxnBufferedWriterStat;
import org.apache.pulsar.common.stats.PositionInPendingAckStats;
import org.apache.pulsar.packages.management.core.MockedPackagesStorageProvider;
import org.apache.pulsar.transaction.coordinator.impl.MLTransactionLogImpl;
Expand All @@ -81,13 +82,25 @@
@Test(groups = "broker-admin")
public class AdminApiTransactionTest extends MockedPulsarServiceBaseTest {

/**
* Transaction log batch Configuration.
*/
private boolean transactionBatchEnabled = true;
private int transactionMaxCountInBatch = 256;
private int transactionBatchMaxSize = 1024 * 1024;
private int transactionBatchMaxDelayMillis = 128;

@BeforeMethod
@Override
protected void setup() throws Exception {
conf.setEnablePackagesManagement(true);
conf.setPackagesManagementStorageProvider(MockedPackagesStorageProvider.class.getName());
conf.setTransactionCoordinatorEnabled(true);
conf.setTransactionBufferSnapshotMaxTransactionCount(1);
conf.setTransactionLogBatchedWriteEnabled(transactionBatchEnabled);
conf.setTransactionLogBatchedWriteMaxRecords(transactionMaxCountInBatch);
conf.setTransactionLogBatchedWriteMaxSize(transactionBatchMaxSize);
conf.setTransactionLogBatchedWriteMaxDelayInMillis(transactionBatchMaxDelayMillis);
super.internalSetup();
admin.clusters().createCluster("test", ClusterData.builder().serviceUrl(pulsar.getWebServiceAddress()).build());
TenantInfoImpl tenantInfo = new TenantInfoImpl(Sets.newHashSet("role1", "role2"), Sets.newHashSet("test"));
Expand Down Expand Up @@ -116,17 +129,20 @@ public void testGetTransactionCoordinatorStats() throws Exception {
transactionCoordinatorstats = admin.transactions().getCoordinatorStatsByIdAsync(0).get();
verifyCoordinatorStats(transactionCoordinatorstats.state,
transactionCoordinatorstats.leastSigBits, transactionCoordinatorstats.lowWaterMark);
verifyTxnBufferedWriterStat(transactionCoordinatorstats.bufferedWriterStat);
Map<Integer, TransactionCoordinatorStats> stats = admin.transactions().getCoordinatorStatsAsync().get();

assertEquals(stats.size(), 2);

transactionCoordinatorstats = stats.get(0);
verifyCoordinatorStats(transactionCoordinatorstats.state,
transactionCoordinatorstats.leastSigBits, transactionCoordinatorstats.lowWaterMark);
verifyTxnBufferedWriterStat(transactionCoordinatorstats.bufferedWriterStat);

transactionCoordinatorstats = stats.get(1);
verifyCoordinatorStats(transactionCoordinatorstats.state,
transactionCoordinatorstats.leastSigBits, transactionCoordinatorstats.lowWaterMark);
verifyTxnBufferedWriterStat(transactionCoordinatorstats.bufferedWriterStat);
}

@Test(timeOut = 20000)
Expand Down Expand Up @@ -795,6 +811,14 @@ private static void verifyCoordinatorStats(String state,
assertEquals(lowWaterMark, 0);
}


private void verifyTxnBufferedWriterStat(TxnBufferedWriterStat txnBufferedWriterStat) {
assertEquals(txnBufferedWriterStat.isBatchEnabled(), transactionBatchEnabled);
assertEquals(txnBufferedWriterStat.getBatchedWriteMaxRecords(), transactionMaxCountInBatch);
assertEquals(txnBufferedWriterStat.getBatchedWriteMaxSize(), transactionBatchMaxSize);
assertEquals(txnBufferedWriterStat.getBatchedWriteMaxDelayInMillis(), transactionBatchMaxDelayMillis);
}

private void initTransaction(int coordinatorSize) throws Exception {
pulsar.getPulsarResources()
.getNamespaceResources()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ public class TransactionCoordinatorStats {
public long recoverStartTime;
//End timestamp of transaction coordinator recovery. 0L means no startup.
public long recoverEndTime;

/** The stat of transaction log buffered writer. **/
public TxnBufferedWriterStat bufferedWriterStat;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
package org.apache.pulsar.common.policies.data;

import lombok.Data;

@Data
public class TxnBufferedWriterStat {

/** Whether to enable the batch feature when writing to Bookie. **/
private boolean batchEnabled = false;

/** If enabled the feature-batch, this attribute means maximum log records count in a batch. **/
private int batchedWriteMaxRecords = 512;

/** If enabled the feature-batch, this attribute means bytes size in a batch. **/
private int batchedWriteMaxSize;

/**
* If enabled the feature-batch, this attribute means maximum wait time(in millis) for the first record in a batch.
*/
private int batchedWriteMaxDelayInMillis;
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicLong;
import lombok.Getter;
import org.apache.bookkeeper.mledger.AsyncCallbacks;
import org.apache.bookkeeper.mledger.Entry;
import org.apache.bookkeeper.mledger.ManagedCursor;
Expand Down Expand Up @@ -76,6 +77,7 @@ public class MLTransactionLogImpl implements TransactionLog {

private final TopicName topicName;

@Getter
private TxnLogBufferedWriter<TransactionMetadataEntry> bufferedWriter;

private final Timer timer;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -453,6 +453,7 @@ public TransactionCoordinatorStats getCoordinatorStats() {
transactionCoordinatorstats.ongoingTxnSize = txnMetaMap.size();
transactionCoordinatorstats.recoverStartTime = recoverTime.getRecoverStartTime();
transactionCoordinatorstats.recoverEndTime = recoverTime.getRecoverEndTime();
transactionCoordinatorstats.bufferedWriterStat = transactionLog.getBufferedWriter().getStats();
return transactionCoordinatorstats;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.bookkeeper.mledger.Position;
import org.apache.commons.collections4.CollectionUtils;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.policies.data.TxnBufferedWriterStat;

/***
* See PIP-160: https://github.com/apache/pulsar/issues/15516.
Expand Down Expand Up @@ -590,4 +591,13 @@ public void addFailed(ManagedLedgerException exception, Object ctx) {
}
}
}

public TxnBufferedWriterStat getStats(){
TxnBufferedWriterStat txnBufferedWriterStat = new TxnBufferedWriterStat();
txnBufferedWriterStat.setBatchEnabled(batchEnabled);
txnBufferedWriterStat.setBatchedWriteMaxRecords(batchedWriteMaxRecords);
txnBufferedWriterStat.setBatchedWriteMaxSize(batchedWriteMaxSize);
txnBufferedWriterStat.setBatchedWriteMaxDelayInMillis(batchedWriteMaxDelayInMillis);
return txnBufferedWriterStat;
}
}
8 changes: 7 additions & 1 deletion site2/docs/admin-api-transactions.md
Original file line number Diff line number Diff line change
Expand Up @@ -417,7 +417,13 @@ The following is an example of the returned values.
"lowWaterMark" : 0,
"ongoingTxnSize" : 0,
"recoverStartTime" : 1657021892377,
"recoverEndTime" : 1657021892378
"recoverEndTime" : 1657021892378,
"bufferedWriterStat":{
"batchEnabled": true,
"batchedWriteMaxRecords": 512
"batchedWriteMaxSize": 4194304,
"batchedWriteMaxDelayInMillis": 1
}
}

```
Expand Down

0 comments on commit ca52cde

Please sign in to comment.