Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

some minor supports and fixes #529

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 0 additions & 9 deletions src/Interpreters/InterpreterSelectQuery.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -3411,15 +3411,6 @@ void InterpreterSelectQuery::finalCheckAndOptimizeForStreamingQuery()
{
if (isStreamingQuery())
{
/// Does not allow window func over a global aggregation
if (hasStreamingWindowFunc())
{
/// nested query
if (auto * proxy = storage->as<Streaming::ProxyStream>())
if (proxy->hasStreamingGlobalAggregation())
throw Exception("Streaming query doesn't support window func over a global aggregation", ErrorCodes::NOT_IMPLEMENTED);
}

/// For now, for the following scenarios, we disable backfill from historic data store
/// 1) User select some virtual columns which is only available in streaming store, like `_tp_sn`, `_tp_index_time`
/// 2) Seek by streaming store sequence number
Expand Down
16 changes: 8 additions & 8 deletions src/Processors/Chunk.h
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ struct ChunkContext : public COW<ChunkContext>
static constexpr UInt64 APPEND_TIME_FLAG = 0x2;
static constexpr UInt64 HISTORICAL_DATA_START_FLAG = 0x4;
static constexpr UInt64 HISTORICAL_DATA_END_FLAG = 0x8;
static constexpr UInt64 RETRACTED_DATA_FLAG = 0x10;
static constexpr UInt64 CONSECUTIVE_DATA_FLAG = 0x10;
static constexpr UInt64 AVOID_WATERMARK_FLAG = 0x8000'0000'0000'0000;

/// A pair of Int64, flags represent what they mean
Expand Down Expand Up @@ -74,13 +74,13 @@ struct ChunkContext : public COW<ChunkContext>
}
}

ALWAYS_INLINE void setRetractedDataFlag()
ALWAYS_INLINE void setConsecutiveDataFlag()
{
flags |= RETRACTED_DATA_FLAG;
flags |= CONSECUTIVE_DATA_FLAG;
setAvoidWatermark();
}

ALWAYS_INLINE bool isRetractedData() const { return flags & RETRACTED_DATA_FLAG; }
ALWAYS_INLINE bool isConsecutiveData() const { return flags & CONSECUTIVE_DATA_FLAG; }

ALWAYS_INLINE void setAvoidWatermark() { flags |= AVOID_WATERMARK_FLAG; }

Expand Down Expand Up @@ -280,9 +280,9 @@ class Chunk
columns.reserve(num_columns);
}

bool isRetractedData() const
bool isConsecutiveData() const
{
return chunk_ctx && chunk_ctx->isRetractedData();
return chunk_ctx && chunk_ctx->isConsecutiveData();
}

bool avoidWatermark() const
Expand Down Expand Up @@ -317,10 +317,10 @@ class Chunk
chunk_ctx = std::move(mutate_chunk_ctx);
}

void setRetractedDataFlag()
void setConsecutiveDataFlag()
{
auto mutate_chunk_ctx = chunk_ctx ? ChunkContext::mutate(chunk_ctx) : ChunkContext::create();
mutate_chunk_ctx->setRetractedDataFlag();
mutate_chunk_ctx->setConsecutiveDataFlag();
chunk_ctx = std::move(mutate_chunk_ctx);
}

Expand Down
2 changes: 1 addition & 1 deletion src/Processors/ProcessorID.h
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ enum class ProcessorID : UInt32
ChangelogTransformID = 94,
ReplayStreamTransformID = 95,
LightShufflingTransformID = 96,
StreamingJoinTransformWithAlignmentID = 44,
StreamingJoinTransformWithAlignmentID = 97,

/// Aggregating transform
AggregatingInOrderTransformID = 1'000,
Expand Down
2 changes: 1 addition & 1 deletion src/Processors/Transforms/Streaming/AggregatingHelper.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,7 @@ convertToChangelogChunk(AggregatedDataVariants & data, RetractedDataVariants & r
{
auto retracted_delta_col = ColumnInt8::create(retracted_chunk.rows(), Int8(-1));
retracted_chunk.addColumn(std::move(retracted_delta_col));
retracted_chunk.setRetractedDataFlag();
retracted_chunk.setConsecutiveDataFlag();
}

auto chunk = convertToChunkImpl(data, params, ConvertAction::StreamingEmit);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,7 +241,7 @@ void AggregatingTransform::setCurrentChunk(Chunk chunk, Chunk retracted_chunk)
if (retracted_chunk.rows())
{
current_chunk_retracted = std::move(retracted_chunk);
current_chunk_retracted.setRetractedDataFlag();
current_chunk_retracted.setConsecutiveDataFlag();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ void AggregatingTransformWithSubstream::setCurrentChunk(Chunk chunk, Chunk retra
if (retracted_chunk.rows())
{
current_chunk_retracted = std::move(retracted_chunk);
current_chunk_retracted.setRetractedDataFlag();
current_chunk_retracted.setConsecutiveDataFlag();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -321,7 +321,7 @@ void ChangelogConvertTransform::retractAndIndex(size_t rows, const ColumnRawPtrs
/// We also can't concat -1 / +1 chunks since the downstream join depends on this separation
/// behavior, meaning it depends on a chunk being either all retraction or all append, which
/// largely simplifies its logic and is usually more performant
output_chunks.back().setRetractedDataFlag();
output_chunks.back().setConsecutiveDataFlag();
}

/// Composing resulting chunk
Expand Down
47 changes: 22 additions & 25 deletions src/Processors/Transforms/Streaming/ChangelogTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -140,32 +140,11 @@ void ChangelogTransform::work()
}
else if (std::all_of(delta_flags.begin(), delta_flags.end(), [](auto delta) { return delta < 0; }))
{
input_data.chunk.setRetractedDataFlag();
input_data.chunk.setConsecutiveDataFlag();
transformChunk(input_data.chunk);
return;
}

/// cut every column in chunk_columns and put them into a new chunk
auto cut_cols_into_chunk = [&chunk_columns, this, &delta_flags](UInt64 & start_pos, UInt64 end_pos) {
Chunk chunk_output;

for (const auto & col : chunk_columns)
chunk_output.addColumn(col->cut(start_pos, end_pos - start_pos));

if (delta_flags[start_pos] < 0)
{
/// retract chunk
chunk_output.setRetractedDataFlag();
this->transformChunk(chunk_output);
}
else
{
/// update chunk
chunk_output.setChunkContext(input_data.chunk.getChunkContext());
this->transformChunk(chunk_output);
}
};

/**
* @brief Put consecutive data with the same _tp_delta value in a chunk.
* For example: if the input chunk delta flags are [1, 1, 1, -1, -1, 1, 1, 1]
Expand All @@ -175,16 +154,34 @@ void ChangelogTransform::work()
* but also ensures that the _tp_delta values in the same chunk are the same.
*/
UInt64 start_pos = 0;
for (size_t end_pos = 0; end_pos < delta_flags.size(); ++end_pos)
for (size_t end_pos = 1; end_pos < delta_flags.size(); ++end_pos)
{
if (delta_flags[end_pos] != delta_flags[start_pos])
{
cut_cols_into_chunk(start_pos, end_pos);
Chunk chunk_output;
for (const auto & col : chunk_columns)
chunk_output.addColumn(col->cut(start_pos, end_pos - start_pos));

/// consecutive chunk
chunk_output.setConsecutiveDataFlag();
transformChunk(chunk_output);

/// set next chunk start pos
start_pos = end_pos;
}
}

/// handle the last part
cut_cols_into_chunk(start_pos, delta_flags.size());
Chunk chunk_output;
for (const auto & col : chunk_columns)
chunk_output.addColumn(col->cut(start_pos, delta_flags.size() - start_pos));

chunk_output.setChunkContext(input_data.chunk.getChunkContext());
/// FIXME: for now, retracted data always needs the next consecutive data
if (delta_flags[start_pos] < 0)
chunk_output.setConsecutiveDataFlag();

transformChunk(chunk_output);

input_data.chunk.clear();
}
Expand Down
4 changes: 2 additions & 2 deletions src/Processors/Transforms/Streaming/JoinTransform.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ void JoinTransform::work()

required_update_processing_index.reset();
}
else if (input_chunk.isRetractedData())
else if (input_chunk.isConsecutiveData())
required_update_processing_index = i;

if (input_chunk.hasWatermark())
Expand Down Expand Up @@ -279,7 +279,7 @@ inline void JoinTransform::joinBidirectionally(Chunks chunks)
{
/// Don't watermark this block. We can concat retracted / result blocks or use avoid watermarking
auto chunk_ctx = ChunkContext::create();
chunk_ctx->setRetractedDataFlag();
chunk_ctx->setConsecutiveDataFlag();
output_chunks.emplace_back(retracted_block.getColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -388,7 +388,7 @@ void JoinTransformWithAlignment::processLeftInputData(LightChunk & chunk)
{
/// Don't watermark this block. We can concat retracted / result blocks or use avoid watermarking
auto chunk_ctx = ChunkContext::create();
chunk_ctx->setRetractedDataFlag();
chunk_ctx->setConsecutiveDataFlag();
output_chunks.emplace_back(retracted_block.getColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx));
}

Expand Down Expand Up @@ -423,7 +423,7 @@ void JoinTransformWithAlignment::processRightInputData(LightChunk & chunk)
{
/// Don't watermark this block. We can concat retracted / result blocks or use avoid watermarking
auto chunk_ctx = ChunkContext::create();
chunk_ctx->setRetractedDataFlag();
chunk_ctx->setConsecutiveDataFlag();
output_chunks.emplace_back(retracted_block.getColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx));
}

Expand Down Expand Up @@ -480,7 +480,7 @@ void JoinTransformWithAlignment::InputPortWithData::add(Chunk && chunk)
/// If the input needs to update data, currently the input is always two consecutive chunks with _tp_delta `-1 and +1`
/// So we have to process them together before processing another input
/// NOTE: Assume the first retracted chunk of updated data always sets ConsecutiveDataFlag.
if (chunk.isRetractedData())
if (chunk.isConsecutiveData())
{
assert(chunk.hasRows());
last_data_ts = DB::MonotonicMilliseconds::now();
Expand Down
Loading