Skip to content

Commit

Permalink
feat(s3stream): release refCnt when ownership transferred
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh committed Dec 6, 2023
1 parent 6570ebd commit f478603
Show file tree
Hide file tree
Showing 4 changed files with 6 additions and 24 deletions.
12 changes: 5 additions & 7 deletions s3stream/src/main/java/com/automq/stream/s3/StreamDataBlock.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ public class StreamDataBlock {
private final ObjectReader.DataBlockIndex dataBlockIndex;
private final CompletableFuture<ByteBuf> dataCf = new CompletableFuture<>();
private final AtomicInteger refCount = new AtomicInteger(1);
private boolean isDataTransferred = false;

public StreamDataBlock(long streamId, long startOffset, long endOffset, long objectId, ObjectReader.DataBlockIndex dataBlockIndex) {
this.streamId = streamId;
Expand Down Expand Up @@ -104,8 +103,12 @@ public CompletableFuture<ByteBuf> getDataCf() {
return this.dataCf;
}

public void releaseRef() {
refCount.decrementAndGet();
}

public void release() {
if (!isDataTransferred && refCount.decrementAndGet() == 0) {
if (refCount.decrementAndGet() == 0) {
dataCf.thenAccept(buf -> {
if (buf != null) {
buf.release();
Expand All @@ -114,10 +117,6 @@ public void release() {
}
}

public void transferDataOwnership() {
isDataTransferred = true;
}

@Override
public String toString() {
return "StreamDataBlock{" +
Expand All @@ -126,7 +125,6 @@ public String toString() {
", startOffset=" + startOffset +
", endOffset=" + endOffset +
", dataBlockIndex=" + dataBlockIndex +
", isDataTransferred=" + isDataTransferred +
'}';
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ private CompositeByteBuf groupWaitingBlocks() {
CompositeByteBuf buf = DirectByteBufAlloc.compositeByteBuffer();
for (StreamDataBlock block : waitingUploadBlocks) {
buf.addComponent(true, block.getDataCf().join());
block.transferDataOwnership();
block.releaseRef();
completedBlocks.add(block);
nextDataBlockPosition += block.getBlockSize();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,12 @@
package com.automq.stream.s3.compact;

import com.automq.stream.s3.Config;
import com.automq.stream.s3.DirectByteBufAlloc;
import com.automq.stream.s3.ObjectReader;
import com.automq.stream.s3.ObjectWriter;
import com.automq.stream.s3.StreamDataBlock;
import com.automq.stream.s3.TestUtils;
import com.automq.stream.s3.compact.operator.DataBlockReader;
import com.automq.stream.s3.memory.MemoryMetadataManager;
import com.automq.stream.s3.metadata.StreamMetadata;
import com.automq.stream.s3.metadata.StreamState;
import com.automq.stream.s3.model.StreamRecordBatch;
import com.automq.stream.s3.network.ThrottleStrategy;
import com.automq.stream.s3.objects.CommitStreamSetObjectRequest;
import com.automq.stream.s3.objects.ObjectStreamRange;
import com.automq.stream.s3.objects.StreamObject;
Expand All @@ -37,11 +32,6 @@
import com.automq.stream.s3.metadata.S3StreamConstant;
import com.automq.stream.s3.metadata.StreamOffsetRange;
import com.automq.stream.s3.operator.DefaultS3Operator;
import com.automq.stream.s3.operator.MemoryS3Operator;
import com.automq.stream.s3.operator.S3Operator;
import com.automq.stream.s3.operator.Writer;
import io.netty.buffer.ByteBuf;
import io.netty.buffer.CompositeByteBuf;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.BeforeEach;
Expand All @@ -51,9 +41,7 @@
import org.mockito.Mockito;
import software.amazon.awssdk.core.async.AsyncRequestBody;
import software.amazon.awssdk.services.s3.S3AsyncClient;
import software.amazon.awssdk.services.s3.model.CompletedPart;
import software.amazon.awssdk.services.s3.model.PutObjectRequest;
import software.amazon.awssdk.services.s3.model.PutObjectResponse;

import java.util.ArrayList;
import java.util.Collection;
Expand All @@ -65,14 +53,11 @@
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.ArgumentMatchers.anyInt;
import static org.mockito.ArgumentMatchers.anyLong;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.Mockito.doAnswer;
Expand Down

0 comments on commit f478603

Please sign in to comment.