From 775ad35a6c84a947493dd62a553ee3594647b774 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lisen=20=E6=9D=A8?= Date: Mon, 29 Jan 2024 19:47:18 +0800 Subject: [PATCH 1/4] support window aggr over global aggr --- src/Interpreters/InterpreterSelectQuery.cpp | 9 --------- 1 file changed, 9 deletions(-) diff --git a/src/Interpreters/InterpreterSelectQuery.cpp b/src/Interpreters/InterpreterSelectQuery.cpp index 49558771789..ae40014e4ba 100644 --- a/src/Interpreters/InterpreterSelectQuery.cpp +++ b/src/Interpreters/InterpreterSelectQuery.cpp @@ -3411,15 +3411,6 @@ void InterpreterSelectQuery::finalCheckAndOptimizeForStreamingQuery() { if (isStreamingQuery()) { - /// Does not allow window func over a global aggregation - if (hasStreamingWindowFunc()) - { - /// nested query - if (auto * proxy = storage->as()) - if (proxy->hasStreamingGlobalAggregation()) - throw Exception("Streaming query doesn't support window func over a global aggregation", ErrorCodes::NOT_IMPLEMENTED); - } - /// For now, for the following scenarios, we disable backfill from historic data store /// 1) User select some virtual columns which is only available in streaming store, like `_tp_sn`, `_tp_index_time` /// 2) Seek by streaming store sequence number From 54d3b9f78979c4ffc5dfda2a017405e28c0f5db8 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lisen=20=E6=9D=A8?= Date: Mon, 29 Jan 2024 19:47:57 +0800 Subject: [PATCH 2/4] fix incorrect processor id --- src/Processors/ProcessorID.h | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/Processors/ProcessorID.h b/src/Processors/ProcessorID.h index f4ffb5fe8c4..e044e22670c 100644 --- a/src/Processors/ProcessorID.h +++ b/src/Processors/ProcessorID.h @@ -115,7 +115,7 @@ enum class ProcessorID : UInt32 ChangelogTransformID = 94, ReplayStreamTransformID = 95, LightShufflingTransformID = 96, - StreamingJoinTransformWithAlignmentID = 44, + StreamingJoinTransformWithAlignmentID = 97, /// Aggregating transform AggregatingInOrderTransformID = 1'000, From f897a6dbad829c994f73b0b791b1eb9aee9ee3c7 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lisen=20=E6=9D=A8?= Date: Mon, 29 Jan 2024 19:48:58 +0800 Subject: [PATCH 3/4] a better impl for ChangelogTransform --- src/Processors/Chunk.h | 16 +++---- .../Streaming/AggregatingHelper.cpp | 2 +- .../Streaming/AggregatingTransform.cpp | 2 +- .../AggregatingTransformWithSubstream.cpp | 2 +- .../Streaming/ChangelogConvertTransform.cpp | 2 +- .../Streaming/ChangelogTransform.cpp | 42 ++++++++----------- .../Transforms/Streaming/JoinTransform.cpp | 4 +- .../Streaming/JoinTransformWithAlignment.cpp | 6 +-- 8 files changed, 34 insertions(+), 42 deletions(-) diff --git a/src/Processors/Chunk.h b/src/Processors/Chunk.h index 5ca7ab22e82..db722742c8e 100644 --- a/src/Processors/Chunk.h +++ b/src/Processors/Chunk.h @@ -29,7 +29,7 @@ struct ChunkContext : public COW static constexpr UInt64 APPEND_TIME_FLAG = 0x2; static constexpr UInt64 HISTORICAL_DATA_START_FLAG = 0x4; static constexpr UInt64 HISTORICAL_DATA_END_FLAG = 0x8; - static constexpr UInt64 RETRACTED_DATA_FLAG = 0x10; + static constexpr UInt64 CONSECUTIVE_DATA_FLAG = 0x10; static constexpr UInt64 AVOID_WATERMARK_FLAG = 0x8000'0000'0000'0000; /// A pair of Int64, flags represent what they mean @@ -74,13 +74,13 @@ struct ChunkContext : public COW } } - ALWAYS_INLINE void setRetractedDataFlag() + ALWAYS_INLINE void setConsecutiveDataFlag() { - flags |= RETRACTED_DATA_FLAG; + flags |= CONSECUTIVE_DATA_FLAG; setAvoidWatermark(); } - ALWAYS_INLINE bool isRetractedData() const { return flags & 
RETRACTED_DATA_FLAG; } + ALWAYS_INLINE bool isConsecutiveData() const { return flags & CONSECUTIVE_DATA_FLAG; } ALWAYS_INLINE void setAvoidWatermark() { flags |= AVOID_WATERMARK_FLAG; } @@ -280,9 +280,9 @@ class Chunk columns.reserve(num_columns); } - bool isRetractedData() const + bool isConsecutiveData() const { - return chunk_ctx && chunk_ctx->isRetractedData(); + return chunk_ctx && chunk_ctx->isConsecutiveData(); } bool avoidWatermark() const @@ -317,10 +317,10 @@ class Chunk chunk_ctx = std::move(mutate_chunk_ctx); } - void setRetractedDataFlag() + void setConsecutiveDataFlag() { auto mutate_chunk_ctx = chunk_ctx ? ChunkContext::mutate(chunk_ctx) : ChunkContext::create(); - mutate_chunk_ctx->setRetractedDataFlag(); + mutate_chunk_ctx->setConsecutiveDataFlag(); chunk_ctx = std::move(mutate_chunk_ctx); } diff --git a/src/Processors/Transforms/Streaming/AggregatingHelper.cpp b/src/Processors/Transforms/Streaming/AggregatingHelper.cpp index acfbeeefa56..6fa6139d38a 100644 --- a/src/Processors/Transforms/Streaming/AggregatingHelper.cpp +++ b/src/Processors/Transforms/Streaming/AggregatingHelper.cpp @@ -88,7 +88,7 @@ convertToChangelogChunk(AggregatedDataVariants & data, RetractedDataVariants & r { auto retracted_delta_col = ColumnInt8::create(retracted_chunk.rows(), Int8(-1)); retracted_chunk.addColumn(std::move(retracted_delta_col)); - retracted_chunk.setRetractedDataFlag(); + retracted_chunk.setConsecutiveDataFlag(); } auto chunk = convertToChunkImpl(data, params, ConvertAction::StreamingEmit); diff --git a/src/Processors/Transforms/Streaming/AggregatingTransform.cpp b/src/Processors/Transforms/Streaming/AggregatingTransform.cpp index a07bba83a3a..629a047c1f3 100644 --- a/src/Processors/Transforms/Streaming/AggregatingTransform.cpp +++ b/src/Processors/Transforms/Streaming/AggregatingTransform.cpp @@ -241,7 +241,7 @@ void AggregatingTransform::setCurrentChunk(Chunk chunk, Chunk retracted_chunk) if (retracted_chunk.rows()) { current_chunk_retracted = std::move(retracted_chunk); - current_chunk_retracted.setRetractedDataFlag(); + current_chunk_retracted.setConsecutiveDataFlag(); } } diff --git a/src/Processors/Transforms/Streaming/AggregatingTransformWithSubstream.cpp b/src/Processors/Transforms/Streaming/AggregatingTransformWithSubstream.cpp index 56ea6533901..918d1337658 100644 --- a/src/Processors/Transforms/Streaming/AggregatingTransformWithSubstream.cpp +++ b/src/Processors/Transforms/Streaming/AggregatingTransformWithSubstream.cpp @@ -214,7 +214,7 @@ void AggregatingTransformWithSubstream::setCurrentChunk(Chunk chunk, Chunk retra if (retracted_chunk.rows()) { current_chunk_retracted = std::move(retracted_chunk); - current_chunk_retracted.setRetractedDataFlag(); + current_chunk_retracted.setConsecutiveDataFlag(); } } diff --git a/src/Processors/Transforms/Streaming/ChangelogConvertTransform.cpp b/src/Processors/Transforms/Streaming/ChangelogConvertTransform.cpp index a8ec1cde1a5..2b9f8ceb183 100644 --- a/src/Processors/Transforms/Streaming/ChangelogConvertTransform.cpp +++ b/src/Processors/Transforms/Streaming/ChangelogConvertTransform.cpp @@ -321,7 +321,7 @@ void ChangelogConvertTransform::retractAndIndex(size_t rows, const ColumnRawPtrs /// We also can't concat -1 / +1 chunks since downstream join depends on this separation /// behavior meaning depends on a chunk is either all retraction or all append which /// largely simplify its logic and usually more performant - output_chunks.back().setRetractedDataFlag(); + output_chunks.back().setConsecutiveDataFlag(); } /// Composing 
resulting chunk diff --git a/src/Processors/Transforms/Streaming/ChangelogTransform.cpp b/src/Processors/Transforms/Streaming/ChangelogTransform.cpp index a17899a756a..f6634f86713 100644 --- a/src/Processors/Transforms/Streaming/ChangelogTransform.cpp +++ b/src/Processors/Transforms/Streaming/ChangelogTransform.cpp @@ -140,32 +140,10 @@ void ChangelogTransform::work() } else if (std::all_of(delta_flags.begin(), delta_flags.end(), [](auto delta) { return delta < 0; })) { - input_data.chunk.setRetractedDataFlag(); transformChunk(input_data.chunk); return; } - /// cut every column in chunk_columns and put them into a new chunk - auto cut_cols_into_chunk = [&chunk_columns, this, &delta_flags](UInt64 & start_pos, UInt64 end_pos) { - Chunk chunk_output; - - for (const auto & col : chunk_columns) - chunk_output.addColumn(col->cut(start_pos, end_pos - start_pos)); - - if (delta_flags[start_pos] < 0) - { - /// retract chunk - chunk_output.setRetractedDataFlag(); - this->transformChunk(chunk_output); - } - else - { - /// update chunk - chunk_output.setChunkContext(input_data.chunk.getChunkContext()); - this->transformChunk(chunk_output); - } - }; - /** * @brief Put consecutive data with the same _tp_delta value in a chunk. * For example: if the input chunk delta flags are [1, 1, 1, -1, -1, 1, 1, 1] @@ -175,16 +153,30 @@ void ChangelogTransform::work() * but also ensures that the _tp_delta values in the same chunk are the same. */ UInt64 start_pos = 0; - for (size_t end_pos = 0; end_pos < delta_flags.size(); ++end_pos) + for (size_t end_pos = 1; end_pos < delta_flags.size(); ++end_pos) { if (delta_flags[end_pos] != delta_flags[start_pos]) { - cut_cols_into_chunk(start_pos, end_pos); + Chunk chunk_output; + for (const auto & col : chunk_columns) + chunk_output.addColumn(col->cut(start_pos, end_pos - start_pos)); + + /// consecutive chunk + chunk_output.setConsecutiveDataFlag(); + transformChunk(chunk_output); + + /// set next chunk start pos start_pos = end_pos; } } + /// handle the last part - cut_cols_into_chunk(start_pos, delta_flags.size()); + Chunk chunk_output; + for (const auto & col : chunk_columns) + chunk_output.addColumn(col->cut(start_pos, delta_flags.size() - start_pos)); + + chunk_output.setChunkContext(input_data.chunk.getChunkContext()); + transformChunk(chunk_output); input_data.chunk.clear(); } diff --git a/src/Processors/Transforms/Streaming/JoinTransform.cpp b/src/Processors/Transforms/Streaming/JoinTransform.cpp index 2387b3a641a..f49e3894ae1 100644 --- a/src/Processors/Transforms/Streaming/JoinTransform.cpp +++ b/src/Processors/Transforms/Streaming/JoinTransform.cpp @@ -143,7 +143,7 @@ void JoinTransform::work() required_update_processing_index.reset(); } - else if (input_chunk.isRetractedData()) + else if (input_chunk.isConsecutiveData()) required_update_processing_index = i; if (input_chunk.hasWatermark()) @@ -279,7 +279,7 @@ inline void JoinTransform::joinBidirectionally(Chunks chunks) { /// Don't watermark this block. 
We can concat retracted / result blocks or use avoid watermarking auto chunk_ctx = ChunkContext::create(); - chunk_ctx->setRetractedDataFlag(); + chunk_ctx->setConsecutiveDataFlag(); output_chunks.emplace_back(retracted_block.getColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx)); } diff --git a/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.cpp b/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.cpp index 0bcc34e0df6..be80727caee 100644 --- a/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.cpp +++ b/src/Processors/Transforms/Streaming/JoinTransformWithAlignment.cpp @@ -388,7 +388,7 @@ void JoinTransformWithAlignment::processLeftInputData(LightChunk & chunk) { /// Don't watermark this block. We can concat retracted / result blocks or use avoid watermarking auto chunk_ctx = ChunkContext::create(); - chunk_ctx->setRetractedDataFlag(); + chunk_ctx->setConsecutiveDataFlag(); output_chunks.emplace_back(retracted_block.getColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx)); } @@ -423,7 +423,7 @@ void JoinTransformWithAlignment::processRightInputData(LightChunk & chunk) { /// Don't watermark this block. We can concat retracted / result blocks or use avoid watermarking auto chunk_ctx = ChunkContext::create(); - chunk_ctx->setRetractedDataFlag(); + chunk_ctx->setConsecutiveDataFlag(); output_chunks.emplace_back(retracted_block.getColumns(), retracted_block_rows, nullptr, std::move(chunk_ctx)); } @@ -480,7 +480,7 @@ void JoinTransformWithAlignment::InputPortWithData::add(Chunk && chunk) /// If the input needs to update data, currently the input is always two consecutive chunks with _tp_delta `-1 and +1` /// So we have to process them together before processing another input /// NOTE: Assume the first retracted chunk of updated data always set RetractedDataFlag. - if (chunk.isRetractedData()) + if (chunk.isConsecutiveData()) { assert(chunk.hasRows()); last_data_ts = DB::MonotonicMilliseconds::now(); From 7ef3218617c9154b37aed27ee6f5d3e1b2f8da29 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Lisen=20=E6=9D=A8?= Date: Mon, 29 Jan 2024 22:42:43 +0800 Subject: [PATCH 4/4] fix --- src/Processors/Transforms/Streaming/ChangelogTransform.cpp | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/src/Processors/Transforms/Streaming/ChangelogTransform.cpp b/src/Processors/Transforms/Streaming/ChangelogTransform.cpp index f6634f86713..c68b74d7b45 100644 --- a/src/Processors/Transforms/Streaming/ChangelogTransform.cpp +++ b/src/Processors/Transforms/Streaming/ChangelogTransform.cpp @@ -140,6 +140,7 @@ void ChangelogTransform::work() } else if (std::all_of(delta_flags.begin(), delta_flags.end(), [](auto delta) { return delta < 0; })) { + input_data.chunk.setConsecutiveDataFlag(); transformChunk(input_data.chunk); return; } @@ -176,6 +177,10 @@ void ChangelogTransform::work() chunk_output.addColumn(col->cut(start_pos, delta_flags.size() - start_pos)); chunk_output.setChunkContext(input_data.chunk.getChunkContext()); + /// FIXME: for now, retracted data always need next consecutive data + if (delta_flags[start_pos] < 0) + chunk_output.setConsecutiveDataFlag(); + transformChunk(chunk_output); input_data.chunk.clear();
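Editor's note, outside the patch itself: the loop introduced in PATCH 3 and adjusted in PATCH 4 splits an incoming changelog chunk into consecutive runs that share the same _tp_delta value, exactly as the docstring's example describes ([1, 1, 1, -1, -1, 1, 1, 1] becomes [1, 1, 1], [-1, -1], [1, 1, 1]), with every run except a trailing append run marked via setConsecutiveDataFlag(). The following is a minimal standalone C++ sketch of that run-splitting idea only; the Run struct and splitIntoRuns function are hypothetical names used for illustration and are not part of the patched codebase.

    // Standalone illustration (not part of the patch): split a sequence of
    // _tp_delta flags into consecutive runs with the same value, mirroring the
    // chunk-splitting loop added to ChangelogTransform::work().
    #include <cstdint>
    #include <iostream>
    #include <vector>

    struct Run
    {
        size_t offset = 0;   // first row of the run
        size_t length = 0;   // number of rows in the run
        int8_t delta = 0;    // shared _tp_delta value (+1 append, -1 retract)
    };

    // Walk the flags once; whenever the value changes, close the current run.
    std::vector<Run> splitIntoRuns(const std::vector<int8_t> & delta_flags)
    {
        std::vector<Run> runs;
        if (delta_flags.empty())
            return runs;

        size_t start_pos = 0;
        for (size_t end_pos = 1; end_pos < delta_flags.size(); ++end_pos)
        {
            if (delta_flags[end_pos] != delta_flags[start_pos])
            {
                runs.push_back({start_pos, end_pos - start_pos, delta_flags[start_pos]});
                start_pos = end_pos;
            }
        }
        // The tail run is still open when the loop exits; close it here,
        // matching the "handle the last part" branch in the patch.
        runs.push_back({start_pos, delta_flags.size() - start_pos, delta_flags[start_pos]});
        return runs;
    }

    int main()
    {
        // Mirrors the example in the patch comment:
        // [1, 1, 1, -1, -1, 1, 1, 1] -> three runs of lengths 3, 2, 3.
        std::vector<int8_t> flags{1, 1, 1, -1, -1, 1, 1, 1};
        for (const auto & run : splitIntoRuns(flags))
            std::cout << "offset=" << run.offset << " length=" << run.length
                      << " delta=" << int(run.delta) << '\n';
        return 0;
    }

In the actual transform, each run additionally carries the original chunk context (and, per the PATCH 4 FIXME, a trailing retract run is flagged as consecutive so the downstream join keeps it paired with the append data that follows); the sketch above only demonstrates the partitioning step.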