Skip to content

Commit

Permalink
add AggregateDataEx to support tracking changes or updated for aggr
Browse files Browse the repository at this point in the history
  • Loading branch information
yl-lisen committed Jan 7, 2024
1 parent 9da8a20 commit ddb9432
Show file tree
Hide file tree
Showing 18 changed files with 1,830 additions and 1,416 deletions.
3 changes: 2 additions & 1 deletion src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3256,7 +3256,8 @@ void InterpreterSelectQuery::executeStreamingAggregation(
streaming_group_by,
delta_col_pos,
window_keys_num,
query_info.streaming_window_params);
query_info.streaming_window_params,
data_stream_semantic_pair.isChangelogOutput());

auto merge_threads = max_streams;
auto temporary_data_merge_threads = settings.aggregation_memory_efficient_merge_threads
Expand Down
123 changes: 123 additions & 0 deletions src/Interpreters/Streaming/AggregateDataEx.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
#pragma once

#include <IO/ReadHelpers.h>
#include <IO/WriteHelpers.h>
#include <Common/serde.h>

namespace DB
{
using AggregateDataPtr = char *;
using ConstAggregateDataPtr = const char *;

namespace Streaming
{
SERDE struct UpdatedDataEx
{
static UpdatedDataEx & data(AggregateDataPtr __restrict place) { return *reinterpret_cast<UpdatedDataEx *>(place); }
static const UpdatedDataEx & data(ConstAggregateDataPtr __restrict place) { return *reinterpret_cast<const UpdatedDataEx *>(place); }

static bool isEmpty(ConstAggregateDataPtr __restrict place) { return data(place).final_count == 0; }
static bool isUpdated(ConstAggregateDataPtr __restrict place) { return data(place).updated_since_last_finalization; }
static void resetUpdated(AggregateDataPtr __restrict place) { data(place).updated_since_last_finalization = false; }

static void addBatch(size_t row_begin, size_t row_end, AggregateDataPtr * places, const IColumn * delta_col)
{
if (delta_col == nullptr)
{
for (size_t i = row_begin; i < row_end; ++i)
if (places[i])
data(places[i]).add();
}
else
{
const auto & delta_flags = assert_cast<const ColumnInt8 &>(*delta_col).getData();
for (size_t i = row_begin; i < row_end; ++i)
{
if (places[i])
{
if (delta_flags[i] >= 0)
data(places[i]).add();
else
data(places[i]).negate();
}
}
}
}

static void addBatchSinglePlace(size_t row_begin, size_t row_end, AggregateDataPtr __restrict place, const IColumn * delta_col)
{
if (!place)
return;

auto & metadata = data(place);
if (delta_col == nullptr)
metadata.final_count += row_end - row_begin;
else
{
const auto & delta_flags = assert_cast<const ColumnInt8 &>(*delta_col).getData();
std::accumulate(delta_flags.begin(), delta_flags.end(), metadata.final_count);
}

metadata.updated_since_last_finalization = true;
}

static void serialize(ConstAggregateDataPtr __restrict place, WriteBuffer & wb)
{
const auto & data_ex = data(place);
writeIntBinary(data_ex.final_count, wb);
writeBoolText(data_ex.updated_since_last_finalization, wb);
}

static void deserialize(AggregateDataPtr __restrict place, ReadBuffer & rb)
{
auto & data_ex = data(place);
readIntBinary(data_ex.final_count, rb);
readBoolText(data_ex.updated_since_last_finalization, rb);
}

void add()
{
++final_count;
updated_since_last_finalization = true;
}

void negate()
{
--final_count;
updated_since_last_finalization = true;
}

/// Used for tracking the group is empty or not
UInt32 final_count = 0;

/// Used for tracking the group is updated or not
bool updated_since_last_finalization = true;
};

SERDE struct RetractedDataEx : UpdatedDataEx
{
static AggregateDataPtr & getRetracted(AggregateDataPtr & place) { return reinterpret_cast<RetractedDataEx *>(place)->retracted_data; }
static bool hasRetracted(ConstAggregateDataPtr __restrict place) { return reinterpret_cast<const RetractedDataEx *>(place)->retracted_data; }

template <bool use_retracted_data>
static AggregateDataPtr & getData(AggregateDataPtr & place)
{
if constexpr (use_retracted_data)
return getRetracted(place);
else
return place;
}

/// Used for tracking group changes
AggregateDataPtr retracted_data = nullptr;
};

enum class ExpandedDataType : uint8_t
{
None = 0,
Updated = 1, /// Allow tracking group is empty or updated
UpdatedWithRetracted = 2, /// Allow tracking group is empty or updated and changes
};

}
}
113 changes: 113 additions & 0 deletions src/Interpreters/Streaming/AggregationUtils.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,113 @@
#include <Interpreters/Streaming/AggregationUtils.h>

namespace DB
{

namespace ErrorCodes
{
extern const int LOGICAL_ERROR;
}

namespace Streaming
{
OutputBlockColumns prepareOutputBlockColumns(
const Aggregator::Params & params,
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions,
const Block & res_header,
Arenas & aggregates_pools,
bool final,
size_t rows)
{
MutableColumns key_columns(params.keys_size);
MutableColumns aggregate_columns(params.aggregates_size);
MutableColumns final_aggregate_columns(params.aggregates_size);
Aggregator::AggregateColumnsData aggregate_columns_data(params.aggregates_size);

for (size_t i = 0; i < params.keys_size; ++i)
{
key_columns[i] = res_header.safeGetByPosition(i).type->createColumn();
key_columns[i]->reserve(rows);
}

for (size_t i = 0; i < params.aggregates_size; ++i)
{
if (!final)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
aggregate_columns[i] = res_header.getByName(aggregate_column_name).type->createColumn();

/// The ColumnAggregateFunction column captures the shared ownership of the arena with the aggregate function states.
ColumnAggregateFunction & column_aggregate_func = assert_cast<ColumnAggregateFunction &>(*aggregate_columns[i]);

for (auto & pool : aggregates_pools)
column_aggregate_func.addArena(pool);

aggregate_columns_data[i] = &column_aggregate_func.getData();
aggregate_columns_data[i]->reserve(rows);
}
else
{
final_aggregate_columns[i] = aggregate_functions[i]->getReturnType()->createColumn();
final_aggregate_columns[i]->reserve(rows);

if (aggregate_functions[i]->isState())
{
auto callback = [&](IColumn & subcolumn)
{
/// The ColumnAggregateFunction column captures the shared ownership of the arena with aggregate function states.
if (auto * column_aggregate_func = typeid_cast<ColumnAggregateFunction *>(&subcolumn))
for (auto & pool : aggregates_pools)
column_aggregate_func->addArena(pool);
};

callback(*final_aggregate_columns[i]);
final_aggregate_columns[i]->forEachSubcolumnRecursively(callback);
}
}
}

if (key_columns.size() != params.keys_size)
throw Exception{"Aggregate. Unexpected key columns size.", ErrorCodes::LOGICAL_ERROR};

std::vector<IColumn *> raw_key_columns;
raw_key_columns.reserve(key_columns.size());
for (auto & column : key_columns)
raw_key_columns.push_back(column.get());

return {
.key_columns = std::move(key_columns),
.raw_key_columns = std::move(raw_key_columns),
.aggregate_columns = std::move(aggregate_columns),
.final_aggregate_columns = std::move(final_aggregate_columns),
.aggregate_columns_data = std::move(aggregate_columns_data),
};
}

Block finalizeBlock(const Aggregator::Params & params, const Block & res_header, OutputBlockColumns && out_cols, bool final, size_t rows)
{
auto && [key_columns, raw_key_columns, aggregate_columns, final_aggregate_columns, aggregate_columns_data] = out_cols;

Block res = res_header.cloneEmpty();

for (size_t i = 0; i < params.keys_size; ++i)
res.getByPosition(i).column = std::move(key_columns[i]);

for (size_t i = 0; i < params.aggregates_size; ++i)
{
const auto & aggregate_column_name = params.aggregates[i].column_name;
if (final)
res.getByName(aggregate_column_name).column = std::move(final_aggregate_columns[i]);
else
res.getByName(aggregate_column_name).column = std::move(aggregate_columns[i]);
}

/// Change the size of the columns-constants in the block.
size_t columns = res_header.columns();
for (size_t i = 0; i < columns; ++i)
if (isColumnConst(*res.getByPosition(i).column))
res.getByPosition(i).column = res.getByPosition(i).column->cut(0, rows);

return res;
}
}
}
27 changes: 27 additions & 0 deletions src/Interpreters/Streaming/AggregationUtils.h
Original file line number Diff line number Diff line change
@@ -0,0 +1,27 @@
#pragma once

#include <Interpreters/Streaming/Aggregator.h>

namespace DB::Streaming
{

struct OutputBlockColumns
{
MutableColumns key_columns;
std::vector<IColumn *> raw_key_columns;
MutableColumns aggregate_columns;
MutableColumns final_aggregate_columns;
Aggregator::AggregateColumnsData aggregate_columns_data;
};


OutputBlockColumns prepareOutputBlockColumns(
const Aggregator::Params & params,
const Aggregator::AggregateFunctionsPlainPtrs & aggregate_functions,
const Block & res_header,
Arenas & aggregates_pools,
bool final,
size_t rows);

Block finalizeBlock(const Aggregator::Params & params, const Block & res_header, OutputBlockColumns && out_cols, bool final, size_t rows);
}
Loading

0 comments on commit ddb9432

Please sign in to comment.