Skip to content
This repository has been archived by the owner on Apr 1, 2024. It is now read-only.

Commit

Permalink
[fix][broker] Release EntryBuffer after parse proto object (apache#20170
Browse files Browse the repository at this point in the history
)
  • Loading branch information
coderzc authored Apr 24, 2023
1 parent ab8a8c9 commit 35e9897
Showing 1 changed file with 25 additions and 5 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@

import com.google.protobuf.InvalidProtocolBufferException;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import java.util.ArrayList;
import java.util.Enumeration;
import java.util.List;
Expand All @@ -38,6 +39,7 @@
import org.apache.pulsar.broker.ServiceConfiguration;
import org.apache.pulsar.broker.delayed.proto.SnapshotMetadata;
import org.apache.pulsar.broker.delayed.proto.SnapshotSegment;
import org.apache.pulsar.common.allocator.PulsarByteBufAllocator;
import org.apache.pulsar.common.util.FutureUtil;

@Slf4j
Expand All @@ -60,8 +62,9 @@ public BookkeeperBucketSnapshotStorage(PulsarService pulsar) {
public CompletableFuture<Long> createBucketSnapshot(SnapshotMetadata snapshotMetadata,
List<SnapshotSegment> bucketSnapshotSegments,
String bucketKey, String topicName, String cursorName) {
ByteBuf metadataByteBuf = Unpooled.wrappedBuffer(snapshotMetadata.toByteArray());
return createLedger(bucketKey, topicName, cursorName)
.thenCompose(ledgerHandle -> addEntry(ledgerHandle, snapshotMetadata.toByteArray())
.thenCompose(ledgerHandle -> addEntry(ledgerHandle, metadataByteBuf)
.thenCompose(__ -> addSnapshotSegments(ledgerHandle, bucketSnapshotSegments))
.thenCompose(__ -> closeLedger(ledgerHandle))
.thenApply(__ -> ledgerHandle.getId()));
Expand Down Expand Up @@ -117,19 +120,32 @@ public void close() throws Exception {
private CompletableFuture<Void> addSnapshotSegments(LedgerHandle ledgerHandle,
List<SnapshotSegment> bucketSnapshotSegments) {
List<CompletableFuture<Void>> addFutures = new ArrayList<>();
ByteBuf byteBuf;
for (SnapshotSegment bucketSnapshotSegment : bucketSnapshotSegments) {
addFutures.add(addEntry(ledgerHandle, bucketSnapshotSegment.toByteArray()));
byteBuf = PulsarByteBufAllocator.DEFAULT.directBuffer(bucketSnapshotSegment.getSerializedSize());
try {
bucketSnapshotSegment.writeTo(byteBuf);
} catch (Exception e){
byteBuf.release();
throw e;
}
addFutures.add(addEntry(ledgerHandle, byteBuf));
}

return FutureUtil.waitForAll(addFutures);
}

private SnapshotMetadata parseSnapshotMetadataEntry(LedgerEntry ledgerEntry) {
ByteBuf entryBuffer = null;
try {
ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
entryBuffer = ledgerEntry.getEntryBuffer();
return SnapshotMetadata.parseFrom(entryBuffer.nioBuffer());
} catch (InvalidProtocolBufferException e) {
throw new BucketSnapshotSerializationException(e);
} finally {
if (entryBuffer != null) {
entryBuffer.release();
}
}
}

Expand All @@ -139,7 +155,11 @@ private List<SnapshotSegment> parseSnapshotSegmentEntries(Enumeration<LedgerEntr
LedgerEntry ledgerEntry = entryEnumeration.nextElement();
SnapshotSegment snapshotSegment = new SnapshotSegment();
ByteBuf entryBuffer = ledgerEntry.getEntryBuffer();
snapshotSegment.parseFrom(entryBuffer, entryBuffer.readableBytes());
try {
snapshotSegment.parseFrom(entryBuffer, entryBuffer.readableBytes());
} finally {
entryBuffer.release();
}
snapshotMetadataList.add(snapshotSegment);
}
return snapshotMetadataList;
Expand Down Expand Up @@ -208,7 +228,7 @@ private CompletableFuture<Void> closeLedger(LedgerHandle ledgerHandle) {
return future;
}

private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, byte[] data) {
private CompletableFuture<Void> addEntry(LedgerHandle ledgerHandle, ByteBuf data) {
final CompletableFuture<Void> future = new CompletableFuture<>();
ledgerHandle.asyncAddEntry(data,
(rc, handle, entryId, ctx) -> {
Expand Down

0 comments on commit 35e9897

Please sign in to comment.