diff --git a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp index f96a193a90a..c547325675a 100644 --- a/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp +++ b/dbms/src/Storages/DeltaMerge/DeltaMergeStore_InternalSegment.cpp @@ -21,7 +21,7 @@ #include #include #include -#include +#include #include #include #include @@ -542,7 +542,7 @@ bool DeltaMergeStore::segmentEnsureStableLocalIndexAsync(const SegmentPtr & segm // No lock is needed, stable meta is immutable. const auto build_info - = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles()); + = DMFileVectorIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles()); if (!build_info.indexes_to_build || build_info.indexes_to_build->empty() || build_info.dm_files.empty()) return false; @@ -617,7 +617,7 @@ bool DeltaMergeStore::segmentWaitStableLocalIndexReady(const SegmentPtr & segmen // No lock is needed, stable meta is immutable. auto segment_id = segment->segmentId(); auto build_info - = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles()); + = DMFileVectorIndexWriter::getLocalIndexBuildInfo(local_index_infos_snap, segment->getStable()->getDMFiles()); if (!build_info.indexes_to_build || build_info.indexes_to_build->empty()) return true; @@ -738,7 +738,7 @@ void DeltaMergeStore::segmentEnsureStableLocalIndex( DMFile::info(index_build_info.dm_files)); // 2. Build the index. - DMFileIndexWriter iw(DMFileIndexWriter::Options{ + DMFileVectorIndexWriter iw(DMFileVectorIndexWriter::Options{ .path_pool = path_pool, .index_infos = index_build_info.indexes_to_build, .dm_files = index_build_info.dm_files, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFile.h b/dbms/src/Storages/DeltaMerge/File/DMFile.h index b5d569445ed..1767ce8331d 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFile.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFile.h @@ -328,9 +328,10 @@ class DMFile : private boost::noncopyable #endif DMFileMetaPtr meta; + friend class DMFileVectorIndexReader; friend class DMFileV3IncrementWriter; friend class DMFileWriter; - friend class DMFileIndexWriter; + friend class DMFileVectorIndexWriter; friend class DMFileReader; friend class MarkLoader; friend class ColumnReadStream; diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp index 4a822fce628..cb65e47bee9 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileBlockInputStream.cpp @@ -250,8 +250,6 @@ SkippableBlockInputStreamPtr DMFileBlockInputStreamBuilder::tryBuildWithVectorIn std::move(header_layout), std::move(rest_columns_reader), std::move(vec_column.value()), - file_provider, - read_limiter, scan_context, vector_index_cache, bitmap_filter.value(), diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp new file mode 100644 index 00000000000..ec7ab1a8af7 --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.cpp @@ -0,0 +1,223 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#include +#include +#include +#include +#include +#include +#include +#include + + +namespace DB::ErrorCodes +{ +extern const int S3_ERROR; +} // namespace DB::ErrorCodes + +namespace DB::DM +{ + +String DMFileVectorIndexReader::PerfStat::toString() const +{ + return fmt::format( + "index_size={} load={:.2f}s{}{}, search={:.2f}s, read_vec_column={:.2f}s", + index_size, + duration_load_index, + has_s3_download ? " (S3)" : "", + has_load_from_file ? " (LoadFile)" : "", + duration_search, + duration_read_vec_column); +} + +std::vector DMFileVectorIndexReader::load() +{ + if (loaded) + return {}; + + loadVectorIndex(); + auto sorted_results = loadVectorSearchResult(); + + perf_stat.selected_nodes = sorted_results.size(); + loaded = true; + return sorted_results; +} + +void DMFileVectorIndexReader::loadVectorIndex() +{ + const auto col_id = ann_query_info->column_id(); + const auto index_id = ann_query_info->index_id() > 0 ? ann_query_info->index_id() : EmptyIndexID; + + RUNTIME_CHECK(dmfile->useMetaV2()); // v3 + + // Check vector index exists on the column + auto vector_index = dmfile->getLocalIndex(col_id, index_id); + RUNTIME_CHECK(vector_index.has_value(), col_id, index_id); + perf_stat.index_size = vector_index->index_bytes(); + + // If local file is invalidated, cache is not valid anymore. So we + // need to ensure file exists on local fs first. + const auto index_file_path = index_id > 0 // + ? dmfile->vectorIndexPath(index_id) // + : dmfile->colIndexPath(DMFile::getFileNameBase(col_id)); + String local_index_file_path; + if (auto s3_file_name = S3::S3FilenameView::fromKeyWithPrefix(index_file_path); s3_file_name.isValid()) + { + // Disaggregated mode + auto * file_cache = FileCache::instance(); + RUNTIME_CHECK_MSG(file_cache, "Must enable S3 file cache to use vector index"); + + Stopwatch watch; + + auto perf_begin = PerfContext::file_cache; + + // If download file failed, retry a few times. + for (auto i = 3; i > 0; --i) + { + try + { + if (auto file_guard = file_cache->downloadFileForLocalRead(s3_file_name, vector_index->index_bytes()); + file_guard) + { + local_index_file_path = file_guard->getLocalFileName(); + break; // Successfully downloaded index into local cache + } + + throw Exception(ErrorCodes::S3_ERROR, "Failed to download vector index file {}", index_file_path); + } + catch (...) + { + if (i <= 1) + throw; + } + } + + if ( // + PerfContext::file_cache.fg_download_from_s3 > perf_begin.fg_download_from_s3 || // + PerfContext::file_cache.fg_wait_download_from_s3 > perf_begin.fg_wait_download_from_s3) + perf_stat.has_s3_download = true; + + auto download_duration = watch.elapsedSeconds(); + perf_stat.duration_load_index += download_duration; + + GET_METRIC(tiflash_vector_index_duration, type_download).Observe(download_duration); + } + else + { + // Not disaggregated mode + local_index_file_path = index_file_path; + } + + auto load_from_file = [&]() { + perf_stat.has_load_from_file = true; + return VectorIndexViewer::view(*vector_index, local_index_file_path); + }; + + Stopwatch watch; + if (vec_index_cache) + // Note: must use local_index_file_path as the cache key, because cache + // will check whether file is still valid and try to remove memory references + // when file is dropped. + vec_index = vec_index_cache->getOrSet(local_index_file_path, load_from_file); + else + vec_index = load_from_file(); + + perf_stat.duration_load_index += watch.elapsedSeconds(); + RUNTIME_CHECK(vec_index != nullptr); + + scan_context->total_vector_idx_load_time_ms += static_cast(perf_stat.duration_load_index * 1000); + if (perf_stat.has_s3_download) + // it could be possible that s3=true but load_from_file=false, it means we download a file + // and then reuse the memory cache. The majority time comes from s3 download + // so we still count it as s3 download. + scan_context->total_vector_idx_load_from_s3++; + else if (perf_stat.has_load_from_file) + scan_context->total_vector_idx_load_from_disk++; + else + scan_context->total_vector_idx_load_from_cache++; +} + +DMFileVectorIndexReader::~DMFileVectorIndexReader() +{ + scan_context->total_vector_idx_read_vec_time_ms += static_cast(perf_stat.duration_read_vec_column * 1000); +} + +String DMFileVectorIndexReader::perfStat() const +{ + return fmt::format( + "{} top_k_[query/visited/discarded/result]={}/{}/{}/{}", + perf_stat.toString(), + ann_query_info->top_k(), + perf_stat.visited_nodes, + perf_stat.discarded_nodes, + perf_stat.selected_nodes); +} + +std::vector DMFileVectorIndexReader::loadVectorSearchResult() +{ + Stopwatch watch; + + auto perf_begin = PerfContext::vector_search; + + RUNTIME_CHECK(valid_rows.size() >= dmfile->getRows(), valid_rows.size(), dmfile->getRows()); + auto sorted_results = vec_index->search(ann_query_info, valid_rows); + std::sort(sorted_results.begin(), sorted_results.end()); + // results must not contain duplicates. Usually there should be no duplicates. + sorted_results.erase(std::unique(sorted_results.begin(), sorted_results.end()), sorted_results.end()); + + perf_stat.discarded_nodes = PerfContext::vector_search.discarded_nodes - perf_begin.discarded_nodes; + perf_stat.visited_nodes = PerfContext::vector_search.visited_nodes - perf_begin.visited_nodes; + + perf_stat.duration_search = watch.elapsedSeconds(); + scan_context->total_vector_idx_search_time_ms += static_cast(perf_stat.duration_search * 1000); + scan_context->total_vector_idx_search_discarded_nodes += perf_stat.discarded_nodes; + scan_context->total_vector_idx_search_visited_nodes += perf_stat.visited_nodes; + + return sorted_results; +} + +void DMFileVectorIndexReader::read( + MutableColumnPtr & vec_column, + const std::span & selected_rows, + size_t start_offset, + size_t column_size) +{ + Stopwatch watch; + RUNTIME_CHECK(loaded); + + vec_column->reserve(column_size); + std::vector value; + size_t current_rowid = start_offset; + for (auto rowid : selected_rows) + { + vec_index->get(rowid, value); + if (rowid > current_rowid) + { + UInt32 nulls = rowid - current_rowid; + // Insert [] if column is Not Null, or NULL if column is Nullable + vec_column->insertManyDefaults(nulls); + } + vec_column->insertData(reinterpret_cast(value.data()), value.size() * sizeof(Float32)); + current_rowid = rowid + 1; + } + if (current_rowid < start_offset + column_size) + { + UInt32 nulls = column_size + start_offset - current_rowid; + vec_column->insertManyDefaults(nulls); + } + perf_stat.duration_read_vec_column += watch.elapsedSeconds(); +} + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h new file mode 100644 index 00000000000..70252eecc4a --- /dev/null +++ b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexReader.h @@ -0,0 +1,97 @@ +// Copyright 2023 PingCAP, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +#pragma once + +#include +#include +#include +#include + +namespace DB::DM +{ + +class DMFileVectorIndexReader +{ +private: + const DMFilePtr & dmfile; + const ANNQueryInfoPtr & ann_query_info; + const BitmapFilterView valid_rows; + const ScanContextPtr & scan_context; + // Global vector index cache + const VectorIndexCachePtr vec_index_cache; + + // Performance statistics + struct PerfStat + { + double duration_search; + double duration_load_index; + double duration_read_vec_column; + size_t index_size; + size_t visited_nodes; + size_t discarded_nodes; + size_t selected_nodes; + bool has_s3_download; + bool has_load_from_file; + + String toString() const; + }; + PerfStat perf_stat; + + // Set after load(). + VectorIndexViewerPtr vec_index = nullptr; + bool loaded = false; + +public: + DMFileVectorIndexReader( + const ANNQueryInfoPtr & ann_query_info_, + const DMFilePtr & dmfile_, + const BitmapFilterView & valid_rows_, + const ScanContextPtr & scan_context_, + const VectorIndexCachePtr & vec_index_cache_) + : dmfile(dmfile_) + , ann_query_info(ann_query_info_) + , valid_rows(valid_rows_) + , scan_context(scan_context_) + , vec_index_cache(vec_index_cache_) + , perf_stat() + {} + + ~DMFileVectorIndexReader(); + + // Read vector column data and set filter. + // The column will be as same as as the rows of the tiny file, + // but only the rows in sorted_results will be filled, + // others will be filled with default values. + // return the real number of rows read. + void read( + MutableColumnPtr & vec_column, + const std::span & selected_rows, + size_t start_offset, + size_t column_size); + + // Load vector index and search results. + // Return the rowids of the selected rows. + std::vector load(); + + String perfStat() const; + +private: + void loadVectorIndex(); + std::vector loadVectorSearchResult(); +}; + +using DMFileVectorIndexReaderPtr = std::shared_ptr; + +} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexWriter.cpp similarity index 97% rename from dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp rename to dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexWriter.cpp index dd7ee697719..3186193e13a 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexWriter.cpp @@ -17,8 +17,8 @@ #include #include #include -#include #include +#include #include #include #include @@ -40,7 +40,7 @@ extern const char exception_build_local_index_for_file[]; namespace DB::DM { -LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo( +LocalIndexBuildInfo DMFileVectorIndexWriter::getLocalIndexBuildInfo( const LocalIndexInfosSnapshot & index_infos, const DMFiles & dm_files) { @@ -85,7 +85,8 @@ LocalIndexBuildInfo DMFileIndexWriter::getLocalIndexBuildInfo( return build; } -size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, ProceedCheckFn should_proceed) const +size_t DMFileVectorIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, ProceedCheckFn should_proceed) + const { const auto column_defines = dm_file_mutable->getColumnDefines(); const auto del_cd_iter = std::find_if(column_defines.cbegin(), column_defines.cend(), [](const ColumnDefine & cd) { @@ -242,7 +243,7 @@ size_t DMFileIndexWriter::buildIndexForFile(const DMFilePtr & dm_file_mutable, P return total_built_index_bytes; } -DMFiles DMFileIndexWriter::build(ProceedCheckFn should_proceed) const +DMFiles DMFileVectorIndexWriter::build(ProceedCheckFn should_proceed) const { RUNTIME_CHECK(!built); // Create a clone of existing DMFile instances by using DMFile::restore, diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexWriter.h similarity index 96% rename from dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h rename to dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexWriter.h index 76727f3eebf..0d7a9d84db0 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileIndexWriter.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileVectorIndexWriter.h @@ -64,7 +64,7 @@ struct LocalIndexBuildInfo } }; -class DMFileIndexWriter +class DMFileVectorIndexWriter { public: static LocalIndexBuildInfo getLocalIndexBuildInfo( @@ -81,7 +81,7 @@ class DMFileIndexWriter using ProceedCheckFn = std::function; - explicit DMFileIndexWriter(const Options & options) + explicit DMFileVectorIndexWriter(const Options & options) : logger(Logger::get()) , options(options) {} diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp index b2a1dab4266..a6e76f8270e 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.cpp @@ -13,20 +13,8 @@ // limitations under the License. #include -#include -#include -#include #include -#include -#include -#include - - -namespace DB::ErrorCodes -{ -extern const int S3_ERROR; -} // namespace DB::ErrorCodes namespace DB::DM { @@ -34,11 +22,9 @@ namespace DB::DM DMFileWithVectorIndexBlockInputStream::DMFileWithVectorIndexBlockInputStream( const ANNQueryInfoPtr & ann_query_info_, const DMFilePtr & dmfile_, - Block && header_layout_, + Block && header_, DMFileReader && reader_, ColumnDefine && vec_cd_, - const FileProviderPtr & file_provider_, - const ReadLimiterPtr & read_limiter_, const ScanContextPtr & scan_context_, const VectorIndexCachePtr & vec_index_cache_, const BitmapFilterView & valid_rows_, @@ -46,63 +32,38 @@ DMFileWithVectorIndexBlockInputStream::DMFileWithVectorIndexBlockInputStream( : log(Logger::get(tracing_id)) , ann_query_info(ann_query_info_) , dmfile(dmfile_) - , header_layout(std::move(header_layout_)) + , header(std::move(header_)) , reader(std::move(reader_)) , vec_cd(std::move(vec_cd_)) - , file_provider(file_provider_) - , read_limiter(read_limiter_) , scan_context(scan_context_) - , vec_index_cache(vec_index_cache_) - , valid_rows(valid_rows_) -{ - RUNTIME_CHECK(ann_query_info); - RUNTIME_CHECK(vec_cd.id == ann_query_info->column_id()); - for (const auto & cd : reader.read_columns) - { - RUNTIME_CHECK(header_layout.has(cd.name), cd.name); - RUNTIME_CHECK(cd.id != vec_cd.id); - } - RUNTIME_CHECK(header_layout.has(vec_cd.name)); - RUNTIME_CHECK(header_layout.columns() == reader.read_columns.size() + 1); - - // Fill start_offset_to_pack_id - const auto & pack_stats = dmfile->getPackStats(); - start_offset_to_pack_id.reserve(pack_stats.size()); - UInt32 start_offset = 0; - for (size_t pack_id = 0; pack_id < pack_stats.size(); ++pack_id) - { - start_offset_to_pack_id[start_offset] = pack_id; - start_offset += pack_stats[pack_id].rows; - } - - // Fill header - header = toEmptyBlock(reader.read_columns); - addColumnToBlock(header, vec_cd.id, vec_cd.name, vec_cd.type, vec_cd.type->createColumn(), vec_cd.default_value); -} + , vec_index_reader(std::make_shared( + ann_query_info, + dmfile, + valid_rows_, + scan_context, + vec_index_cache_)) +{} DMFileWithVectorIndexBlockInputStream::~DMFileWithVectorIndexBlockInputStream() { - if (!vec_column_reader) - return; - - scan_context->total_vector_idx_read_vec_time_ms += static_cast(duration_read_from_vec_index_seconds * 1000); scan_context->total_vector_idx_read_others_time_ms += static_cast(duration_read_from_other_columns_seconds * 1000); - LOG_DEBUG( // + LOG_DEBUG( log, - "Finished read DMFile with vector index for column dmf_{}/{}(id={}), " - "index_id={} query_top_k={} load_index+result={:.2f}s read_from_index={:.2f}s read_from_others={:.2f}s", + "Finished vector search over column dmf_{}/{}(id={}), index_id={} {} " + "pack_[total/before_search/after_search]={}/{}/{}", dmfile->fileId(), vec_cd.name, vec_cd.id, ann_query_info->index_id(), - ann_query_info->top_k(), - duration_load_vec_index_and_results_seconds, - duration_read_from_vec_index_seconds, - duration_read_from_other_columns_seconds); -} + vec_index_reader->perfStat(), + + dmfile->getPackStats().size(), + valid_packs_before_search, + valid_packs_after_search); +} Block DMFileWithVectorIndexBlockInputStream::read(FilterPtr & res_filter, bool return_filter) { @@ -127,15 +88,18 @@ Block DMFileWithVectorIndexBlockInputStream::readImpl(FilterPtr & res_filter) { load(); - Block res; - if (!reader.read_columns.empty()) - res = readByFollowingOtherColumns(); - else - res = readByIndexReader(); + auto [res, real_rows] = reader.read_columns.empty() ? readByIndexReader() : readByFollowingOtherColumns(); if (!res) return {}; + // If all rows are valid, res_filter is nullptr. + if (real_rows == res.rows()) + { + res_filter = nullptr; + return res; + } + // Assign output filter according to sorted_results. // // For example, if sorted_results is [3, 10], the complete filter array is: @@ -168,7 +132,7 @@ Block DMFileWithVectorIndexBlockInputStream::readImpl(FilterPtr & res_filter) return res; } -Block DMFileWithVectorIndexBlockInputStream::readByIndexReader() +std::tuple DMFileWithVectorIndexBlockInputStream::readByIndexReader() { const auto & pack_stats = dmfile->getPackStats(); size_t all_packs = pack_stats.size(); @@ -182,36 +146,38 @@ Block DMFileWithVectorIndexBlockInputStream::readByIndexReader() if (pack_res[index_reader_next_pack_id].isUse()) break; index_reader_next_row_id += pack_stats[index_reader_next_pack_id].rows; - index_reader_next_pack_id++; + ++index_reader_next_pack_id; } if (index_reader_next_pack_id >= all_packs) // Finished return {}; - auto read_pack_id = index_reader_next_pack_id; auto block_start_row_id = index_reader_next_row_id; - auto read_rows = pack_stats[read_pack_id].rows; - - index_reader_next_pack_id++; - index_reader_next_row_id += read_rows; + while (index_reader_next_pack_id < all_packs) + { + if (!pack_res[index_reader_next_pack_id].isUse()) + break; + index_reader_next_row_id += pack_stats[index_reader_next_pack_id].rows; + ++index_reader_next_pack_id; + } Block block; block.setStartOffset(block_start_row_id); + size_t read_rows = index_reader_next_row_id - block_start_row_id; auto vec_column = vec_cd.type->createColumn(); - vec_column->reserve(read_rows); - Stopwatch w; - vec_column_reader->read(vec_column, read_pack_id, read_rows); - duration_read_from_vec_index_seconds += w.elapsedSeconds(); + auto begin = std::lower_bound(sorted_results.cbegin(), sorted_results.cend(), block_start_row_id); + auto end = std::lower_bound(begin, sorted_results.cend(), index_reader_next_row_id); + const std::span block_selected_rows{begin, end}; + vec_index_reader->read(vec_column, block_selected_rows, block_start_row_id, read_rows); block.insert(ColumnWithTypeAndName{std::move(vec_column), vec_cd.type, vec_cd.name, vec_cd.id}); - - return block; + return {block, block_selected_rows.size()}; } -Block DMFileWithVectorIndexBlockInputStream::readByFollowingOtherColumns() +std::tuple DMFileWithVectorIndexBlockInputStream::readByFollowingOtherColumns() { // First read other columns. Stopwatch w; @@ -225,31 +191,21 @@ Block DMFileWithVectorIndexBlockInputStream::readByFollowingOtherColumns() // Using vec_cd.type to construct a Column directly instead of using // the type from dmfile, so that we don't need extra transforms - // (e.g. wrap with a Nullable). vec_column_reader is compatible with + // (e.g. wrap with a Nullable). vec_index_reader is compatible with // both Nullable and NotNullable. auto vec_column = vec_cd.type->createColumn(); - vec_column->reserve(read_rows); // Then read from vector index for the same pack. - w.restart(); - - vec_column_reader->read(vec_column, getPackIdFromBlock(block_others), read_rows); - duration_read_from_vec_index_seconds += w.elapsedSeconds(); - - // Re-assemble block using the same layout as header_layout. - Block res = header_layout.cloneEmpty(); - // Note: the start offset counts from the beginning of THIS dmfile. It - // is not a global offset. - res.setStartOffset(block_others.startOffset()); - for (const auto & elem : block_others) - { - RUNTIME_CHECK(res.has(elem.name)); - res.getByName(elem.name).column = std::move(elem.column); - } - RUNTIME_CHECK(res.has(vec_cd.name)); - res.getByName(vec_cd.name).column = std::move(vec_column); - - return res; + auto begin = std::lower_bound(sorted_results.cbegin(), sorted_results.cend(), block_others.startOffset()); + auto end = std::lower_bound(begin, sorted_results.cend(), block_others.startOffset() + read_rows); + const std::span block_selected_rows{begin, end}; + vec_index_reader->read(vec_column, block_selected_rows, block_others.startOffset(), read_rows); + + // Re-assemble block using the same layout as header. + // Insert the vector column into the block. + auto index = header.getPositionByName(vec_cd.name); + block_others.insert(index, ColumnWithTypeAndName(std::move(vec_column), vec_cd.type, vec_cd.name)); + return {block_others, block_selected_rows.size()}; } void DMFileWithVectorIndexBlockInputStream::load() @@ -257,183 +213,27 @@ void DMFileWithVectorIndexBlockInputStream::load() if (loaded) return; - Stopwatch w; - - loadVectorIndex(); - loadVectorSearchResult(); - - duration_load_vec_index_and_results_seconds = w.elapsedSeconds(); - - loaded = true; -} - -void DMFileWithVectorIndexBlockInputStream::loadVectorIndex() -{ - bool has_s3_download = false; - bool has_load_from_file = false; - - double duration_load_index = 0; // include download from s3 and load from fs - - const auto col_id = ann_query_info->column_id(); - const auto index_id = ann_query_info->index_id() > 0 ? ann_query_info->index_id() : EmptyIndexID; - - RUNTIME_CHECK(dmfile->useMetaV2()); // v3 - - // Check vector index exists on the column - auto vector_index = dmfile->getLocalIndex(col_id, index_id); - RUNTIME_CHECK(vector_index.has_value(), col_id, index_id); - - // If local file is invalidated, cache is not valid anymore. So we - // need to ensure file exists on local fs first. - const auto index_file_path = index_id > 0 // - ? dmfile->vectorIndexPath(index_id) // - : dmfile->colIndexPath(DMFile::getFileNameBase(col_id)); - String local_index_file_path; - FileSegmentPtr file_guard = nullptr; - if (auto s3_file_name = S3::S3FilenameView::fromKeyWithPrefix(index_file_path); s3_file_name.isValid()) - { - // Disaggregated mode - auto * file_cache = FileCache::instance(); - RUNTIME_CHECK_MSG(file_cache, "Must enable S3 file cache to use vector index"); - - Stopwatch watch; - - auto perf_begin = PerfContext::file_cache; - - // If download file failed, retry a few times. - for (auto i = 3; i > 0; --i) - { - try - { - file_guard = file_cache->downloadFileForLocalRead( // - s3_file_name, - vector_index->index_bytes()); - if (file_guard) - { - local_index_file_path = file_guard->getLocalFileName(); - break; // Successfully downloaded index into local cache - } - - throw Exception( // - ErrorCodes::S3_ERROR, - "Failed to download vector index file {}", - index_file_path); - } - catch (...) - { - if (i <= 1) - throw; - } - } - - if ( // - PerfContext::file_cache.fg_download_from_s3 > perf_begin.fg_download_from_s3 || // - PerfContext::file_cache.fg_wait_download_from_s3 > perf_begin.fg_wait_download_from_s3) - has_s3_download = true; - - auto download_duration = watch.elapsedSeconds(); - duration_load_index += download_duration; - - GET_METRIC(tiflash_vector_index_duration, type_download).Observe(download_duration); - } - else - { - // Not disaggregated mode - local_index_file_path = index_file_path; - } - - auto load_from_file = [&]() { - has_load_from_file = true; - return VectorIndexViewer::view(*vector_index, local_index_file_path); - }; - - Stopwatch watch; - if (vec_index_cache) - // Note: must use local_index_file_path as the cache key, because cache - // will check whether file is still valid and try to remove memory references - // when file is dropped. - vec_index = vec_index_cache->getOrSet(local_index_file_path, load_from_file); - else - vec_index = load_from_file(); - - duration_load_index += watch.elapsedSeconds(); - RUNTIME_CHECK(vec_index != nullptr); - - scan_context->total_vector_idx_load_time_ms += static_cast(duration_load_index * 1000); - if (has_s3_download) - // it could be possible that s3=true but load_from_file=false, it means we download a file - // and then reuse the memory cache. The majority time comes from s3 download - // so we still count it as s3 download. - scan_context->total_vector_idx_load_from_s3++; - else if (has_load_from_file) - scan_context->total_vector_idx_load_from_disk++; - else - scan_context->total_vector_idx_load_from_cache++; - - LOG_DEBUG( // - log, - "Loaded vector index for column dmf_{}/{}(id={}), index_id={} index_size={} kind={} cost={:.2f}s {} {}", - dmfile->fileId(), - vec_cd.name, - vec_cd.id, - vector_index->index_id(), - vector_index->index_bytes(), - vector_index->index_kind(), - duration_load_index, - has_s3_download ? "(S3)" : "", - has_load_from_file ? "(LoadFile)" : ""); -} - -void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult() -{ - Stopwatch watch; - - auto perf_begin = PerfContext::vector_search; - - RUNTIME_CHECK(valid_rows.size() >= dmfile->getRows(), valid_rows.size(), dmfile->getRows()); - sorted_results = vec_index->search(ann_query_info, valid_rows); - std::sort(sorted_results.begin(), sorted_results.end()); - // results must not contain duplicates. Usually there should be no duplicates. - sorted_results.erase(std::unique(sorted_results.begin(), sorted_results.end()), sorted_results.end()); - - auto discarded_nodes = PerfContext::vector_search.discarded_nodes - perf_begin.discarded_nodes; - auto visited_nodes = PerfContext::vector_search.visited_nodes - perf_begin.visited_nodes; - - double search_duration = watch.elapsedSeconds(); - scan_context->total_vector_idx_search_time_ms += static_cast(search_duration * 1000); - scan_context->total_vector_idx_search_discarded_nodes += discarded_nodes; - scan_context->total_vector_idx_search_visited_nodes += visited_nodes; - - vec_column_reader = std::make_shared(dmfile, vec_index, sorted_results); - + sorted_results = vec_index_reader->load(); // Vector index is very likely to filter out some packs. For example, // if we query for Top 1, then only 1 pack will be remained. So we // update the pack filter used by the DMFileReader to avoid reading // unnecessary data for other columns. - size_t valid_packs_before_search = 0; - size_t valid_packs_after_search = 0; const auto & pack_stats = dmfile->getPackStats(); auto & pack_res = reader.pack_filter.getPackRes(); - size_t results_it = 0; - const size_t results_it_max = sorted_results.size(); - + auto results_it = sorted_results.begin(); UInt32 pack_start = 0; - - for (size_t pack_id = 0, pack_id_max = dmfile->getPacks(); pack_id < pack_id_max; pack_id++) + for (size_t pack_id = 0; pack_id < dmfile->getPacks(); ++pack_id) { if (pack_res[pack_id].isUse()) ++valid_packs_before_search; bool pack_has_result = false; - UInt32 pack_end = pack_start + pack_stats[pack_id].rows; - while (results_it < results_it_max // - && sorted_results[results_it] >= pack_start // - && sorted_results[results_it] < pack_end) + while (results_it != sorted_results.end() && *results_it >= pack_start && *results_it < pack_end) { pack_has_result = true; - results_it++; + ++results_it; } if (!pack_has_result) @@ -445,42 +245,11 @@ void DMFileWithVectorIndexBlockInputStream::loadVectorSearchResult() pack_start = pack_end; } - RUNTIME_CHECK_MSG(results_it == results_it_max, "All packs has been visited but not all results are consumed"); - - LOG_DEBUG( // - log, - "Finished vector search over column dmf_{}/{}(id={}), index_id={} cost={:.3f}s " - "top_k_[query/visited/discarded/result]={}/{}/{}/{} " - "rows_[file/after_search]={}/{} " - "pack_[total/before_search/after_search]={}/{}/{}", + RUNTIME_CHECK_MSG( + results_it == sorted_results.end(), + "All packs has been visited but not all results are consumed"); - dmfile->fileId(), - vec_cd.name, - vec_cd.id, - ann_query_info->index_id(), - search_duration, - - ann_query_info->top_k(), - visited_nodes, // Visited nodes will be larger than query_top_k when there are MVCC rows - discarded_nodes, // How many nodes are skipped by MVCC - sorted_results.size(), - - dmfile->getRows(), - sorted_results.size(), - - pack_stats.size(), - valid_packs_before_search, - valid_packs_after_search); -} - -UInt32 DMFileWithVectorIndexBlockInputStream::getPackIdFromBlock(const Block & block) -{ - // The start offset of a block is ensured to be aligned with the pack. - // This is how we know which pack the block comes from. - auto start_offset = block.startOffset(); - auto it = start_offset_to_pack_id.find(start_offset); - RUNTIME_CHECK(it != start_offset_to_pack_id.end()); - return it->second; + loaded = true; } } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h index 6f82fb293d8..de5ad0f333e 100644 --- a/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h +++ b/dbms/src/Storages/DeltaMerge/File/DMFileWithVectorIndexBlockInputStream.h @@ -17,8 +17,8 @@ #include #include #include +#include #include -#include #include #include @@ -56,11 +56,9 @@ class DMFileWithVectorIndexBlockInputStream : public SkippableBlockInputStream static DMFileWithVectorIndexBlockInputStreamPtr create( const ANNQueryInfoPtr & ann_query_info, const DMFilePtr & dmfile, - Block && header_layout, + Block && header, DMFileReader && reader, ColumnDefine && vec_cd, - const FileProviderPtr & file_provider, - const ReadLimiterPtr & read_limiter, const ScanContextPtr & scan_context, const VectorIndexCachePtr & vec_index_cache, const BitmapFilterView & valid_rows, @@ -69,11 +67,9 @@ class DMFileWithVectorIndexBlockInputStream : public SkippableBlockInputStream return std::make_shared( ann_query_info, dmfile, - std::move(header_layout), + std::move(header), std::move(reader), std::move(vec_cd), - file_provider, - read_limiter, scan_context, vec_index_cache, valid_rows, @@ -83,11 +79,9 @@ class DMFileWithVectorIndexBlockInputStream : public SkippableBlockInputStream explicit DMFileWithVectorIndexBlockInputStream( const ANNQueryInfoPtr & ann_query_info_, const DMFilePtr & dmfile_, - Block && header_layout_, + Block && header_, DMFileReader && reader_, ColumnDefine && vec_cd_, - const FileProviderPtr & file_provider_, - const ReadLimiterPtr & read_limiter_, const ScanContextPtr & scan_context_, const VectorIndexCachePtr & vec_index_cache_, const BitmapFilterView & valid_rows_, @@ -141,56 +135,42 @@ class DMFileWithVectorIndexBlockInputStream : public SkippableBlockInputStream // Read data totally from the VectorColumnFromIndexReader. This is used // when there is no other column to read. - Block readByIndexReader(); + std::tuple readByIndexReader(); // Read data from other columns first, then read from VectorColumnFromIndexReader. This is used // when there are other columns to read. - Block readByFollowingOtherColumns(); + std::tuple readByFollowingOtherColumns(); private: void load(); - void loadVectorIndex(); - - void loadVectorSearchResult(); - - UInt32 getPackIdFromBlock(const Block & block); - private: const LoggerPtr log; const ANNQueryInfoPtr ann_query_info; const DMFilePtr dmfile; - // The header_layout should contain columns from reader and vec_cd - Block header_layout; + // The header contains columns from reader and vec_cd + Block header; // Vector column should be excluded in the reader DMFileReader reader; // Note: ColumnDefine comes from read path does not have vector_index fields. const ColumnDefine vec_cd; - const FileProviderPtr file_provider; - const ReadLimiterPtr read_limiter; const ScanContextPtr scan_context; - const VectorIndexCachePtr vec_index_cache; - const BitmapFilterView valid_rows; // TODO(vector-index): Currently this does not support ColumnFileBig - - Block header; // Filled in constructor; - - std::unordered_map start_offset_to_pack_id; // Filled from reader in constructor + const DMFileVectorIndexReaderPtr vec_index_reader; // Set after load(). VectorIndexViewerPtr vec_index = nullptr; - // Set after load(). - VectorColumnFromIndexReaderPtr vec_column_reader = nullptr; + // VectorColumnFromIndexReaderPtr vec_column_reader = nullptr; // Set after load(). Used to filter the output rows. std::vector sorted_results{}; // Key is rowid IColumn::Filter filter; bool loaded = false; - double duration_load_vec_index_and_results_seconds = 0; - double duration_read_from_vec_index_seconds = 0; double duration_read_from_other_columns_seconds = 0; + size_t valid_packs_before_search = 0; + size_t valid_packs_after_search = 0; }; } // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp deleted file mode 100644 index 4ac5fe274a1..00000000000 --- a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.cpp +++ /dev/null @@ -1,138 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#include -#include - -#include - -namespace DB::DM -{ - -std::vector VectorColumnFromIndexReader::calcPackStartRowID(const DMFileMeta::PackStats & pack_stats) -{ - std::vector pack_start_rowid(pack_stats.size()); - UInt32 rowid = 0; - for (size_t i = 0, i_max = pack_stats.size(); i < i_max; i++) - { - pack_start_rowid[i] = rowid; - rowid += pack_stats[i].rows; - } - return pack_start_rowid; -} - -MutableColumnPtr VectorColumnFromIndexReader::calcResultsByPack( - const std::vector & sorted_results, - const DMFileMeta::PackStats & pack_stats, - const std::vector & pack_start_rowid) -{ - auto column = ColumnArray::create(ColumnUInt32::create()); - -#ifndef NDEBUG - { - const auto sorted = std::is_sorted(sorted_results.begin(), sorted_results.end()); - RUNTIME_CHECK(sorted); - } -#endif - - std::vector offsets_in_pack; - size_t results_it = 0; - const size_t results_it_max = sorted_results.size(); - for (size_t pack_id = 0, pack_id_max = pack_start_rowid.size(); pack_id < pack_id_max; pack_id++) - { - offsets_in_pack.clear(); - - UInt32 pack_start = pack_start_rowid[pack_id]; - UInt32 pack_end = pack_start + pack_stats[pack_id].rows; - - while (results_it < results_it_max // - && sorted_results[results_it] >= pack_start // - && sorted_results[results_it] < pack_end) - { - offsets_in_pack.push_back(sorted_results[results_it] - pack_start); - results_it++; - } - - // insert - column->insertData( - reinterpret_cast(offsets_in_pack.data()), - offsets_in_pack.size() * sizeof(UInt32)); - } - - RUNTIME_CHECK_MSG(results_it == results_it_max, "All packs has been visited but not all results are consumed"); - - return column; -} - -void VectorColumnFromIndexReader::read(MutableColumnPtr & column, size_t start_pack_id, UInt32 read_rows) -{ - std::vector value; - const auto * results_by_pack = checkAndGetColumn(this->results_by_pack.get()); - - size_t pack_id = start_pack_id; - UInt32 remaining_rows_in_pack = pack_stats[pack_id].rows; - - while (read_rows > 0) - { - if (remaining_rows_in_pack == 0) - { - // If this pack is drained but we still need to read more rows, let's read from next pack. - pack_id++; - RUNTIME_CHECK(pack_id < pack_stats.size()); - remaining_rows_in_pack = pack_stats[pack_id].rows; - } - - UInt32 expect_result_rows = std::min(remaining_rows_in_pack, read_rows); - UInt32 filled_result_rows = 0; - - auto offsets_in_pack = results_by_pack->getDataAt(pack_id); - auto offsets_in_pack_n = results_by_pack->sizeAt(pack_id); - RUNTIME_CHECK(offsets_in_pack.size == offsets_in_pack_n * sizeof(UInt32)); - - // Note: offsets_in_pack_n may be 0, means there is no results in this pack. - for (size_t i = 0; i < offsets_in_pack_n; ++i) - { - UInt32 offset_in_pack = reinterpret_cast(offsets_in_pack.data)[i]; - RUNTIME_CHECK(filled_result_rows <= offset_in_pack); - if (offset_in_pack > filled_result_rows) - { - UInt32 nulls = offset_in_pack - filled_result_rows; - // Insert [] if column is Not Null, or NULL if column is Nullable - column->insertManyDefaults(nulls); - filled_result_rows += nulls; - } - RUNTIME_CHECK(filled_result_rows == offset_in_pack); - - // TODO(vector-index): We could fill multiple rows if rowid is continuous. - VectorIndexViewer::Key rowid = pack_start_rowid[pack_id] + offset_in_pack; - index->get(rowid, value); - column->insertData(reinterpret_cast(value.data()), value.size() * sizeof(Float32)); - filled_result_rows++; - } - - if (filled_result_rows < expect_result_rows) - { - size_t nulls = expect_result_rows - filled_result_rows; - // Insert [] if column is Not Null, or NULL if column is Nullable - column->insertManyDefaults(nulls); - filled_result_rows += nulls; - } - - RUNTIME_CHECK(filled_result_rows == expect_result_rows); - remaining_rows_in_pack -= filled_result_rows; - read_rows -= filled_result_rows; - } -} - -} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.h b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.h deleted file mode 100644 index 5fff067dc72..00000000000 --- a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader.h +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include -#include -#include -#include -#include -#include -#include -#include - -#include - -namespace DB::DM -{ - -/** - * @brief VectorColumnFromIndexReader reads vector column data from the index - * while maintaining the same column layout as if it was read from the DMFile. - * For example, when we want to read vector column data of row id [1, 5, 10], - * this reader will return [NULL, VEC, NULL, NULL, NULL, VEC, ....]. - * - * Note: The term "row id" in this class refers to the row offset in this DMFile. - * It is a file-level row id, not a global row id. - */ -class VectorColumnFromIndexReader -{ -private: - const DMFilePtr dmfile; // Keep a reference of dmfile to keep pack_stats valid. - const DMFileMeta::PackStats & pack_stats; - const std::vector pack_start_rowid; - - const VectorIndexViewerPtr index; - /// results_by_pack[i]=[a,b,c...] means pack[i]'s row offset [a,b,c,...] is contained in the result set. - /// The rowid of a is pack_start_rowid[i]+a. - MutableColumnPtr /* ColumnArray of UInt32 */ results_by_pack; - -private: - static std::vector calcPackStartRowID(const DMFileMeta::PackStats & pack_stats); - - static MutableColumnPtr calcResultsByPack( - const std::vector & results, - const DMFileMeta::PackStats & pack_stats, - const std::vector & pack_start_rowid); - -public: - /// VectorIndex::Key is the offset of the row in the DMFile (file-level row id), - /// including NULLs and delete marks. - explicit VectorColumnFromIndexReader( - const DMFilePtr & dmfile_, - const VectorIndexViewerPtr & index_, - const std::vector & sorted_results_) - : dmfile(dmfile_) - , pack_stats(dmfile_->getPackStats()) - , pack_start_rowid(calcPackStartRowID(pack_stats)) - , index(index_) - , results_by_pack(calcResultsByPack(sorted_results_, pack_stats, pack_start_rowid)) - {} - - void read(MutableColumnPtr & column, size_t start_pack_id, UInt32 read_rows); -}; - -} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader_fwd.h b/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader_fwd.h deleted file mode 100644 index c5fcb54abe6..00000000000 --- a/dbms/src/Storages/DeltaMerge/File/VectorColumnFromIndexReader_fwd.h +++ /dev/null @@ -1,25 +0,0 @@ -// Copyright 2024 PingCAP, Inc. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -#pragma once - -#include - -namespace DB::DM -{ - -class VectorColumnFromIndexReader; -using VectorColumnFromIndexReaderPtr = std::shared_ptr; - -} // namespace DB::DM diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp index 867251091f7..2fc283eb6a8 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_dm_vector_index.cpp @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include #include @@ -127,8 +127,8 @@ class VectorIndexDMFileTest DMFilePtr buildIndex(TiDB::VectorIndexDefinition definition) { - auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(indexInfo(definition), {dm_file}); - DMFileIndexWriter iw(DMFileIndexWriter::Options{ + auto build_info = DMFileVectorIndexWriter::getLocalIndexBuildInfo(indexInfo(definition), {dm_file}); + DMFileVectorIndexWriter iw(DMFileVectorIndexWriter::Options{ .path_pool = path_pool, .index_infos = build_info.indexes_to_build, .dm_files = {dm_file}, @@ -142,8 +142,8 @@ class VectorIndexDMFileTest DMFilePtr buildMultiIndex(const LocalIndexInfosPtr & index_infos) { assert(index_infos != nullptr); - auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(index_infos, {dm_file}); - DMFileIndexWriter iw(DMFileIndexWriter::Options{ + auto build_info = DMFileVectorIndexWriter::getLocalIndexBuildInfo(index_infos, {dm_file}); + DMFileVectorIndexWriter iw(DMFileVectorIndexWriter::Options{ .path_pool = path_pool, .index_infos = build_info.indexes_to_build, .dm_files = {dm_file}, @@ -1892,10 +1892,10 @@ class VectorIndexSegmentOnS3Test }), }, }); - auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(index_infos, dm_files); + auto build_info = DMFileVectorIndexWriter::getLocalIndexBuildInfo(index_infos, dm_files); // Build multiple index - DMFileIndexWriter iw(DMFileIndexWriter::Options{ + DMFileVectorIndexWriter iw(DMFileVectorIndexWriter::Options{ .path_pool = storage_path_pool, .index_infos = build_info.indexes_to_build, .dm_files = dm_files, diff --git a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp index 2656f97220e..0f8c77f515d 100644 --- a/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp +++ b/dbms/src/Storages/DeltaMerge/tests/gtest_segment_test_basic.cpp @@ -19,7 +19,7 @@ #include #include #include -#include +#include #include #include #include @@ -782,10 +782,10 @@ bool SegmentTestBasic::ensureSegmentStableLocalIndex(PageIdU64 segment_id, const bool success = false; auto segment = segments[segment_id]; auto dm_files = segment->getStable()->getDMFiles(); - auto build_info = DMFileIndexWriter::getLocalIndexBuildInfo(local_index_infos, dm_files); + auto build_info = DMFileVectorIndexWriter::getLocalIndexBuildInfo(local_index_infos, dm_files); // Build index - DMFileIndexWriter iw(DMFileIndexWriter::Options{ + DMFileVectorIndexWriter iw(DMFileVectorIndexWriter::Options{ .path_pool = storage_path_pool, .index_infos = build_info.indexes_to_build, .dm_files = dm_files,