Skip to content

Commit

Permalink
fix(s3stream): prevent commit object 0 when compacting single object
Browse files Browse the repository at this point in the history
Signed-off-by: Shichao Nie <[email protected]>
  • Loading branch information
SCNieh committed Mar 4, 2024
1 parent 336fe9e commit 19fce90
Show file tree
Hide file tree
Showing 2 changed files with 36 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -517,6 +517,7 @@ CommitStreamSetObjectRequest buildCompactRequest(List<StreamMetadata> streamMeta
List<S3ObjectMetadata> objectsToCompact)
throws CompletionException {
CommitStreamSetObjectRequest request = new CommitStreamSetObjectRequest();
request.setObjectId(-1L);

Set<Long> compactedObjectIds = new HashSet<>();
logger.info("{} stream set objects as compact candidates, total compaction size: {}",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,41 @@ public void testCompact() {
Assertions.assertTrue(checkDataIntegrity(streamMetadataList, s3ObjectMetadata, request));
}

@Test
public void testCompactSingleObject() {
List<S3ObjectMetadata> s3ObjectMetadataList = new ArrayList<>();
objectManager.prepareObject(1, TimeUnit.MINUTES.toMillis(30)).thenAccept(objectId -> {
assertEquals(OBJECT_3, objectId);
ObjectWriter objectWriter = ObjectWriter.writer(OBJECT_3, s3Operator, 1024, 1024);
StreamRecordBatch r1 = new StreamRecordBatch(STREAM_1, 0, 500, 20, TestUtils.random(20));
StreamRecordBatch r2 = new StreamRecordBatch(STREAM_3, 0, 0, 10, TestUtils.random(1024));
StreamRecordBatch r3 = new StreamRecordBatch(STREAM_3, 0, 10, 10, TestUtils.random(1024));
objectWriter.write(STREAM_1, List.of(r1));
objectWriter.write(STREAM_3, List.of(r2, r3));
objectWriter.close().join();
List<StreamOffsetRange> streamsIndices = List.of(
new StreamOffsetRange(STREAM_1, 500, 520),
new StreamOffsetRange(STREAM_3, 0, 20)
);
S3ObjectMetadata objectMetadata = new S3ObjectMetadata(OBJECT_3, S3ObjectType.STREAM_SET, streamsIndices, System.currentTimeMillis(),
System.currentTimeMillis(), objectWriter.size(), OBJECT_3);
s3ObjectMetadataList.add(objectMetadata);
List.of(r1, r2, r3).forEach(StreamRecordBatch::release);
}).join();
when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(
List.of(new StreamMetadata(STREAM_0, 0, 1024, 2048, StreamState.OPENED),
new StreamMetadata(STREAM_3, 0, 1024, 2048, StreamState.OPENED))));
compactionManager = new CompactionManager(config, objectManager, streamManager, s3Operator);
List<StreamMetadata> streamMetadataList = this.streamManager.getStreams(Collections.emptyList()).join();
CommitStreamSetObjectRequest request = compactionManager.buildCompactRequest(streamMetadataList, s3ObjectMetadataList);
assertEquals(-1L, request.getObjectId());
assertEquals(List.of(OBJECT_3), request.getCompactedObjectIds());
assertTrue(request.getStreamObjects().isEmpty());
assertTrue(request.getStreamRanges().isEmpty());

Assertions.assertTrue(checkDataIntegrity(streamMetadataList, s3ObjectMetadataList, request));
}

@Test
public void testCompactWithDataTrimmed() {
when(streamManager.getStreams(Collections.emptyList())).thenReturn(CompletableFuture.completedFuture(
Expand Down

0 comments on commit 19fce90

Please sign in to comment.