Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[optimize][admin]Enhancing Transaction Buffer Stats and Introducing TransactionBufferInternalStats API #20330

Merged
merged 20 commits into from
Jul 22, 2023
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.pulsar.broker.admin.impl;

import static javax.ws.rs.core.Response.Status.INTERNAL_SERVER_ERROR;
import static javax.ws.rs.core.Response.Status.METHOD_NOT_ALLOWED;
import static javax.ws.rs.core.Response.Status.NOT_FOUND;
import static javax.ws.rs.core.Response.Status.SERVICE_UNAVAILABLE;
Expand All @@ -38,6 +39,7 @@
import org.apache.pulsar.broker.admin.AdminResource;
import org.apache.pulsar.broker.service.Topic;
import org.apache.pulsar.broker.service.persistent.PersistentTopic;
import org.apache.pulsar.broker.transaction.buffer.TransactionBuffer;
import org.apache.pulsar.broker.web.RestException;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.Transactions;
Expand All @@ -47,6 +49,8 @@
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.partition.PartitionedTopicMetadata;
import org.apache.pulsar.common.policies.data.SnapshotInternalStats;
import org.apache.pulsar.common.policies.data.TransactionBufferInternalStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInfo;
import org.apache.pulsar.common.policies.data.TransactionCoordinatorInternalStats;
Expand Down Expand Up @@ -170,9 +174,10 @@ protected CompletableFuture<TransactionInBufferStats> internalGetTransactionInBu
}

protected CompletableFuture<TransactionBufferStats> internalGetTransactionBufferStats(boolean authoritative,
boolean lowWaterMarks) {
boolean lowWaterMarks,
boolean segmentStats) {
return getExistingPersistentTopicAsync(authoritative)
.thenApply(topic -> topic.getTransactionBufferStats(lowWaterMarks));
.thenApply(topic -> topic.getTransactionBufferStats(lowWaterMarks, segmentStats));
}

protected CompletableFuture<TransactionPendingAckStats> internalGetPendingAckStats(
Expand Down Expand Up @@ -431,6 +436,69 @@ protected CompletableFuture<TransactionPendingAckInternalStats> internalGetPendi
);
}

protected CompletableFuture<TransactionBufferInternalStats> internalGetTransactionBufferInternalStats(
boolean authoritative, boolean metadata) {
TransactionBufferInternalStats transactionBufferInternalStats = new TransactionBufferInternalStats();
return getExistingPersistentTopicAsync(authoritative)
.thenCompose(topic -> {
TransactionBuffer.SnapshotType snapshotType = topic.getTransactionBuffer().getSnapshotType();
if (snapshotType == null) {
return FutureUtil.failedFuture(new RestException(NOT_FOUND,
"Transaction buffer Snapshot for the topic does not exist"));
} else if (snapshotType == TransactionBuffer.SnapshotType.Segment) {
transactionBufferInternalStats.snapshotType = snapshotType.toString();
this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName,
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_SEGMENTS);
return getExistingPersistentTopicAsync(authoritative)
.thenCompose(segmentTopic ->
segmentTopic.getManagedLedger().getManagedLedgerInternalStats(metadata)
.thenApply(segmentInternalStats -> {
SnapshotInternalStats segmentStats = new SnapshotInternalStats();
segmentStats.managedLedgerName = segmentTopic
.getManagedLedger().getName();
segmentStats.managedLedgerInternalStats = segmentInternalStats;
transactionBufferInternalStats.segmentInternalStats = segmentStats;
return null;
}))
.thenCompose(ignore -> {
this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName,
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT_INDEXES);
return getExistingPersistentTopicAsync(authoritative)
.thenCompose(indexTopic -> indexTopic.getManagedLedger()
.getManagedLedgerInternalStats(metadata)
.thenApply(indexInternalStats -> {
SnapshotInternalStats indexStats = new SnapshotInternalStats();
indexStats.managedLedgerName = indexTopic
.getManagedLedger().getName();
indexStats.managedLedgerInternalStats = indexInternalStats;
transactionBufferInternalStats
.segmentIndexInternalStats = indexStats;
return null;
}));
}).thenApply(ignore -> transactionBufferInternalStats);
} else if (snapshotType == TransactionBuffer.SnapshotType.Single) {
transactionBufferInternalStats.snapshotType = snapshotType.toString();
this.topicName = TopicName.get(TopicDomain.persistent.toString(), namespaceName,
SystemTopicNames.TRANSACTION_BUFFER_SNAPSHOT);
return getExistingPersistentTopicAsync(authoritative)
.thenCompose(singleTopic ->
singleTopic.getManagedLedger().getManagedLedgerInternalStats(metadata)
.thenApply(singleInternalStats -> {
// Construct the internal stats of the single snapshot
SnapshotInternalStats singleStats = new SnapshotInternalStats();
singleStats.managedLedgerName = singleTopic
.getManagedLedger().getName();
singleStats.managedLedgerInternalStats = singleInternalStats;
transactionBufferInternalStats
.singleSnapshotInternalStats = singleStats;
return null;
})).thenApply(ignore -> transactionBufferInternalStats);
}
return FutureUtil.failedFuture(new RestException(INTERNAL_SERVER_ERROR, "Unknown SnapshotType "
+ snapshotType));
});
}

protected CompletableFuture<PersistentTopic> getExistingPersistentTopicAsync(boolean authoritative) {
return validateTopicOwnershipAsync(topicName, authoritative).thenCompose(__ -> {
CompletableFuture<Optional<Topic>> topicFuture = pulsar().getBrokerService()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,11 +171,13 @@ public void getTransactionBufferStats(@Suspended final AsyncResponse asyncRespon
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("lowWaterMarks") @DefaultValue("false")
boolean lowWaterMarks) {
boolean lowWaterMarks,
@QueryParam("segmentStats") @DefaultValue("false")
boolean segmentStats) {
try {
checkTransactionCoordinatorEnabled();
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionBufferStats(authoritative, lowWaterMarks)
internalGetTransactionBufferStats(authoritative, lowWaterMarks, segmentStats)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
Expand Down Expand Up @@ -332,6 +334,51 @@ public void getPendingAckInternalStats(@Suspended final AsyncResponse asyncRespo
}
}

@GET
@Path("/transactionBufferInternalStats/{tenant}/{namespace}/{topic}")
@ApiOperation(value = "Get transaction buffer internal stats.")
@ApiResponses(value = {
@ApiResponse(code = 403, message = "Don't have admin permission"),
@ApiResponse(code = 404, message = "Tenant or cluster or namespace or topic doesn't exist"),
@ApiResponse(code = 503, message = "This Broker is not enable transaction"),
@ApiResponse(code = 307, message = "Topic is not owned by this broker!"),
@ApiResponse(code = 405, message = "Transaction buffer don't use managedLedger!"),
@ApiResponse(code = 400, message = "Topic is not a persistent topic!"),
@ApiResponse(code = 409, message = "Concurrent modification")
})
public void getTransactionBufferInternalStats(@Suspended final AsyncResponse asyncResponse,
@QueryParam("authoritative")
@DefaultValue("false") boolean authoritative,
@PathParam("tenant") String tenant,
@PathParam("namespace") String namespace,
@PathParam("topic") @Encoded String encodedTopic,
@QueryParam("metadata") @DefaultValue("false") boolean metadata) {
try {
validateTopicName(tenant, namespace, encodedTopic);
internalGetTransactionBufferInternalStats(authoritative, metadata)
.thenAccept(asyncResponse::resume)
.exceptionally(ex -> {
if (!isRedirectException(ex)) {
log.error("[{}] Failed to get transaction buffer internal stats {}",
clientAppId(), topicName, ex);
}
Throwable cause = FutureUtil.unwrapCompletionException(ex);
if (cause instanceof BrokerServiceException.ServiceUnitNotReadyException) {
asyncResponse.resume(new RestException(SERVICE_UNAVAILABLE, cause));
} else if (cause instanceof BrokerServiceException.NotAllowedException) {
asyncResponse.resume(new RestException(METHOD_NOT_ALLOWED, cause));
} else if (cause instanceof BrokerServiceException.SubscriptionNotFoundException) {
asyncResponse.resume(new RestException(NOT_FOUND, cause));
} else {
asyncResponse.resume(new RestException(cause));
}
return null;
});
} catch (Exception ex) {
resumeAsyncResponseExceptionally(asyncResponse, ex);
}
}

@POST
@Path("/transactionCoordinator/replicas")
@ApiResponses(value = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3547,8 +3547,8 @@ public boolean checkSubscriptionTypesEnable(SubType subType) {
return subTypesEnabled != null && subTypesEnabled.contains(subType);
}

public TransactionBufferStats getTransactionBufferStats(boolean lowWaterMarks) {
return this.transactionBuffer.getStats(lowWaterMarks);
public TransactionBufferStats getTransactionBufferStats(boolean lowWaterMarks, boolean segmentStats) {
return this.transactionBuffer.getStats(lowWaterMarks, segmentStats);
}

public TransactionPendingAckStats getTransactionPendingAckStats(String subName, boolean lowWaterMarks) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CompletableFuture;
import org.apache.bookkeeper.mledger.impl.PositionImpl;
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;


public interface AbortedTxnProcessor {
Expand Down Expand Up @@ -66,9 +67,8 @@ public interface AbortedTxnProcessor {

/**
* Get the lastSnapshotTimestamps.
* @return the lastSnapshotTimestamps.
*/
long getLastSnapshotTimestamps();
void generateSnapshotStats(TransactionBufferStats transactionBufferStats, boolean segmentStats);

CompletableFuture<Void> closeAsync();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,11 @@
@Beta
public interface TransactionBuffer {

enum SnapshotType {
Single,
Segment,
}

/**
* Return the metadata of a transaction in the buffer.
*
Expand Down Expand Up @@ -158,6 +163,17 @@ public interface TransactionBuffer {
*/
PositionImpl getMaxReadPosition();

/**
* Get the snapshot type.
*
* The snapshot type can be either "Single" or "Segment". In "Single" mode, a single snapshot log is used
* to record the transaction buffer stats. In "Segment" mode, a snapshot segment topic is used to record
* the stats, and a separate snapshot segment index topic is used to index these stats.
*
* @return the snapshot type
*/
SnapshotType getSnapshotType();

/**
* Get transaction in buffer stats.
* @return the transaction in buffer stats.
Expand All @@ -168,7 +184,7 @@ public interface TransactionBuffer {
* Get transaction stats in buffer.
* @return the transaction stats in buffer.
*/
TransactionBufferStats getStats(boolean lowWaterMarks);
TransactionBufferStats getStats(boolean lowWaterMarks, boolean segmentStats);

/**
* Wait TransactionBuffer Recovers completely.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -374,13 +374,18 @@ public PositionImpl getMaxReadPosition() {
return PositionImpl.LATEST;
}

@Override
public SnapshotType getSnapshotType() {
return null;
}

@Override
public TransactionInBufferStats getTransactionInBufferStats(TxnID txnID) {
return null;
}

@Override
public TransactionBufferStats getStats(boolean lowWaterMarks) {
public TransactionBufferStats getStats(boolean lowWaterMarks, boolean segmentStats) {
return null;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.apache.pulsar.client.api.transaction.TxnID;
import org.apache.pulsar.client.impl.PulsarClientImpl;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
Expand Down Expand Up @@ -173,8 +174,8 @@ public CompletableFuture<Void> takeAbortedTxnsSnapshot(PositionImpl maxReadPosit
}

@Override
public long getLastSnapshotTimestamps() {
return this.lastSnapshotTimestamps;
public void generateSnapshotStats(TransactionBufferStats transactionBufferStats, boolean segmentStats) {
transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
import org.apache.pulsar.common.naming.SystemTopicNames;
import org.apache.pulsar.common.naming.TopicDomain;
import org.apache.pulsar.common.naming.TopicName;
import org.apache.pulsar.common.policies.data.SegmentStats;
import org.apache.pulsar.common.policies.data.SegmentsStats;
import org.apache.pulsar.common.policies.data.TransactionBufferStats;
import org.apache.pulsar.common.protocol.Commands;
import org.apache.pulsar.common.util.FutureUtil;

Expand Down Expand Up @@ -115,6 +118,8 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess

private volatile long lastSnapshotTimestamps;

private volatile long lastTakedSnapshotSegmentTimestamp;

/**
* The number of the aborted transaction IDs in a segment.
* This is calculated according to the configured memory size.
Expand Down Expand Up @@ -449,8 +454,23 @@ public CompletableFuture<Void> clearAbortedTxnSnapshot() {
}

@Override
public long getLastSnapshotTimestamps() {
return this.lastSnapshotTimestamps;
public void generateSnapshotStats(TransactionBufferStats transactionBufferStats, boolean segmentStats) {
transactionBufferStats.totalAbortedTransactions = this.aborts.size();
transactionBufferStats.lastSnapshotTimestamps = this.lastSnapshotTimestamps;
SegmentsStats segmentsStats = new SegmentsStats();
segmentsStats.currentSegmentCapacity = this.snapshotSegmentCapacity;
segmentsStats.lastTookSnapshotSegmentTimestamp = this.lastTakedSnapshotSegmentTimestamp;
segmentsStats.unsealedAbortTxnIDSize = this.unsealedTxnIds.size();
segmentsStats.segmentsSize = indexes.size();
if (segmentStats) {
List<SegmentStats> statsList = new ArrayList<>();
segmentIndex.forEach((position, txnID) -> {
SegmentStats stats = new SegmentStats(txnID.toString(), position.toString());
statsList.add(stats);
});
segmentsStats.segmentStats = statsList;
}
transactionBufferStats.segmentsStats = segmentsStats;
}

@Override
Expand Down Expand Up @@ -702,6 +722,7 @@ private CompletableFuture<Void> writeSnapshotSegmentAsync(LinkedList<TxnID> segm
transactionBufferSnapshotSegment.setSequenceId(this.sequenceID.get());
return segmentWriter.writeAsync(buildKey(this.sequenceID.get()), transactionBufferSnapshotSegment);
}).thenCompose((messageId) -> {
lastTakedSnapshotSegmentTimestamp = System.currentTimeMillis();
//Build index for this segment
TransactionBufferSnapshotIndex index = new TransactionBufferSnapshotIndex();
index.setSequenceID(transactionBufferSnapshotSegment.getSequenceId());
Expand Down
Loading