Skip to content

Commit

Permalink
Storages: Separate ColumnFileTinyReader & ColumnFileSetInputStream (#…
Browse files Browse the repository at this point in the history
…9527)

ref #6233

Stroages: Separate ColumnFileTinyReader & ColumnFileSetInputStream

Co-authored-by: ti-chi-bot[bot] <108142056+ti-chi-bot[bot]@users.noreply.github.com>
Co-authored-by: jinhelin <[email protected]>
Co-authored-by: JaySon <[email protected]>
  • Loading branch information
4 people authored Oct 16, 2024
1 parent 5e44fe9 commit aaf95f7
Show file tree
Hide file tree
Showing 17 changed files with 347 additions and 275 deletions.
19 changes: 9 additions & 10 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFile.h
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,9 @@ class ColumnFile
: id(++MAX_COLUMN_FILE_ID)
{}

public:
virtual ~ColumnFile() = default;

public:
enum Type : UInt32
{
DELETE_RANGE = 1,
Expand Down Expand Up @@ -94,8 +94,8 @@ class ColumnFile
UInt64 getId() const { return id; }

virtual size_t getRows() const { return 0; }
virtual size_t getBytes() const { return 0; };
virtual size_t getDeletes() const { return 0; };
virtual size_t getBytes() const { return 0; }
virtual size_t getDeletes() const { return 0; }

virtual Type getType() const = 0;

Expand All @@ -104,11 +104,11 @@ class ColumnFile
/// Is a ColumnFileTiny or not.
bool isTinyFile() const { return getType() == Type::TINY_FILE; }
/// Is a ColumnFileDeleteRange or not.
bool isDeleteRange() const { return getType() == Type::DELETE_RANGE; };
bool isDeleteRange() const { return getType() == Type::DELETE_RANGE; }
/// Is a ColumnFileBig or not.
bool isBigFile() const { return getType() == Type::BIG_FILE; };
bool isBigFile() const { return getType() == Type::BIG_FILE; }
/// Is a ColumnFilePersisted or not
bool isPersisted() const { return getType() != Type::INMEMORY_FILE; };
bool isPersisted() const { return getType() != Type::INMEMORY_FILE; }

/**
* Whether this column file SEEMS TO BE flushed from another.
Expand All @@ -130,7 +130,8 @@ class ColumnFile
virtual ColumnFileReaderPtr getReader(
const DMContext & context,
const IColumnFileDataProviderPtr & data_provider,
const ColumnDefinesPtr & col_defs) const
const ColumnDefinesPtr & col_defs,
ReadTag read_tag) const
= 0;

/// Note: Only ColumnFileInMemory can be appendable. Other ColumnFiles (i.e. ColumnFilePersisted) have
Expand Down Expand Up @@ -177,9 +178,7 @@ class ColumnFileReader
virtual size_t skipNextBlock() { throw Exception("Unsupported operation", ErrorCodes::LOGICAL_ERROR); }

/// Create a new reader from current reader with different columns to read.
virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs) = 0;

virtual void setReadTag(ReadTag /*read_tag*/) {}
virtual ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & col_defs, ReadTag read_tag) = 0;
};

std::pair<size_t, size_t> copyColumnsData(
Expand Down
16 changes: 5 additions & 11 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,9 +67,10 @@ void ColumnFileBig::removeData(WriteBatches & wbs) const
ColumnFileReaderPtr ColumnFileBig::getReader(
const DMContext & dm_context,
const IColumnFileDataProviderPtr &,
const ColumnDefinesPtr & col_defs) const
const ColumnDefinesPtr & col_defs,
ReadTag read_tag) const
{
return std::make_shared<ColumnFileBigReader>(dm_context, *this, col_defs);
return std::make_shared<ColumnFileBigReader>(dm_context, *this, col_defs, read_tag);
}

void ColumnFileBig::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) const
Expand Down Expand Up @@ -380,17 +381,10 @@ size_t ColumnFileBigReader::skipNextBlock()
}
}

ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
ColumnFileReaderPtr ColumnFileBigReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag read_tag)
{
// Currently we don't reuse the cache data.
return std::make_shared<ColumnFileBigReader>(dm_context, column_file, new_col_defs);
}

void ColumnFileBigReader::setReadTag(ReadTag read_tag_)
{
// `read_tag` should be set before `file_stream` is initialized.
RUNTIME_CHECK(file_stream == nullptr);
read_tag = read_tag_;
return std::make_shared<ColumnFileBigReader>(dm_context, column_file, new_col_defs, read_tag);
}

} // namespace DB::DM
13 changes: 7 additions & 6 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileBig.h
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,8 @@ class ColumnFileBig : public ColumnFilePersisted
ColumnFileReaderPtr getReader(
const DMContext & dm_context,
const IColumnFileDataProviderPtr & data_provider,
const ColumnDefinesPtr & col_defs) const override;
const ColumnDefinesPtr & col_defs,
ReadTag) const override;

void serializeMetadata(WriteBuffer & buf, bool save_schema) const override;
void serializeMetadata(dtpb::ColumnFilePersisted * cf_pb, bool save_schema) const override;
Expand Down Expand Up @@ -146,7 +147,7 @@ class ColumnFileBigReader : public ColumnFileReader
Block cur_block;
Columns cur_block_data; // The references to columns in cur_block, for faster access.

ReadTag read_tag = ReadTag::Internal;
ReadTag read_tag;

private:
void initStream();
Expand All @@ -165,10 +166,12 @@ class ColumnFileBigReader : public ColumnFileReader
ColumnFileBigReader(
const DMContext & dm_context_,
const ColumnFileBig & column_file_,
const ColumnDefinesPtr & col_defs_)
const ColumnDefinesPtr & col_defs_,
ReadTag read_tag_)
: dm_context(dm_context_)
, column_file(column_file_)
, col_defs(col_defs_)
, read_tag(read_tag_)
{
if (col_defs_->size() == 1)
{
Expand Down Expand Up @@ -200,9 +203,7 @@ class ColumnFileBigReader : public ColumnFileReader

size_t skipNextBlock() override;

ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;

void setReadTag(ReadTag read_tag_) override;
ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag) override;
};

} // namespace DB::DM
18 changes: 7 additions & 11 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileDeleteRange.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -16,16 +16,17 @@
#include <Storages/DeltaMerge/DMContext.h>


namespace DB
{
namespace DM
namespace DB::DM
{

ColumnFileReaderPtr ColumnFileDeleteRange::getReader(
const DMContext &,
const IColumnFileDataProviderPtr &,
const ColumnDefinesPtr &) const
const ColumnDefinesPtr &,
ReadTag) const
{
return std::make_shared<ColumnFileEmptyReader>();
// ColumnFileDeleteRange is not readable.
return nullptr;
}

void ColumnFileDeleteRange::serializeMetadata(WriteBuffer & buf, bool /*save_schema*/) const
Expand All @@ -50,9 +51,4 @@ ColumnFilePersistedPtr ColumnFileDeleteRange::deserializeMetadata(const dtpb::Co
return std::make_shared<ColumnFileDeleteRange>(RowKeyRange::deserialize(dr_pb.range()));
}

ColumnFileReaderPtr ColumnFileEmptyReader::createNewReader(const ColumnDefinesPtr &)
{
return std::make_shared<ColumnFileEmptyReader>();
}
} // namespace DM
} // namespace DB
} // namespace DB::DM
20 changes: 8 additions & 12 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileDeleteRange.h
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@
#include <Storages/DeltaMerge/Remote/Serializer_fwd.h>


namespace DB
{
namespace DM
namespace DB::DM
{

class ColumnFileDeleteRange;
using ColumnFileDeleteRangePtr = std::shared_ptr<ColumnFileDeleteRange>;

Expand All @@ -42,8 +41,11 @@ class ColumnFileDeleteRange : public ColumnFilePersisted
{}
ColumnFileDeleteRange(const ColumnFileDeleteRange &) = default;

ColumnFileReaderPtr getReader(const DMContext &, const IColumnFileDataProviderPtr &, const ColumnDefinesPtr &)
const override;
ColumnFileReaderPtr getReader(
const DMContext &,
const IColumnFileDataProviderPtr &,
const ColumnDefinesPtr &,
ReadTag) const override;

const auto & getDeleteRange() { return delete_range; }

Expand Down Expand Up @@ -74,10 +76,4 @@ class ColumnFileDeleteRange : public ColumnFilePersisted
String toString() const override { return "{delete_range:" + delete_range.toString() + "}"; }
};

class ColumnFileEmptyReader : public ColumnFileReader
{
public:
ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr &) override;
};
} // namespace DM
} // namespace DB
} // namespace DB::DM
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,8 @@ void ColumnFileInMemory::fillColumns(const ColumnDefines & col_defs, size_t col_
ColumnFileReaderPtr ColumnFileInMemory::getReader(
const DMContext &,
const IColumnFileDataProviderPtr &,
const ColumnDefinesPtr & col_defs) const
const ColumnDefinesPtr & col_defs,
ReadTag) const
{
return std::make_shared<ColumnFileInMemoryReader>(*this, col_defs);
}
Expand Down Expand Up @@ -155,7 +156,7 @@ size_t ColumnFileInMemoryReader::skipNextBlock()
return memory_file.getRows();
}

ColumnFileReaderPtr ColumnFileInMemoryReader::createNewReader(const ColumnDefinesPtr & new_col_defs)
ColumnFileReaderPtr ColumnFileInMemoryReader::createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag)
{
// Reuse the cache data.
return std::make_shared<ColumnFileInMemoryReader>(memory_file, new_col_defs, cols_data_cache);
Expand Down
7 changes: 4 additions & 3 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileInMemory.h
Original file line number Diff line number Diff line change
Expand Up @@ -65,7 +65,7 @@ class ColumnFileInMemory : public ColumnFile
Type getType() const override { return Type::INMEMORY_FILE; }

size_t getRows() const override { return rows; }
size_t getBytes() const override { return bytes; };
size_t getBytes() const override { return bytes; }

CachePtr getCache() { return cache; }

Expand All @@ -77,7 +77,8 @@ class ColumnFileInMemory : public ColumnFile
ColumnFileReaderPtr getReader(
const DMContext & context,
const IColumnFileDataProviderPtr & data_provider,
const ColumnDefinesPtr & col_defs) const override;
const ColumnDefinesPtr & col_defs,
ReadTag) const override;

bool isAppendable() const override { return !disable_append; }
void disableAppend() override { disable_append = true; }
Expand Down Expand Up @@ -138,7 +139,7 @@ class ColumnFileInMemoryReader : public ColumnFileReader

size_t skipNextBlock() override;

ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs) override;
ColumnFileReaderPtr createNewReader(const ColumnDefinesPtr & new_col_defs, ReadTag) override;
};

} // namespace DM
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,80 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#include <Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.h>

namespace DB::DM
{

size_t ColumnFileSetInputStream::skipNextBlock()
{
while (cur_column_file_reader != reader.column_file_readers.end())
{
if (*cur_column_file_reader == nullptr)
{
++cur_column_file_reader;
continue;
}
auto skipped_rows = (*cur_column_file_reader)->skipNextBlock();
read_rows += skipped_rows;
if (skipped_rows)
return skipped_rows;
else
{
(*cur_column_file_reader).reset();
++cur_column_file_reader;
}
}
return 0;
}

Block ColumnFileSetInputStream::read()
{
while (cur_column_file_reader != reader.column_file_readers.end())
{
if (*cur_column_file_reader == nullptr)
{
++cur_column_file_reader;
continue;
}
auto block = (*cur_column_file_reader)->readNextBlock();
if (block)
{
block.setStartOffset(read_rows);
read_rows += block.rows();
return block;
}
else
{
(*cur_column_file_reader).reset();
++cur_column_file_reader;
}
}
return {};
}

Block ColumnFileSetInputStream::readWithFilter(const IColumn::Filter & filter)
{
auto block = read();
if (size_t passed_count = countBytesInFilter(filter); passed_count != block.rows())
{
for (auto & col : block)
{
col.column = col.column->filter(filter, passed_count);
}
}
return block;
}

} // namespace DB::DM
57 changes: 57 additions & 0 deletions dbms/src/Storages/DeltaMerge/ColumnFile/ColumnFileSetInputStream.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
// Copyright 2024 PingCAP, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#pragma once

#include <Storages/DeltaMerge/ColumnFile/ColumnFileSetReader.h>
#include <Storages/DeltaMerge/SkippableBlockInputStream.h>

namespace DB::DM
{

class ColumnFileSetInputStream : public SkippableBlockInputStream
{
protected:
ColumnFileSetReader reader;

std::vector<ColumnFileReaderPtr>::iterator cur_column_file_reader;
size_t read_rows = 0;

public:
ColumnFileSetInputStream(
const DMContext & context_,
const ColumnFileSetSnapshotPtr & delta_snap_,
const ColumnDefinesPtr & col_defs_,
const RowKeyRange & segment_range_,
ReadTag read_tag_)
: reader(context_, delta_snap_, col_defs_, segment_range_, read_tag_)
{
cur_column_file_reader = reader.column_file_readers.begin();
}

String getName() const override { return "ColumnFileSet"; }
Block getHeader() const override { return toEmptyBlock(*(reader.col_defs)); }

bool getSkippedRows(size_t &) override { throw Exception("Not implemented", ErrorCodes::NOT_IMPLEMENTED); }

size_t skipNextBlock() override;

Block read() override;

Block readWithFilter(const IColumn::Filter & filter) override;
};

using ColumnFileSetInputStreamPtr = std::shared_ptr<ColumnFileSetInputStream>;

} // namespace DB::DM
Loading

0 comments on commit aaf95f7

Please sign in to comment.