Skip to content

Commit

Permalink
Merge remote-tracking branch 'pingcap/master' into test-exchange-comp…
Browse files Browse the repository at this point in the history
…ress
  • Loading branch information
solotzg committed Jan 30, 2023
2 parents 68921ee + 0b1ffce commit e5376a5
Show file tree
Hide file tree
Showing 168 changed files with 6,521 additions and 1,597 deletions.
28 changes: 0 additions & 28 deletions .github/workflows/bug-closed.yml

This file was deleted.

23 changes: 0 additions & 23 deletions .github/workflows/license-checker.yml

This file was deleted.

Empty file removed Jenkinsfile
Empty file.
3 changes: 2 additions & 1 deletion dbms/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 PingCAP, Ltd.
# Copyright 2023 PingCAP, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -62,6 +62,7 @@ add_headers_and_sources(clickhouse_common_io src/IO)
add_headers_and_sources(dbms src/Analyzers)
add_headers_and_sources(dbms src/Core)
add_headers_and_sources(dbms src/DataStreams)
add_headers_and_sources(dbms src/Operators)
add_headers_and_sources(dbms src/DataTypes)
add_headers_and_sources(dbms src/Databases)
add_headers_and_sources(dbms src/Debug)
Expand Down
3 changes: 2 additions & 1 deletion dbms/src/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 PingCAP, Ltd.
# Copyright 2023 PingCAP, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -16,6 +16,7 @@ add_subdirectory (Columns)
add_subdirectory (Common)
add_subdirectory (Core)
add_subdirectory (DataStreams)
add_subdirectory (Operators)
add_subdirectory (DataTypes)
add_subdirectory (Dictionaries)
add_subdirectory (Storages)
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/FormatFactory.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -69,8 +69,8 @@ BlockInputStreamPtr FormatFactory::getInput(
std::forward<decltype(row_stream)>(row_stream),
sample,
max_block_size,
settings.input_format_allow_errors_num,
settings.input_format_allow_errors_ratio);
0,
0);
};

if (name == "Native")
Expand Down
9 changes: 4 additions & 5 deletions dbms/src/DataStreams/LimitBlockInputStream.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -24,8 +24,7 @@ LimitBlockInputStream::LimitBlockInputStream(
size_t limit_,
const String & req_id)
: log(Logger::get(req_id))
, limit_transform_action(input->getHeader(), limit_)

, action(input->getHeader(), limit_)
{
children.push_back(input);
}
Expand All @@ -35,7 +34,7 @@ Block LimitBlockInputStream::readImpl()
{
Block res = children.back()->read();

if (limit_transform_action.transform(res))
if (action.transform(res))
{
return res;
}
Expand All @@ -47,6 +46,6 @@ Block LimitBlockInputStream::readImpl()

void LimitBlockInputStream::appendInfo(FmtBuffer & buffer) const
{
buffer.fmtAppend(", limit = {}", limit_transform_action.getLimit());
buffer.fmtAppend(", limit = {}", action.getLimit());
}
} // namespace DB
4 changes: 2 additions & 2 deletions dbms/src/DataStreams/LimitBlockInputStream.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -46,7 +46,7 @@ class LimitBlockInputStream : public IProfilingBlockInputStream

private:
LoggerPtr log;
LimitTransformAction limit_transform_action;
LocalLimitTransformAction action;
};

} // namespace DB
68 changes: 37 additions & 31 deletions dbms/src/DataStreams/LimitTransformAction.cpp
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -17,51 +17,57 @@

namespace DB
{
LimitTransformAction::LimitTransformAction(
const Block & header_,
size_t limit_)
: header(header_)
, limit(limit_)
namespace
{
}

Block LimitTransformAction::getHeader() const
// Removes all rows outside specified range of Block.
void cut(Block & block, size_t rows [[maybe_unused]], size_t limit, size_t pos)
{
return header;
assert(rows + limit > pos);
size_t pop_back_cnt = pos - limit;
for (auto & col : block)
{
auto mutate_col = (*std::move(col.column)).mutate();
mutate_col->popBack(pop_back_cnt);
col.column = std::move(mutate_col);
}
}
} // namespace

size_t LimitTransformAction::getLimit() const
bool LocalLimitTransformAction::transform(Block & block)
{
return limit;
if (unlikely(!block))
return true;

/// pos - how many lines were read, including the last read block
if (pos >= limit)
return false;

auto rows = block.rows();
pos += rows;
if (pos > limit)
cut(block, rows, limit, pos);
// for pos <= limit, give away the whole block
return true;
}

bool LimitTransformAction::transform(Block & block)
bool GlobalLimitTransformAction::transform(Block & block)
{
if (unlikely(!block))
return true;

/// pos - how many lines were read, including the last read block
if (pos >= limit)
{
return false;
}

auto rows = block.rows();
pos += rows;
if (pos >= rows && pos <= limit)
{
// give away the whole block
return true;
}
else
{
// pos > limit
// give away a piece of the block
assert(rows + limit > pos);
size_t length = rows + limit - pos;
for (size_t i = 0; i < block.columns(); ++i)
block.safeGetByPosition(i).column = block.safeGetByPosition(i).column->cut(0, length);
return true;
}
size_t prev_pos = pos.fetch_add(rows);
if (prev_pos >= limit)
return false;

size_t cur_pos = prev_pos + rows;
if (cur_pos > limit)
cut(block, rows, limit, cur_pos);
// for pos <= limit, give away the whole block
return true;
}
} // namespace DB
48 changes: 40 additions & 8 deletions dbms/src/DataStreams/LimitTransformAction.h
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand All @@ -15,22 +15,54 @@

#include <Core/Block.h>

#include <atomic>
#include <memory>

namespace DB
{
struct LimitTransformAction
struct LocalLimitTransformAction
{
public:
LimitTransformAction(
LocalLimitTransformAction(
const Block & header_,
size_t limit_);
size_t limit_)
: header(header_)
, limit(limit_)
{
}

bool transform(Block & block);
Block getHeader() const;
size_t getLimit() const;

Block getHeader() const { return header; }
size_t getLimit() const { return limit; }

private:
Block header;
size_t limit;
const Block header;
const size_t limit;
size_t pos = 0;
};

struct GlobalLimitTransformAction
{
public:
GlobalLimitTransformAction(
const Block & header_,
size_t limit_)
: header(header_)
, limit(limit_)
{
}

bool transform(Block & block);

Block getHeader() const { return header; }
size_t getLimit() const { return limit; }

private:
const Block header;
const size_t limit;
std::atomic_size_t pos{0};
};

using GlobalLimitPtr = std::shared_ptr<GlobalLimitTransformAction>;
} // namespace DB
2 changes: 0 additions & 2 deletions dbms/src/DataStreams/PushingToViewsBlockOutputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -43,8 +43,6 @@ PushingToViewsBlockOutputStream::PushingToViewsBlockOutputStream(
if (!dependencies.empty())
{
views_context = std::make_unique<Context>(context);
// Do not deduplicate insertions into MV if the main insertion is Ok
views_context->getSettingsRef().insert_deduplicate = false;
}

for (const auto & database_table : dependencies)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
// Copyright 2022 PingCAP, Ltd.
// Copyright 2023 PingCAP, Ltd.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
Expand Down
8 changes: 4 additions & 4 deletions dbms/src/Debug/MockStorage.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,7 @@ void MockStorage::addTableDataForDeltaMerge(Context & context, const String & na
}
}

BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const PushDownFilter * push_down_filter)
BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions)
{
assert(tableExistsForDeltaMerge(table_id));
auto storage = storage_delta_merge_map[table_id];
Expand All @@ -154,15 +154,15 @@ BlockInputStreamPtr MockStorage::getStreamFromDeltaMerge(Context & context, Int6
SelectQueryInfo query_info;
query_info.query = std::make_shared<ASTSelectQuery>();
query_info.mvcc_query_info = std::make_unique<MvccQueryInfo>(context.getSettingsRef().resolve_locks, std::numeric_limits<UInt64>::max(), scan_context);
if (push_down_filter && push_down_filter->hasValue())
if (filter_conditions && filter_conditions->hasValue())
{
auto analyzer = std::make_unique<DAGExpressionAnalyzer>(names_and_types_map_for_delta_merge[table_id], context);
query_info.dag_query = std::make_unique<DAGQueryInfo>(
push_down_filter->conditions,
filter_conditions->conditions,
analyzer->getPreparedSets(),
analyzer->getCurrentInputColumns(),
context.getTimezoneInfo());
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(*push_down_filter, *analyzer);
auto [before_where, filter_column_name, project_after_where] = ::DB::buildPushDownFilter(*filter_conditions, *analyzer);
BlockInputStreams ins = storage->read(column_names, query_info, context, stage, 8192, 1); // TODO: Support config max_block_size and num_streams
// TODO: set num_streams, then ins.size() != 1
BlockInputStreamPtr in = ins[0];
Expand Down
4 changes: 2 additions & 2 deletions dbms/src/Debug/MockStorage.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@
#pragma once
#include <Core/ColumnsWithTypeAndName.h>
#include <DataStreams/IBlockInputStream.h>
#include <Flash/Coprocessor/PushDownFilter.h>
#include <Flash/Coprocessor/FilterConditions.h>
#include <Flash/Coprocessor/TiDBTableScan.h>
#include <Storages/Transaction/TiDB.h>
#include <common/types.h>
Expand Down Expand Up @@ -82,7 +82,7 @@ class MockStorage

NamesAndTypes getNameAndTypesForDeltaMerge(Int64 table_id);

BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id, const PushDownFilter * push_down_filter = nullptr);
BlockInputStreamPtr getStreamFromDeltaMerge(Context & context, Int64 table_id, const FilterConditions * filter_conditions = nullptr);

bool tableExistsForDeltaMerge(Int64 table_id);

Expand Down
16 changes: 14 additions & 2 deletions dbms/src/Flash/CMakeLists.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
# Copyright 2022 PingCAP, Ltd.
# Copyright 2023 PingCAP, Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
Expand All @@ -18,8 +18,14 @@ add_headers_and_sources(flash_service .)
add_headers_and_sources(flash_service ./Coprocessor)
add_headers_and_sources(flash_service ./Mpp)
add_headers_and_sources(flash_service ./Executor)
add_headers_and_sources(flash_service ./Pipeline)
add_headers_and_sources(flash_service ./Pipeline/Exec)
add_headers_and_sources(flash_service ./Pipeline/Schedule)
add_headers_and_sources(flash_service ./Pipeline/Schedule/Events)
add_headers_and_sources(flash_service ./Pipeline/Schedule/Tasks)
add_headers_and_sources(flash_service ./Pipeline/Schedule/TaskQueues)
add_headers_and_sources(flash_service ./Planner)
add_headers_and_sources(flash_service ./Planner/plans)
add_headers_and_sources(flash_service ./Planner/Plans)
add_headers_and_sources(flash_service ./Statistics)
add_headers_and_sources(flash_service ./Management)

Expand All @@ -28,6 +34,12 @@ target_link_libraries(flash_service dbms)

if (ENABLE_TESTS)
add_subdirectory(Coprocessor/tests)
add_subdirectory(Executor/tests)
add_subdirectory(Pipeline/tests)
add_subdirectory(Pipeline/Exec/tests)
add_subdirectory(Pipeline/Schedule/tests)
add_subdirectory(Pipeline/Schedule/Events/tests)
add_subdirectory(Pipeline/Schedule/TaskQueues/tests)
add_subdirectory(Planner/tests)
add_subdirectory(tests)
endif ()
Loading

0 comments on commit e5376a5

Please sign in to comment.