Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[improve][monitor] Introduce new metrics related to transaction buffer snapshot segments #20125

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.prometheus.client.CollectorRegistry;
import java.util.ArrayList;
import java.util.LinkedList;
import java.util.List;
Expand Down Expand Up @@ -52,6 +53,7 @@
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotIndexesMetadata;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TransactionBufferSnapshotSegment;
import org.apache.pulsar.broker.transaction.buffer.metadata.v2.TxnIDData;
import org.apache.pulsar.broker.transaction.buffer.stats.TxnSnapshotSegmentStats;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.client.api.transaction.TxnID;
Expand Down Expand Up @@ -129,6 +131,7 @@ public class SnapshotSegmentAbortedTxnProcessorImpl implements AbortedTxnProcess
*/
private final PersistentWorker persistentWorker;

private final TxnSnapshotSegmentStats txnSnapshotSegmentStats;
private static final String SNAPSHOT_PREFIX = "multiple-";

public SnapshotSegmentAbortedTxnProcessorImpl(PersistentTopic topic) {
Expand All @@ -147,6 +150,9 @@ public SnapshotSegmentAbortedTxnProcessorImpl(PersistentTopic topic) {
this.snapshotSegmentCapacity = (topic.getBrokerService().getPulsar()
.getConfiguration().getTransactionBufferSnapshotSegmentSize() - 8 - topic.getName().length()) / 3;
this.unsealedTxnIds = new LinkedList<>();
TopicName topicName = TopicName.get(topic.getName());
this.txnSnapshotSegmentStats = new TxnSnapshotSegmentStats(topicName.getNamespace(), topicName.getLocalName(),
CollectorRegistry.defaultRegistry);
}

@Override
Expand Down Expand Up @@ -238,6 +244,7 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
while (reader.hasMoreEvents()) {
Message<TransactionBufferSnapshotIndexes> message = reader.readNextAsync()
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
txnSnapshotSegmentStats.recordIndexOpReadSuccess();
if (topic.getName().equals(message.getKey())) {
TransactionBufferSnapshotIndexes transactionBufferSnapshotIndexes = message.getValue();
if (transactionBufferSnapshotIndexes != null) {
Expand All @@ -249,6 +256,7 @@ public CompletableFuture<PositionImpl> recoverFromSnapshot() {
}
}
} catch (TimeoutException ex) {
txnSnapshotSegmentStats.recordIndexOpReadFail();
Throwable t = FutureUtil.unwrapCompletionException(ex);
String errorMessage = String.format("[%s] Transaction buffer recover fail by read "
+ "transactionBufferSnapshot timeout!", topic.getName());
Expand Down Expand Up @@ -288,6 +296,7 @@ public void openReadOnlyManagedLedgerComplete(ReadOnlyManagedLedgerImpl readOnly
new AsyncCallbacks.ReadEntryCallback() {
@Override
public void readEntryComplete(Entry entry, Object ctx) {
txnSnapshotSegmentStats.recordSegmentOpReadSuccess();
handleSnapshotSegmentEntry(entry);
indexes.put(new PositionImpl(
index.abortedMarkLedgerID,
Expand All @@ -310,6 +319,7 @@ public void readEntryFailed(ManagedLedgerException exception, Object ctx) {
the segment can not be read according to the index.
We update index again if there are invalid indexes.
*/
txnSnapshotSegmentStats.recordSegmentOpReadFail();
if (((ManagedLedgerImpl) topic.getManagedLedger())
.ledgerExists(index.getAbortedMarkLedgerID())) {
log.error("[{}] Failed to read snapshot segment [{}:{}]",
Expand Down Expand Up @@ -612,6 +622,7 @@ private CompletableFuture<Void> takeSnapshotSegmentAsync(LinkedList<TxnID> seale
sealedAbortedTxnIdSegment.size());
}
this.sequenceID.getAndIncrement();
txnSnapshotSegmentStats.recordSegmentOpAddSuccess();
});
res.exceptionally(e -> {
//Just log the error, and the processor will try to take snapshot again when the transactionBuffer
Expand All @@ -620,6 +631,7 @@ private CompletableFuture<Void> takeSnapshotSegmentAsync(LinkedList<TxnID> seale
+ "for the topic [{}], and the size of the segment is [{}]",
this.sequenceID, abortedMarkerPersistentPosition, topic.getName(),
sealedAbortedTxnIdSegment.size(), e);
txnSnapshotSegmentStats.recordSegmentOpAddFail();
return null;
});
return res;
Expand Down Expand Up @@ -679,6 +691,7 @@ private CompletableFuture<Void> deleteSnapshotSegment(List<PositionImpl> positio
+ "whose sequenceId is [{}] and maxReadPosition is [{}]",
this.topic.getName(), this.sequenceID, positionNeedToDelete);
}
txnSnapshotSegmentStats.recordSegmentOpDelSuccess();
//The index may fail to update but the processor will check
//whether the snapshot segment is null, and update the index when recovering.
//And if the task is not the newest in the queue, it is no need to update the index.
Expand All @@ -689,6 +702,7 @@ private CompletableFuture<Void> deleteSnapshotSegment(List<PositionImpl> positio
log.warn("[{}] Failed to delete the snapshot segment, "
+ "whose sequenceId is [{}] and maxReadPosition is [{}]",
this.topic.getName(), this.sequenceID, positionNeedToDelete, e);
txnSnapshotSegmentStats.recordSegmentOpDelFail();
return null;
});
results.add(res);
Expand All @@ -698,15 +712,21 @@ private CompletableFuture<Void> deleteSnapshotSegment(List<PositionImpl> positio

private CompletableFuture<Void> updateSnapshotIndex(TransactionBufferSnapshotIndexesMetadata snapshotSegment) {
TransactionBufferSnapshotIndexes snapshotIndexes = new TransactionBufferSnapshotIndexes();
txnSnapshotSegmentStats.setSnapshotSegmentTotal(indexes.size());
CompletableFuture<Void> res = snapshotIndexWriter.getFuture()
.thenCompose((indexesWriter) -> {
snapshotIndexes.setIndexList(indexes.values().stream().toList());
snapshotIndexes.setSnapshot(snapshotSegment);
snapshotIndexes.setTopicName(topic.getName());
txnSnapshotSegmentStats.observeSnapshotIndexEntryBytes(calculateSize(snapshotIndexes));
return indexesWriter.writeAsync(topic.getName(), snapshotIndexes)
.thenCompose(messageId -> CompletableFuture.completedFuture(null));
});
res.thenRun(() -> lastSnapshotTimestamps = System.currentTimeMillis()).exceptionally(e -> {
res.thenRun(() -> {
lastSnapshotTimestamps = System.currentTimeMillis();
txnSnapshotSegmentStats.recordIndexOpAddSuccess();
}).exceptionally(e -> {
txnSnapshotSegmentStats.recordIndexOpAddFail();
log.error("[{}] Failed to update snapshot segment index", snapshotIndexes.getTopicName(), e);
return null;
});
Expand All @@ -717,12 +737,15 @@ private CompletableFuture<Void> clearSnapshotSegmentAndIndexes() {
CompletableFuture<Void> res = persistentWorker.clearAllSnapshotSegments()
.thenCompose((ignore) -> snapshotIndexWriter.getFuture()
.thenCompose(indexesWriter -> indexesWriter.writeAsync(topic.getName(), null)))
.thenRun(() ->
log.debug("Successes to clear the snapshot segment and indexes for the topic [{}]",
topic.getName()));
.thenRun(() -> {
log.debug("Successes to clear the snapshot segment and indexes for the topic [{}]",
topic.getName());
txnSnapshotSegmentStats.recordIndexOpDelSuccess();
});
res.exceptionally(e -> {
log.error("Failed to clear the snapshot segment and indexes for the topic [{}]",
topic.getName(), e);
txnSnapshotSegmentStats.recordIndexOpDelFail();
return null;
});
return res;
Expand Down Expand Up @@ -751,10 +774,13 @@ private CompletableFuture<Void> clearAllSnapshotSegments() {
.get(getSystemClientOperationTimeoutMs(), TimeUnit.MILLISECONDS);
if (topic.getName().equals(message.getValue().getTopicName())) {
snapshotSegmentsWriter.getFuture().get().write(message.getKey(), null);
txnSnapshotSegmentStats.recordSegmentOpDelSuccess();
}
txnSnapshotSegmentStats.recordSegmentOpReadSuccess();
}
return CompletableFuture.completedFuture(null);
} catch (Exception ex) {
txnSnapshotSegmentStats.recordSegmentOpDelFail();
log.error("[{}] Transaction buffer clear snapshot segments fail!", topic.getName(), ex);
return FutureUtil.failedFuture(ex);
} finally {
Expand Down Expand Up @@ -786,4 +812,24 @@ private List<TxnIDData> convertTypeToTxnIDData(List<TxnID> abortedTxns) {
return segment;
}

private int calculateSize(TransactionBufferSnapshotIndexes indexes) {
int size = 0;

// Calculate the size of topicName
size += indexes.getTopicName().getBytes().length;

// Calculate the size of indexList
for (TransactionBufferSnapshotIndex index : indexes.getIndexList()) {
size += 6 * Long.BYTES; // As each TransactionBufferSnapshotIndex has 6 long type attributes
}

// Calculate the size of snapshot
TransactionBufferSnapshotIndexesMetadata snapshot = indexes.getSnapshot();
size += 2 * Long.BYTES; // Size of maxReadPositionLedgerId and maxReadPositionEntryId
for (TxnIDData txnIDData : snapshot.getAborts()) {
size += 2 * Long.BYTES; // Size of mostSigBits and leastSigBits
}

return size;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,188 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.pulsar.broker.transaction.buffer.stats;

import io.prometheus.client.CollectorRegistry;
import io.prometheus.client.Counter;
import io.prometheus.client.Gauge;
import io.prometheus.client.Histogram;
import java.util.concurrent.atomic.AtomicBoolean;

public final class TxnSnapshotSegmentStats implements AutoCloseable {
private static final String NAMESPACE_LABEL_NAME = "namespace";
private static final String TOPIC_NAME_LABEL_NAME = "topic_name";
private static final String OPERATION_LABEL_NAME = "op";
private static final String STATUS_LABEL_NAME = "status";
private static final String PREFIX = "pulsar_txn_tb_snapshot_";

private final CollectorRegistry collectorRegistry;

private final Counter snapshotSegmentOpTotal;

private final Counter snapshotIndexOpTotal;

private final Gauge snapshotSegmentTotal;

private final Histogram snapshotIndexEntryBytes;

private final Counter.Child segmentOpAddSuccessChild;
private final Counter.Child segmentOpDelSuccessChild;
private final Counter.Child segmentOpReadSuccessChild;
private final Counter.Child indexOpAddSuccessChild;
private final Counter.Child indexOpDelSuccessChild;
private final Counter.Child indexOpReadSuccessChild;
private final Counter.Child segmentOpAddFailedChild;
private final Counter.Child segmentOpDelFailedChild;
private final Counter.Child segmentOpReadFailedChild;
private final Counter.Child indexOpAddFailedChild;
private final Counter.Child indexOpDelFailedChild;
private final Counter.Child indexOpReadFailedChild;
private final Gauge.Child snapshotSegmentTotalChild;
private final Histogram.Child snapshotIndexEntryBytesChild;

private final String namespace;
private final String topicName;
private final AtomicBoolean closed = new AtomicBoolean(false);

public TxnSnapshotSegmentStats(String namespace, String topicName, CollectorRegistry registry) {
this.namespace = namespace;
this.topicName = topicName;
this.collectorRegistry = registry;

snapshotSegmentOpTotal = Counter
.build(PREFIX + "segment_op_total",
"Pulsar transaction buffer snapshot segment operation count.")
.labelNames(NAMESPACE_LABEL_NAME, TOPIC_NAME_LABEL_NAME, OPERATION_LABEL_NAME, STATUS_LABEL_NAME)
.register(collectorRegistry);
this.segmentOpAddSuccessChild = snapshotSegmentOpTotal
.labels(namespace, topicName, "add", "success");
this.segmentOpDelSuccessChild = snapshotSegmentOpTotal
.labels(namespace, topicName, "del", "success");
this.segmentOpReadSuccessChild = snapshotSegmentOpTotal
.labels(namespace, topicName, "read", "success");
this.segmentOpAddFailedChild = snapshotSegmentOpTotal
.labels(namespace, topicName, "add", "fail");
this.segmentOpDelFailedChild = snapshotSegmentOpTotal
.labels(namespace, topicName, "del", "fail");
this.segmentOpReadFailedChild = snapshotSegmentOpTotal
.labels(namespace, topicName, "read", "fail");

snapshotIndexOpTotal = Counter
.build(PREFIX + "index_op_total",
"Pulsar transaction buffer snapshot index operation count.")
.labelNames(NAMESPACE_LABEL_NAME, TOPIC_NAME_LABEL_NAME, OPERATION_LABEL_NAME, STATUS_LABEL_NAME)
.register(collectorRegistry);
this.indexOpAddSuccessChild = snapshotIndexOpTotal
.labels(namespace, topicName, "add", "success");
this.indexOpDelSuccessChild = snapshotIndexOpTotal
.labels(namespace, topicName, "del", "success");
this.indexOpReadSuccessChild = snapshotIndexOpTotal
.labels(namespace, topicName, "read", "success");
this.indexOpAddFailedChild = snapshotIndexOpTotal
.labels(namespace, topicName, "add", "fail");
this.indexOpDelFailedChild = snapshotIndexOpTotal
.labels(namespace, topicName, "del", "fail");
this.indexOpReadFailedChild = snapshotIndexOpTotal
.labels(namespace, topicName, "read", "fail");

snapshotSegmentTotal = Gauge
.build(PREFIX + "index_total",
"Number of snapshot segments maintained in the Pulsar transaction buffer.")
.labelNames("namespace", "topic")
.register(collectorRegistry);
this.snapshotSegmentTotalChild = snapshotSegmentTotal
.labels(namespace, topicName);

snapshotIndexEntryBytes = Histogram
.build(PREFIX + "index_entry_bytes",
"Size of the snapshot index entry maintained in the Pulsar transaction buffer.")
.labelNames("namespace", "topic")
.unit("bytes")
.register(collectorRegistry);
this.snapshotIndexEntryBytesChild = snapshotIndexEntryBytes
.labels(namespace, topicName);
}

public void recordSegmentOpAddSuccess() {
this.segmentOpAddSuccessChild.inc();
}

public void recordSegmentOpDelSuccess() {
this.segmentOpDelSuccessChild.inc();
}

public void recordSegmentOpReadSuccess() {
this.segmentOpReadSuccessChild.inc();
}

public void recordIndexOpAddSuccess() {
this.indexOpAddSuccessChild.inc();
}

public void recordIndexOpDelSuccess() {
this.indexOpDelSuccessChild.inc();
}

public void recordIndexOpReadSuccess() {
this.indexOpReadSuccessChild.inc();
}

public void recordSegmentOpAddFail() {
this.segmentOpAddFailedChild.inc();
}

public void recordSegmentOpDelFail() {
this.segmentOpDelFailedChild.inc();
}

public void recordSegmentOpReadFail() {
this.segmentOpReadFailedChild.inc();
}

public void recordIndexOpAddFail() {
this.indexOpAddFailedChild.inc();
}

public void recordIndexOpDelFail() {
this.indexOpDelFailedChild.inc();
}

public void recordIndexOpReadFail() {
this.indexOpReadFailedChild.inc();
}

public void setSnapshotSegmentTotal(double value) {
snapshotSegmentTotalChild.set(value);
}

public void observeSnapshotIndexEntryBytes(double value) {
snapshotIndexEntryBytesChild.observe(value);
}

@Override
public void close() {
if (this.closed.compareAndSet(false, true)) {
collectorRegistry.unregister(snapshotSegmentOpTotal);
collectorRegistry.unregister(snapshotIndexOpTotal);
collectorRegistry.unregister(snapshotSegmentTotal);
collectorRegistry.unregister(snapshotIndexEntryBytes);
}
}
}

Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
/**
* The transaction buffer snapshot metadata.
*/
package org.apache.pulsar.broker.transaction.buffer.stats;
Loading