feat(kafka_issues642): object format support data block group #887

Merged · 1 commit · Jan 9, 2024
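Summary (derived from the diff below): the index block becomes a flat array of fixed-size 36-byte DataBlockIndex entries, replacing the previous layout of a block-count prefix, 16-byte block-index entries, and a separate stream-range section. Each data block gains a self-describing header (magic, flag, record count, data length), so a single range read can return a group of concatenated data blocks and still be decoded record by record. The s3stream version is bumped from 0.14.0-SNAPSHOT to 0.15.0-SNAPSHOT.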
2 changes: 1 addition & 1 deletion pom.xml
@@ -53,7 +53,7 @@
<guava.version>32.1.3-jre</guava.version>
<slf4j.version>2.0.9</slf4j.version>
<snakeyaml.version>2.2</snakeyaml.version>
<s3stream.version>0.14.0-SNAPSHOT</s3stream.version>
<s3stream.version>0.15.0-SNAPSHOT</s3stream.version>

<!-- Flat buffers related -->
<flatbuffers.version>23.5.26</flatbuffers.version>
2 changes: 1 addition & 1 deletion s3stream/pom.xml
@@ -22,7 +22,7 @@
<modelVersion>4.0.0</modelVersion>
<groupId>com.automq.elasticstream</groupId>
<artifactId>s3stream</artifactId>
<version>0.14.0-SNAPSHOT</version>
<version>0.15.0-SNAPSHOT</version>
<properties>
<mockito-core.version>5.5.0</mockito-core.version>
<junit-jupiter.version>5.10.0</junit-jupiter.version>
44 changes: 44 additions & 0 deletions s3stream/src/main/java/com/automq/stream/s3/DataBlockIndex.java
@@ -0,0 +1,44 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.automq.stream.s3;

import io.netty.buffer.ByteBuf;

public record DataBlockIndex(long streamId, long startOffset, int endOffsetDelta, int recordCount, long startPosition,
int size) {

public static final int BLOCK_INDEX_SIZE = 8 /* streamId */ + 8 /* startOffset */ + 4 /* endOffset delta */
+ 4 /* record count */ + 8 /* block position */ + 4 /* block size */;

public long endOffset() {
return startOffset + endOffsetDelta;
}

public long endPosition() {
return startPosition + size;
}

public void encode(ByteBuf buf) {
buf.writeLong(streamId);
buf.writeLong(startOffset);
buf.writeInt(endOffsetDelta);
buf.writeInt(recordCount);
buf.writeLong(startPosition);
buf.writeInt(size);
}
}
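Since the PR does not add a decode method, the wire layout is fixed entirely by encode(); a hypothetical decode counterpart (an assumption, shown only to make the 36-byte layout explicit) would read the six fields back in the same order:

    // Hypothetical decode counterpart to encode(); not part of this PR.
    public static DataBlockIndex decode(ByteBuf buf) {
        long streamId = buf.readLong();
        long startOffset = buf.readLong();
        int endOffsetDelta = buf.readInt();
        int recordCount = buf.readInt();
        long startPosition = buf.readLong();
        int size = buf.readInt();
        return new DataBlockIndex(streamId, startOffset, endOffsetDelta, recordCount, startPosition, size);
    }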
164 changes: 74 additions & 90 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectReader.java
@@ -70,7 +70,7 @@ public CompletableFuture<FindIndexResult> find(long streamId, long startOffset,

public CompletableFuture<DataBlock> read(DataBlockIndex block) {
CompletableFuture<ByteBuf> rangeReadCf = s3Operator.rangeRead(objectKey, block.startPosition(), block.endPosition(), ThrottleStrategy.THROTTLE_1);
return rangeReadCf.thenApply(buf -> new DataBlock(buf, block.recordCount()));
return rangeReadCf.thenApply(DataBlock::new);
}

void asyncGetBasicObjectInfo() {
@@ -121,12 +121,10 @@ public void close0() {
}

/**
* @param dataBlockSize The total size of the data blocks, which equals to index start position.
* @param indexBlock raw index data.
* @param blockCount The number of data blocks in the object.
* @param indexBlockSize The size of the index blocks.
* @param dataBlockSize The total size of the data blocks, which equals to index start position.
* @param indexBlock raw index data.
*/
public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock, int blockCount, int indexBlockSize) {
public record BasicObjectInfo(long dataBlockSize, IndexBlock indexBlock) {

public static BasicObjectInfo parse(ByteBuf objectTailBuf,
S3ObjectMetadata s3ObjectMetadata) throws IndexBlockParseException {
@@ -145,13 +143,7 @@ public static BasicObjectInfo parse(ByteBuf objectTailBuf,
indexBlockBuf.readBytes(copy, indexBlockBuf.readableBytes());
objectTailBuf.release();
indexBlockBuf = copy;

int blockCount = indexBlockBuf.readInt();
ByteBuf blocks = indexBlockBuf.retainedSlice(indexBlockBuf.readerIndex(), blockCount * 16);
indexBlockBuf.skipBytes(blockCount * 16);
ByteBuf streamRanges = indexBlockBuf.retainedSlice(indexBlockBuf.readerIndex(), indexBlockBuf.readableBytes());
indexBlockBuf.release();
return new BasicObjectInfo(indexBlockPosition, new IndexBlock(s3ObjectMetadata, blocks, streamRanges), blockCount, indexBlockSize);
return new BasicObjectInfo(indexBlockPosition, new IndexBlock(s3ObjectMetadata, indexBlockBuf));
}
}

@@ -165,48 +157,47 @@ void close() {
}

public static class IndexBlock {
public static final int INDEX_BLOCK_UNIT_SIZE = 8 /* streamId */ + 8 /* startOffset */ + 4 /* endOffset delta */
+ 4 /* record count */ + 8 /* block position */ + 4 /* block size */;
private final S3ObjectMetadata s3ObjectMetadata;
private final ByteBuf blocks;
private final ByteBuf streamRanges;
private final ByteBuf buf;
private final int size;
private final int count;

public IndexBlock(S3ObjectMetadata s3ObjectMetadata, ByteBuf blocks, ByteBuf streamRanges) {
public IndexBlock(S3ObjectMetadata s3ObjectMetadata, ByteBuf buf) {
this.s3ObjectMetadata = s3ObjectMetadata;
this.blocks = blocks;
this.streamRanges = streamRanges;
this.size = blocks.readableBytes() + streamRanges.readableBytes();
this.buf = buf;
this.size = buf.readableBytes();
this.count = buf.readableBytes() / INDEX_BLOCK_UNIT_SIZE;
}

public Iterator<StreamDataBlock> iterator() {
ByteBuf blocks = this.blocks.slice();
ByteBuf ranges = this.streamRanges.slice();
public Iterator<DataBlockIndex> iterator() {
AtomicInteger getIndex = new AtomicInteger(0);
return new Iterator<>() {
@Override
public boolean hasNext() {
return ranges.readableBytes() != 0;
return getIndex.get() < count;
}

@Override
public StreamDataBlock next() {
long streamId = ranges.readLong();
long startOffset = ranges.readLong();
long endOffset = startOffset + ranges.readInt();
int rangeBlockId = ranges.readInt();
long blockPosition = blocks.getLong(rangeBlockId * 16);
int blockSize = blocks.getInt(rangeBlockId * 16 + 8);
int recordCount = blocks.getInt(rangeBlockId * 16 + 12);
return new StreamDataBlock(streamId, startOffset, endOffset, rangeBlockId, s3ObjectMetadata.objectId(),
blockPosition, blockSize, recordCount);
public DataBlockIndex next() {
return get(getIndex.getAndIncrement());
}
};
}

public ByteBuf blocks() {
return blocks.slice();
}

public ByteBuf streamRanges() {
return streamRanges.slice();
public DataBlockIndex get(int index) {
if (index < 0 || index >= count) {
throw new IllegalArgumentException("index " + index + " is out of range [0, " + count + ")");
}
int base = index * INDEX_BLOCK_UNIT_SIZE;
long streamId = buf.getLong(base);
long startOffset = buf.getLong(base + 8);
int endOffsetDelta = buf.getInt(base + 16);
int recordCount = buf.getInt(base + 20);
long blockPosition = buf.getLong(base + 24);
int blockSize = buf.getInt(base + 32);
return new DataBlockIndex(streamId, startOffset, endOffsetDelta, recordCount, blockPosition, blockSize);
}

public FindIndexResult find(long streamId, long startOffset, long endOffset) {
@@ -219,37 +210,30 @@ public FindIndexResult find(long streamId, long startOffset, long endOffset, int
boolean matched = false;
boolean isFulfilled = false;
List<StreamDataBlock> rst = new LinkedList<>();
IndexBlockOrderedBytes indexBlockOrderedBytes = new IndexBlockOrderedBytes(streamRanges);
IndexBlockOrderedBytes indexBlockOrderedBytes = new IndexBlockOrderedBytes(this);
int startIndex = indexBlockOrderedBytes.search(new IndexBlockOrderedBytes.TargetStreamOffset(streamId, startOffset));
if (startIndex == -1) {
// mismatched
return new FindIndexResult(false, nextStartOffset, nextMaxBytes, rst);
}
for (int i = startIndex * 24; i < streamRanges.readableBytes(); i += 24) {
long rangeStreamId = streamRanges.getLong(i);
long rangeStartOffset = streamRanges.getLong(i + 8);
long rangeEndOffset = rangeStartOffset + streamRanges.getInt(i + 16);
int rangeBlockId = streamRanges.getInt(i + 20);
if (rangeStreamId == streamId) {
if (nextStartOffset < rangeStartOffset) {
for (int i = startIndex; i < count(); i++) {
DataBlockIndex index = get(i);
if (index.streamId() == streamId) {
if (nextStartOffset < index.startOffset()) {
break;
}
if (rangeEndOffset <= nextStartOffset) {
if (index.endOffset() <= nextStartOffset) {
continue;
}
matched = nextStartOffset == rangeStartOffset;
nextStartOffset = rangeEndOffset;
long blockPosition = blocks.getLong(rangeBlockId * 16);
int blockSize = blocks.getInt(rangeBlockId * 16 + 8);
int recordCount = blocks.getInt(rangeBlockId * 16 + 12);
rst.add(new StreamDataBlock(streamId, rangeStartOffset, rangeEndOffset, s3ObjectMetadata.objectId(),
new DataBlockIndex(rangeBlockId, blockPosition, blockSize, recordCount)));
matched = nextStartOffset == index.startOffset();
nextStartOffset = index.endOffset();
rst.add(new StreamDataBlock(s3ObjectMetadata.objectId(), index));

// We consider the first block as not matched because we do not know exactly how many bytes of
// the range fall within the first block; as a result we may read one more block than expected.
if (matched) {
int recordPayloadSize = blockSize
- recordCount * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
int recordPayloadSize = index.size()
- index.recordCount() * StreamRecordBatchCodec.HEADER_SIZE // sum of encoded record header size
- ObjectWriter.DataBlock.BLOCK_HEADER_SIZE; // block header size
nextMaxBytes -= Math.min(nextMaxBytes, recordPayloadSize);
}
@@ -264,13 +248,16 @@ public FindIndexResult find(long streamId, long startOffset, long endOffset, int
return new FindIndexResult(isFulfilled, nextStartOffset, nextMaxBytes, rst);
}

int size() {
public int size() {
return size;
}

public int count() {
return count;
}

void close() {
blocks.release();
streamRanges.release();
buf.release();
}
}

@@ -288,45 +275,36 @@ public IndexBlockParseException(long indexBlockPosition) {

}

public record DataBlockIndex(int blockId, long startPosition, int size, int recordCount) {
public static final int BLOCK_INDEX_SIZE = 8 + 4 + 4;

public long endPosition() {
return startPosition + size;
}

@Override
public String toString() {
return "DataBlockIndex{" +
"blockId=" + blockId +
", startPosition=" + startPosition +
", size=" + size +
", recordCount=" + recordCount +
'}';
}
}

public static class DataBlock implements AutoCloseable {
private final ByteBuf buf;
private final int recordCount;

public DataBlock(ByteBuf buf, int recordCount) {
this.buf = buf;
this.recordCount = recordCount;
public DataBlock(ByteBuf buf) {
this.buf = buf.duplicate();
this.recordCount = check(buf);
}

private static int check(ByteBuf buf) {
buf = buf.duplicate();
int recordCount = 0;
while (buf.readableBytes() > 0) {
byte magicCode = buf.readByte();
if (magicCode != ObjectWriter.DATA_BLOCK_MAGIC) {
LOGGER.error("magic code mismatch, expected {}, actual {}", ObjectWriter.DATA_BLOCK_MAGIC, magicCode);
throw new RuntimeException("[FATAL] magic code mismatch, data is corrupted");
}
buf.readByte(); // flag
recordCount += buf.readInt();
int dataLength = buf.readInt();
buf.skipBytes(dataLength);
}
return recordCount;
}

public CloseableIterator<StreamRecordBatch> iterator() {
ByteBuf buf = this.buf.duplicate();
AtomicInteger currentBlockRecordCount = new AtomicInteger(0);
AtomicInteger remainingRecordCount = new AtomicInteger(recordCount);
// skip magic and flag
byte magicCode = buf.readByte();
buf.readByte();

if (magicCode != ObjectWriter.DATA_BLOCK_MAGIC) {
LOGGER.error("magic code mismatch, expected {}, actual {}", ObjectWriter.DATA_BLOCK_MAGIC, magicCode);
throw new RuntimeException("[FATAL] magic code mismatch, data is corrupted");
}
// TODO: check flag, use uncompressed stream or compressed stream.
return new CloseableIterator<>() {
@Override
public boolean hasNext() {
@@ -339,6 +317,12 @@ public StreamRecordBatch next() {
if (remainingRecordCount.decrementAndGet() < 0) {
throw new NoSuchElementException();
}
if (currentBlockRecordCount.get() == 0) {
buf.skipBytes(1 /* magic */ + 1 /* flag */);
currentBlockRecordCount.set(buf.readInt());
buf.skipBytes(4);
}
currentBlockRecordCount.decrementAndGet();
return StreamRecordBatchCodec.duplicateDecode(buf);
}

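With these changes, a read is a two-step lookup: fetch a fixed-size DataBlockIndex entry from the index block, then range-read [startPosition, endPosition) and let the block's new header describe its own record count. A minimal usage sketch, assuming an ObjectReader `reader` and its parsed BasicObjectInfo `info` (names illustrative), that CloseableIterator is AutoCloseable, and that StreamRecordBatch is reference-counted:

    // Hypothetical helper, not in the PR: iterate every record in an object.
    static void drainObject(ObjectReader reader, ObjectReader.BasicObjectInfo info) throws Exception {
        Iterator<DataBlockIndex> it = info.indexBlock().iterator();
        while (it.hasNext()) {
            DataBlockIndex index = it.next();
            // read() range-reads the block bytes; DataBlock's check() then walks the
            // concatenated block group, validating magic codes and summing record counts.
            try (ObjectReader.DataBlock block = reader.read(index).join()) {
                try (CloseableIterator<StreamRecordBatch> records = block.iterator()) {
                    while (records.hasNext()) {
                        records.next().release(); // assumed ref-counted; release after use
                    }
                }
            }
        }
    }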
31 changes: 8 additions & 23 deletions s3stream/src/main/java/com/automq/stream/s3/ObjectWriter.java
@@ -200,33 +200,15 @@ class IndexBlock {

public IndexBlock() {
long nextPosition = 0;
int indexBlockSize = 4 + (8 + 4 + 4 + 8 + 8 + 4 + 4) * completedBlocks.size();
int indexBlockSize = DataBlockIndex.BLOCK_INDEX_SIZE * completedBlocks.size();
buf = DirectByteBufAlloc.byteBuffer(indexBlockSize, "write_index_block");
buf.writeInt(completedBlocks.size()); // block count
// block index
for (DataBlock block : completedBlocks) {
// start position in the object
buf.writeLong(nextPosition);
// byte size of the block
buf.writeInt(block.size());
// how many ranges in the block
buf.writeInt(block.recordCount());
ObjectStreamRange streamRange = block.getStreamRange();
new DataBlockIndex(streamRange.getStreamId(), streamRange.getStartOffset(), (int) (streamRange.getEndOffset() - streamRange.getStartOffset()),
block.recordCount(), nextPosition, block.size()).encode(buf);
nextPosition += block.size();
}
position = nextPosition;
// object stream range
for (int blockIndex = 0; blockIndex < completedBlocks.size(); blockIndex++) {
DataBlock block = completedBlocks.get(blockIndex);
ObjectStreamRange range = block.getStreamRange();
// stream id of this range
buf.writeLong(range.getStreamId());
// start offset of the related stream
buf.writeLong(range.getStartOffset());
// record count of the related stream in this range
buf.writeInt((int) (range.getEndOffset() - range.getStartOffset()));
// the index of block where this range is in
buf.writeInt(blockIndex);
}
}

public ByteBuf buffer() {
@@ -244,7 +226,7 @@ public int size() {
}

class DataBlock {
public static final int BLOCK_HEADER_SIZE = 2;
public static final int BLOCK_HEADER_SIZE = 1 /* magic */ + 1 /* flag */ + 4 /* record count */ + 4 /* data length */;
private final CompositeByteBuf encodedBuf;
private final ObjectStreamRange streamRange;
private final int recordCount;
@@ -256,9 +238,12 @@ public DataBlock(long streamId, List<StreamRecordBatch> records) {
ByteBuf header = DirectByteBufAlloc.byteBuffer(BLOCK_HEADER_SIZE);
header.writeByte(DATA_BLOCK_MAGIC);
header.writeByte(DATA_BLOCK_DEFAULT_FLAG);
header.writeInt(recordCount);
header.writeInt(0); // data length
encodedBuf.addComponent(true, header);
records.forEach(r -> encodedBuf.addComponent(true, r.encoded().retain()));
this.size = encodedBuf.readableBytes();
encodedBuf.setInt(BLOCK_HEADER_SIZE - 4, size - BLOCK_HEADER_SIZE);
this.streamRange = new ObjectStreamRange(streamId, records.get(0).getEpoch(), records.get(0).getBaseOffset(), records.get(records.size() - 1).getLastOffset(), size);
}

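Net effect on the object format, per the constants above: every data block now carries a 10-byte self-describing header, and the index block is a flat array of 36-byte entries with no leading block count and no separate stream-range section:

    // Data block:  magic(1) | flag(1) | recordCount(4) | dataLength(4) | records...
    // Index block: N entries of DataBlockIndex.BLOCK_INDEX_SIZE (36) bytes each:
    //              streamId(8) | startOffset(8) | endOffsetDelta(4) |
    //              recordCount(4) | startPosition(8) | size(4)
    // Illustrative helper (not in the PR): byte offset of index entry i.
    static int indexEntryOffset(int i) {
        return i * DataBlockIndex.BLOCK_INDEX_SIZE;
    }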