From 749581715f0801ead08cecda74e9b5e063e66abd Mon Sep 17 00:00:00 2001 From: zhyass Date: Tue, 19 Sep 2023 01:03:31 +0800 Subject: [PATCH] feat: distributed execution of compact statement (#12750) * compact distribute * remove unused codes * update test case * add sqllogic test * fix test --------- Co-authored-by: dantengsky --- src/query/catalog/src/table.rs | 23 +- .../transform_accumulating_async.rs | 1 + .../src/interpreters/interpreter_delete.rs | 11 +- .../interpreters/interpreter_merge_into.rs | 9 +- .../src/interpreters/interpreter_replace.rs | 19 +- .../interpreter_table_optimize.rs | 88 ++- .../service/src/pipelines/pipeline_builder.rs | 55 +- .../src/schedulers/fragments/fragmenter.rs | 11 + .../src/schedulers/fragments/plan_fragment.rs | 67 ++- .../mutation/block_compact_mutator.rs | 89 +-- src/query/sql/src/evaluator/block_operator.rs | 2 +- src/query/sql/src/executor/format.rs | 54 +- src/query/sql/src/executor/physical_plan.rs | 49 +- .../sql/src/executor/physical_plan_display.rs | 46 +- .../sql/src/executor/physical_plan_visitor.rs | 96 ++-- src/query/sql/src/executor/profile.rs | 6 +- src/query/sql/src/planner/plans/plan.rs | 54 +- .../common/table-meta/src/meta/format.rs | 4 +- .../common/table-meta/src/meta/v4/segment.rs | 4 +- src/query/storages/fuse/src/fuse_table.rs | 16 +- .../src/operations/common/mutation_log.rs | 18 +- .../transform_mutation_aggregator.rs | 194 ++++--- .../processors/transform_serialize_block.rs | 54 +- .../operations/common/snapshot_generator.rs | 39 -- .../storages/fuse/src/operations/compact.rs | 216 ++++---- .../storages/fuse/src/operations/delete.rs | 36 +- .../transform_matched_mutation_aggregator.rs | 2 +- src/query/storages/fuse/src/operations/mod.rs | 1 + .../mutation/compact/block_compact_mutator.rs | 506 ++++++++++++------ .../mutation/compact/compact_aggregator.rs | 184 ------- .../mutation/compact/compact_part.rs | 109 +++- .../mutation/compact/compact_source.rs | 62 ++- .../src/operations/mutation/compact/mod.rs | 5 +- .../fuse/src/operations/mutation/mod.rs | 6 +- .../src/operations/mutation/mutation_meta.rs | 48 +- .../src/operations/mutation/mutation_part.rs | 76 +-- .../operations/mutation/mutation_source.rs | 33 +- .../operations/mutation/recluster_mutator.rs | 23 +- .../storages/fuse/src/operations/replace.rs | 55 +- ...ransform_merge_into_mutation_aggregator.rs | 2 +- .../storages/fuse/src/operations/update.rs | 2 +- .../storages/fuse/src/pruning/fuse_pruner.rs | 28 +- src/query/storages/fuse/src/pruning/mod.rs | 1 - .../09_0008_fuse_optimize_table | 11 +- .../09_0016_remote_alter_recluster | 2 +- .../mode/cluster/distributed_compact.sql | 122 +++++ 46 files changed, 1434 insertions(+), 1105 deletions(-) delete mode 100644 src/query/storages/fuse/src/operations/mutation/compact/compact_aggregator.rs create mode 100644 tests/sqllogictests/suites/mode/cluster/distributed_compact.sql diff --git a/src/query/catalog/src/table.rs b/src/query/catalog/src/table.rs index abf117a5149a8..6a59816a15bd3 100644 --- a/src/query/catalog/src/table.rs +++ b/src/query/catalog/src/table.rs @@ -36,6 +36,7 @@ use common_meta_types::MetaId; use common_pipeline_core::Pipeline; use common_storage::StorageMetrics; use storages_common_table_meta::meta::SnapshotId; +use storages_common_table_meta::meta::TableSnapshot; use crate::plan::DataSourceInfo; use crate::plan::DataSourcePlan; @@ -300,16 +301,28 @@ pub trait Table: Sync + Send { unimplemented!() } - // return false if the table does not need to be compacted. #[async_backtrace::framed] - async fn compact( + async fn compact_segments( &self, ctx: Arc, - target: CompactTarget, limit: Option, - pipeline: &mut Pipeline, ) -> Result<()> { - let (_, _, _, _) = (ctx, target, limit, pipeline); + let (_, _) = (ctx, limit); + + Err(ErrorCode::Unimplemented(format!( + "table {}, of engine type {}, does not support compact segments", + self.name(), + self.get_table_info().engine(), + ))) + } + + #[async_backtrace::framed] + async fn compact_blocks( + &self, + ctx: Arc, + limit: Option, + ) -> Result)>> { + let (_, _) = (ctx, limit); Err(ErrorCode::Unimplemented(format!( "table {}, of engine type {}, does not support compact", diff --git a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs index 209474572cce8..951633a4471cc 100644 --- a/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs +++ b/src/query/pipeline/transforms/src/processors/transforms/transform_accumulating_async.rs @@ -21,6 +21,7 @@ use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::Event; use common_pipeline_core::processors::Processor; + #[async_trait::async_trait] pub trait AsyncAccumulatingTransform: Send { const NAME: &'static str; diff --git a/src/query/service/src/interpreters/interpreter_delete.rs b/src/query/service/src/interpreters/interpreter_delete.rs index 46a3f221c49be..ff7013d260c72 100644 --- a/src/query/service/src/interpreters/interpreter_delete.rs +++ b/src/query/service/src/interpreters/interpreter_delete.rs @@ -69,7 +69,8 @@ use crate::schedulers::build_query_pipeline; use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; use crate::sessions::TableContext; -use crate::sql::executor::FinalCommit; +use crate::sql::executor::CommitSink; +use crate::sql::executor::MutationKind; use crate::sql::plans::DeletePlan; use crate::stream::PullingExecutorStream; @@ -275,7 +276,7 @@ impl DeleteInterpreter { partitions: Partitions, table_info: TableInfo, col_indices: Vec, - snapshot: TableSnapshot, + snapshot: Arc, catalog_info: CatalogInfo, is_distributed: bool, query_row_id_col: bool, @@ -300,12 +301,14 @@ impl DeleteInterpreter { }); } - Ok(PhysicalPlan::FinalCommit(Box::new(FinalCommit { + Ok(PhysicalPlan::CommitSink(CommitSink { input: Box::new(root), snapshot, table_info, catalog_info, - }))) + mutation_kind: MutationKind::Delete, + merge_meta: true, + })) } } diff --git a/src/query/service/src/interpreters/interpreter_merge_into.rs b/src/query/service/src/interpreters/interpreter_merge_into.rs index 408554fe0b5d0..845f3589056dd 100644 --- a/src/query/service/src/interpreters/interpreter_merge_into.rs +++ b/src/query/service/src/interpreters/interpreter_merge_into.rs @@ -24,9 +24,9 @@ use common_expression::FieldIndex; use common_expression::RemoteExpr; use common_functions::BUILTIN_FUNCTIONS; use common_meta_app::schema::TableInfo; +use common_sql::executor::CommitSink; use common_sql::executor::MergeInto; use common_sql::executor::MergeIntoSource; -use common_sql::executor::MutationAggregate; use common_sql::executor::MutationKind; use common_sql::executor::PhysicalPlan; use common_sql::executor::PhysicalPlanBuilder; @@ -297,14 +297,15 @@ impl MergeIntoInterpreter { }); // build mutation_aggregate - let physical_plan = PhysicalPlan::MutationAggregate(Box::new(MutationAggregate { + let physical_plan = PhysicalPlan::CommitSink(CommitSink { input: Box::new(merge_into), - snapshot: (*base_snapshot).clone(), + snapshot: base_snapshot, table_info: table_info.clone(), catalog_info: catalog_.info(), // let's use update first, we will do some optimizeations and select exact strategy mutation_kind: MutationKind::Update, - })); + merge_meta: false, + }); Ok((physical_plan, table_info.clone())) } diff --git a/src/query/service/src/interpreters/interpreter_replace.rs b/src/query/service/src/interpreters/interpreter_replace.rs index e4dd0d888970b..f21838b6d2612 100644 --- a/src/query/service/src/interpreters/interpreter_replace.rs +++ b/src/query/service/src/interpreters/interpreter_replace.rs @@ -21,9 +21,9 @@ use common_exception::Result; use common_expression::DataSchemaRef; use common_meta_app::principal::StageInfo; use common_sql::executor::AsyncSourcerPlan; +use common_sql::executor::CommitSink; use common_sql::executor::Deduplicate; use common_sql::executor::Exchange; -use common_sql::executor::MutationAggregate; use common_sql::executor::MutationKind; use common_sql::executor::OnConflictField; use common_sql::executor::PhysicalPlan; @@ -238,15 +238,14 @@ impl ReplaceInterpreter { ignore_exchange: false, })); } - root = Box::new(PhysicalPlan::MutationAggregate(Box::new( - MutationAggregate { - input: root, - snapshot: (*base_snapshot).clone(), - table_info: table_info.clone(), - catalog_info: catalog.info(), - mutation_kind: MutationKind::Replace, - }, - ))); + root = Box::new(PhysicalPlan::CommitSink(CommitSink { + input: root, + snapshot: base_snapshot, + table_info: table_info.clone(), + catalog_info: catalog.info(), + mutation_kind: MutationKind::Replace, + merge_meta: false, + })); Ok((root, purge_info)) } diff --git a/src/query/service/src/interpreters/interpreter_table_optimize.rs b/src/query/service/src/interpreters/interpreter_table_optimize.rs index 29330016149bc..b1e091d4bb059 100644 --- a/src/query/service/src/interpreters/interpreter_table_optimize.rs +++ b/src/query/service/src/interpreters/interpreter_table_optimize.rs @@ -16,20 +16,31 @@ use std::sync::Arc; use std::time::SystemTime; use common_base::runtime::GlobalIORuntime; +use common_catalog::plan::Partitions; use common_catalog::table::CompactTarget; use common_catalog::table::TableExt; use common_exception::ErrorCode; use common_exception::Result; +use common_meta_app::schema::CatalogInfo; +use common_meta_app::schema::TableInfo; use common_pipeline_core::Pipeline; +use common_sql::executor::CommitSink; +use common_sql::executor::CompactPartial; +use common_sql::executor::Exchange; +use common_sql::executor::FragmentKind; +use common_sql::executor::MutationKind; +use common_sql::executor::PhysicalPlan; use common_sql::plans::OptimizeTableAction; use common_sql::plans::OptimizeTablePlan; use common_storages_factory::NavigationPoint; +use storages_common_table_meta::meta::TableSnapshot; use crate::interpreters::Interpreter; use crate::interpreters::InterpreterClusteringHistory; use crate::pipelines::executor::ExecutorSettings; use crate::pipelines::executor::PipelineCompleteExecutor; use crate::pipelines::PipelineBuildResult; +use crate::schedulers::build_query_pipeline_without_render_result_set; use crate::sessions::QueryContext; use crate::sessions::TableContext; @@ -71,6 +82,41 @@ impl Interpreter for OptimizeTableInterpreter { } impl OptimizeTableInterpreter { + pub fn build_physical_plan( + parts: Partitions, + table_info: TableInfo, + snapshot: Arc, + catalog_info: CatalogInfo, + is_distributed: bool, + ) -> Result { + let merge_meta = parts.is_lazy; + let mut root = PhysicalPlan::CompactPartial(CompactPartial { + parts, + table_info: table_info.clone(), + catalog_info: catalog_info.clone(), + column_ids: snapshot.schema.to_leaf_column_id_set(), + }); + + if is_distributed { + root = PhysicalPlan::Exchange(Exchange { + plan_id: 0, + input: Box::new(root), + kind: FragmentKind::Merge, + keys: vec![], + ignore_exchange: false, + }); + } + + Ok(PhysicalPlan::CommitSink(CommitSink { + input: Box::new(root), + table_info, + catalog_info, + snapshot, + mutation_kind: MutationKind::Compact, + merge_meta, + })) + } + async fn build_pipeline( &self, target: CompactTarget, @@ -80,13 +126,12 @@ impl OptimizeTableInterpreter { .ctx .get_table(&self.plan.catalog, &self.plan.database, &self.plan.table) .await?; - let need_recluster = !table.cluster_keys(self.ctx.clone()).is_empty() - && matches!(target, CompactTarget::Blocks); + let table_info = table.get_table_info().clone(); // check if the table is locked. let catalog = self.ctx.get_catalog(&self.plan.catalog).await?; let reply = catalog - .list_table_lock_revs(table.get_table_info().ident.table_id) + .list_table_lock_revs(table_info.ident.table_id) .await?; if !reply.is_empty() { return Err(ErrorCode::TableAlreadyLocked(format!( @@ -95,19 +140,40 @@ impl OptimizeTableInterpreter { ))); } - let mut compact_pipeline = Pipeline::create(); - table - .compact( - self.ctx.clone(), - target, - self.plan.limit, - &mut compact_pipeline, - ) + if matches!(target, CompactTarget::Segments) { + table + .compact_segments(self.ctx.clone(), self.plan.limit) + .await?; + return Ok(PipelineBuildResult::create()); + } + + let res = table + .compact_blocks(self.ctx.clone(), self.plan.limit) .await?; + let is_distributed = !self.ctx.get_cluster().is_empty(); + let catalog_info = catalog.info(); + let mut compact_pipeline = if let Some((parts, snapshot)) = res { + let physical_plan = Self::build_physical_plan( + parts, + table_info, + snapshot, + catalog_info, + is_distributed, + )?; + + let build_res = + build_query_pipeline_without_render_result_set(&self.ctx, &physical_plan, false) + .await?; + build_res.main_pipeline + } else { + Pipeline::create() + }; + let mut build_res = PipelineBuildResult::create(); let settings = self.ctx.get_settings(); let mut reclustered_block_count = 0; + let need_recluster = !table.cluster_keys(self.ctx.clone()).is_empty(); if need_recluster { if !compact_pipeline.is_empty() { compact_pipeline.set_max_threads(settings.get_max_threads()? as usize); diff --git a/src/query/service/src/pipelines/pipeline_builder.rs b/src/query/service/src/pipelines/pipeline_builder.rs index 54f8320be4b22..9eb62fdf249a8 100644 --- a/src/query/service/src/pipelines/pipeline_builder.rs +++ b/src/query/service/src/pipelines/pipeline_builder.rs @@ -72,6 +72,8 @@ use common_sql::executor::AggregateFinal; use common_sql::executor::AggregateFunctionDesc; use common_sql::executor::AggregatePartial; use common_sql::executor::AsyncSourcerPlan; +use common_sql::executor::CommitSink; +use common_sql::executor::CompactPartial; use common_sql::executor::ConstantTableScan; use common_sql::executor::CopyIntoTable; use common_sql::executor::CopyIntoTableSource; @@ -83,14 +85,12 @@ use common_sql::executor::EvalScalar; use common_sql::executor::ExchangeSink; use common_sql::executor::ExchangeSource; use common_sql::executor::Filter; -use common_sql::executor::FinalCommit; use common_sql::executor::HashJoin; use common_sql::executor::Lambda; use common_sql::executor::Limit; use common_sql::executor::MaterializedCte; use common_sql::executor::MergeInto; use common_sql::executor::MergeIntoSource; -use common_sql::executor::MutationAggregate; use common_sql::executor::PhysicalPlan; use common_sql::executor::Project; use common_sql::executor::ProjectSet; @@ -258,7 +258,10 @@ impl PipelineBuilder { self.build_runtime_filter_source(runtime_filter_source) } PhysicalPlan::DeletePartial(delete) => self.build_delete_partial(delete), - PhysicalPlan::MutationAggregate(plan) => self.build_mutation_aggregate(plan), + PhysicalPlan::CompactPartial(compact_partial) => { + self.build_compact_partial(compact_partial) + } + PhysicalPlan::CommitSink(plan) => self.build_commit_sink(plan), PhysicalPlan::RangeJoin(range_join) => self.build_range_join(range_join), PhysicalPlan::MaterializedCte(materialized_cte) => { self.build_materialized_cte(materialized_cte) @@ -271,27 +274,9 @@ impl PipelineBuilder { PhysicalPlan::MergeIntoSource(merge_into_source) => { self.build_merge_into_source(merge_into_source) } - PhysicalPlan::FinalCommit(final_commit) => self.build_final_commit(final_commit), } } - fn build_final_commit(&mut self, final_commit: &FinalCommit) -> Result<()> { - self.build_pipeline(&final_commit.input)?; - let tbl = self.ctx.build_table_by_table_info( - &final_commit.catalog_info, - &final_commit.table_info, - None, - )?; - let table = FuseTable::try_from_table(tbl.as_ref())?; - table.chain_commit_meta_merger(&mut self.main_pipeline, table.cluster_key_id())?; - let ctx: Arc = self.ctx.clone(); - table.chain_commit_sink( - &ctx, - &mut self.main_pipeline, - Arc::new(final_commit.snapshot.clone()), - ) - } - fn check_schema_cast( select_schema: Arc, output_schema: Arc, @@ -757,6 +742,21 @@ impl PipelineBuilder { Ok(()) } + fn build_compact_partial(&mut self, compact_block: &CompactPartial) -> Result<()> { + let table = self.ctx.build_table_by_table_info( + &compact_block.catalog_info, + &compact_block.table_info, + None, + )?; + let table = FuseTable::try_from_table(table.as_ref())?; + table.build_compact_partial( + self.ctx.clone(), + compact_block.parts.clone(), + compact_block.column_ids.clone(), + &mut self.main_pipeline, + ) + } + /// The flow of Pipeline is as follows: /// /// +---------------+ +-----------------------+ @@ -795,29 +795,26 @@ impl PipelineBuilder { table.chain_mutation_aggregator( &ctx, &mut self.main_pipeline, - Arc::new(delete.snapshot.clone()), + delete.snapshot.clone(), MutationKind::Delete, )?; Ok(()) } - /// The flow of Pipeline is as follows: - /// - /// +-----------------------+ +----------+ - /// |TableMutationAggregator| ---> |CommitSink| - /// +-----------------------+ +----------+ - fn build_mutation_aggregate(&mut self, plan: &MutationAggregate) -> Result<()> { + fn build_commit_sink(&mut self, plan: &CommitSink) -> Result<()> { self.build_pipeline(&plan.input)?; let table = self.ctx .build_table_by_table_info(&plan.catalog_info, &plan.table_info, None)?; let table = FuseTable::try_from_table(table.as_ref())?; let ctx: Arc = self.ctx.clone(); + table.chain_mutation_pipes( &ctx, &mut self.main_pipeline, - Arc::new(plan.snapshot.clone()), + plan.snapshot.clone(), plan.mutation_kind, + plan.merge_meta, )?; Ok(()) } diff --git a/src/query/service/src/schedulers/fragments/fragmenter.rs b/src/query/service/src/schedulers/fragments/fragmenter.rs index 0ac1014043618..b6ac093b1f172 100644 --- a/src/query/service/src/schedulers/fragments/fragmenter.rs +++ b/src/query/service/src/schedulers/fragments/fragmenter.rs @@ -57,6 +57,7 @@ enum State { SelectLeaf, DeleteLeaf, ReplaceInto, + Compact, Other, } @@ -179,6 +180,15 @@ impl PhysicalPlanReplacer for Fragmenter { } } + fn replace_compact_partial( + &mut self, + plan: &common_sql::executor::CompactPartial, + ) -> Result { + self.state = State::Compact; + + Ok(PhysicalPlan::CompactPartial(plan.clone())) + } + fn replace_delete_partial( &mut self, plan: &common_sql::executor::DeletePartial, @@ -249,6 +259,7 @@ impl PhysicalPlanReplacer for Fragmenter { State::DeleteLeaf => FragmentType::DeleteLeaf, State::Other => FragmentType::Intermediate, State::ReplaceInto => FragmentType::ReplaceInto, + State::Compact => FragmentType::Compact, }; self.state = State::Other; let exchange = Self::get_exchange( diff --git a/src/query/service/src/schedulers/fragments/plan_fragment.rs b/src/query/service/src/schedulers/fragments/plan_fragment.rs index 55a0905eb2347..6de828ec70e36 100644 --- a/src/query/service/src/schedulers/fragments/plan_fragment.rs +++ b/src/query/service/src/schedulers/fragments/plan_fragment.rs @@ -22,6 +22,7 @@ use common_catalog::plan::PartitionsShuffleKind; use common_exception::ErrorCode; use common_exception::Result; use common_settings::ReplaceIntoShuffleStrategy; +use common_sql::executor::CompactPartial; use common_sql::executor::CopyIntoTable; use common_sql::executor::CopyIntoTableSource; use common_sql::executor::Deduplicate; @@ -61,6 +62,7 @@ pub enum FragmentType { DeleteLeaf, /// Intermediate fragment of a replace into plan, which contains a `ReplaceInto` operator. ReplaceInto, + Compact, } #[derive(Clone)] @@ -147,6 +149,13 @@ impl PlanFragment { } actions.add_fragment_actions(fragment_actions)?; } + FragmentType::Compact => { + let mut fragment_actions = self.redistribute_compact(ctx)?; + if let Some(ref exchange) = self.exchange { + fragment_actions.set_exchange(exchange.clone()); + } + actions.add_fragment_actions(fragment_actions)?; + } } Ok(()) @@ -254,8 +263,7 @@ impl PlanFragment { }; plan = replace_delete_partial.replace(&plan)?; - fragment_actions - .add_action(QueryFragmentAction::create(executor.clone(), plan.clone())); + fragment_actions.add_action(QueryFragmentAction::create(executor, plan)); } Ok(fragment_actions) @@ -273,30 +281,29 @@ impl PlanFragment { let partitions = &plan.segments; let executors = Fragmenter::get_executors(ctx.clone()); let mut fragment_actions = QueryFragmentActions::create(self.fragment_id); - let local_id = &ctx.get_cluster().local_id; + let local_id = ctx.get_cluster().local_id.clone(); match ctx.get_settings().get_replace_into_shuffle_strategy()? { ReplaceIntoShuffleStrategy::SegmentLevelShuffling => { let partition_reshuffle = Self::reshuffle(executors, partitions.clone())?; - for (executor, parts) in partition_reshuffle.iter() { + for (executor, parts) in partition_reshuffle.into_iter() { let mut plan = self.plan.clone(); let need_insert = executor == local_id; let mut replace_replace_into = ReplaceReplaceInto { - partitions: parts.clone(), + partitions: parts, slot: None, need_insert, }; plan = replace_replace_into.replace(&plan)?; - fragment_actions - .add_action(QueryFragmentAction::create(executor.clone(), plan.clone())); + fragment_actions.add_action(QueryFragmentAction::create(executor, plan)); } } ReplaceIntoShuffleStrategy::BlockLevelShuffling => { let num_slots = executors.len(); // assign all the segment locations to each one of the executors, // but for each segment, one executor only need to take part of the blocks - for (executor_idx, executor) in executors.iter().enumerate() { + for (executor_idx, executor) in executors.into_iter().enumerate() { let mut plan = self.plan.clone(); let need_insert = executor == local_id; let mut replace_replace_into = ReplaceReplaceInto { @@ -309,14 +316,41 @@ impl PlanFragment { }; plan = replace_replace_into.replace(&plan)?; - fragment_actions - .add_action(QueryFragmentAction::create(executor.clone(), plan.clone())); + fragment_actions.add_action(QueryFragmentAction::create(executor, plan)); } } } Ok(fragment_actions) } + fn redistribute_compact(&self, ctx: Arc) -> Result { + let exchange_sink = match &self.plan { + PhysicalPlan::ExchangeSink(plan) => plan, + _ => unreachable!("logic error"), + }; + let compact_block = match exchange_sink.input.as_ref() { + PhysicalPlan::CompactPartial(plan) => plan, + _ => unreachable!("logic error"), + }; + + let partitions: &Partitions = &compact_block.parts; + let executors = Fragmenter::get_executors(ctx); + let mut fragment_actions = QueryFragmentActions::create(self.fragment_id); + + let partition_reshuffle = partitions.reshuffle(executors)?; + + for (executor, parts) in partition_reshuffle.into_iter() { + let mut plan = self.plan.clone(); + + let mut replace_compact_partial = ReplaceCompactBlock { partitions: parts }; + plan = replace_compact_partial.replace(&plan)?; + + fragment_actions.add_action(QueryFragmentAction::create(executor, plan)); + } + + Ok(fragment_actions) + } + fn reshuffle( executors: Vec, partitions: Vec, @@ -431,6 +465,19 @@ impl PhysicalPlanReplacer for ReplaceReadSource { } } +struct ReplaceCompactBlock { + pub partitions: Partitions, +} + +impl PhysicalPlanReplacer for ReplaceCompactBlock { + fn replace_compact_partial(&mut self, plan: &CompactPartial) -> Result { + Ok(PhysicalPlan::CompactPartial(CompactPartial { + parts: self.partitions.clone(), + ..plan.clone() + })) + } +} + struct ReplaceDeletePartial { pub partitions: Partitions, } diff --git a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs index 4dd6f7c32631c..45fc60e19256b 100644 --- a/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs +++ b/src/query/service/tests/it/storages/fuse/operations/mutation/block_compact_mutator.rs @@ -16,7 +16,6 @@ use std::collections::HashSet; use std::sync::Arc; use common_base::base::tokio; -use common_catalog::table::CompactTarget; use common_catalog::table::Table; use common_exception::Result; use common_expression::BlockThresholds; @@ -25,9 +24,10 @@ use common_storages_fuse::operations::BlockCompactMutator; use common_storages_fuse::operations::CompactOptions; use common_storages_fuse::operations::CompactPartInfo; use common_storages_fuse::statistics::reducers::merge_statistics_mut; -use common_storages_fuse::FuseTable; +use databend_query::interpreters::OptimizeTableInterpreter; use databend_query::pipelines::executor::ExecutorSettings; use databend_query::pipelines::executor::PipelineCompleteExecutor; +use databend_query::schedulers::build_query_pipeline_without_render_result_set; use databend_query::sessions::QueryContext; use databend_query::sessions::TableContext; use databend_query::test_kits::table_test_fixture::execute_command; @@ -106,12 +106,25 @@ async fn test_compact() -> Result<()> { } async fn do_compact(ctx: Arc, table: Arc) -> Result { - let fuse_table = FuseTable::try_from_table(table.as_ref())?; let settings = ctx.get_settings(); let mut pipeline = common_pipeline_core::Pipeline::create(); - fuse_table - .compact(ctx.clone(), CompactTarget::Blocks, None, &mut pipeline) - .await?; + let res = table.compact_blocks(ctx.clone(), None).await?; + + let table_info = table.get_table_info().clone(); + let catalog_info = ctx.get_catalog("default").await?.info(); + if let Some((parts, snapshot)) = res { + let physical_plan = OptimizeTableInterpreter::build_physical_plan( + parts, + table_info, + snapshot, + catalog_info, + false, + )?; + + let build_res = + build_query_pipeline_without_render_result_set(&ctx, &physical_plan, false).await?; + pipeline = build_res.main_pipeline; + }; if !pipeline.is_empty() { pipeline.set_max_threads(settings.get_max_threads()? as usize); @@ -190,13 +203,6 @@ async fn test_safety() -> Result<()> { merge_statistics_mut(&mut summary, &seg.summary, None); } - let mut block_ids = HashSet::new(); - for seg in &segment_infos { - for b in &seg.blocks { - block_ids.insert(b.location.clone()); - } - } - let id = Uuid::new_v4(); let snapshot = TableSnapshot::new( id, @@ -204,7 +210,7 @@ async fn test_safety() -> Result<()> { None, schema.as_ref().clone(), summary, - locations, + locations.clone(), None, None, ); @@ -224,42 +230,57 @@ async fn test_safety() -> Result<()> { operator.clone(), cluster_key_id, ); - block_compact_mutator.target_select().await?; - let selections = block_compact_mutator.compact_tasks; - let mut blocks_number = 0; + let selections = block_compact_mutator.target_select().await?; + if selections.is_empty() { + eprintln!("no target select"); + continue; + } + assert!(!selections.is_lazy); - let mut block_ids_after_compaction = HashSet::new(); + let mut actual_blocks_number = 0; + let mut compact_segment_indices = HashSet::new(); + let mut actual_block_ids = HashSet::new(); for part in selections.partitions.into_iter() { let part = CompactPartInfo::from_part(&part)?; - blocks_number += part.blocks.len(); - for b in &part.blocks { - block_ids_after_compaction.insert(b.location.clone()); - } - } - - for unchanged in block_compact_mutator.unchanged_blocks_map.values() { - blocks_number += unchanged.len(); - for b in unchanged.values() { - block_ids_after_compaction.insert(b.location.clone()); + match part { + CompactPartInfo::CompactExtraInfo(extra) => { + compact_segment_indices.insert(extra.segment_index); + compact_segment_indices.extend(extra.removed_segment_indexes.iter()); + actual_blocks_number += extra.unchanged_blocks.len(); + for b in &extra.unchanged_blocks { + actual_block_ids.insert(b.1.location.clone()); + } + } + CompactPartInfo::CompactTaskInfo(task) => { + compact_segment_indices.insert(task.index.segment_idx); + actual_blocks_number += task.blocks.len(); + for b in &task.blocks { + actual_block_ids.insert(b.location.clone()); + } + } } } - for unchanged_segment in block_compact_mutator.unchanged_segments_map.values() { + eprintln!("compact_segment_indices: {:?}", compact_segment_indices); + let mut except_blocks_number = 0; + let mut except_block_ids = HashSet::new(); + for idx in compact_segment_indices.into_iter() { + let loc = locations.get(idx).unwrap(); let compact_segment = SegmentsIO::read_compact_segment( ctx.get_data_operator()?.operator(), - unchanged_segment.clone(), + loc.clone(), TestFixture::default_table_schema(), false, ) .await?; let segment = SegmentInfo::try_from(compact_segment)?; - blocks_number += segment.blocks.len(); + except_blocks_number += segment.blocks.len(); for b in &segment.blocks { - block_ids_after_compaction.insert(b.location.clone()); + except_block_ids.insert(b.location.clone()); } } - assert_eq!(number_of_blocks, blocks_number); - assert_eq!(block_ids, block_ids_after_compaction); + assert_eq!(except_blocks_number, actual_blocks_number); + assert_eq!(except_block_ids, actual_block_ids); } Ok(()) diff --git a/src/query/sql/src/evaluator/block_operator.rs b/src/query/sql/src/evaluator/block_operator.rs index c9ea1a01b69cd..289979b700889 100644 --- a/src/query/sql/src/evaluator/block_operator.rs +++ b/src/query/sql/src/evaluator/block_operator.rs @@ -73,7 +73,7 @@ pub enum BlockOperator { impl BlockOperator { pub fn execute(&self, func_ctx: &FunctionContext, mut input: DataBlock) -> Result { if input.is_empty() { - return Ok(DataBlock::empty()); + return Ok(input); } match self { BlockOperator::Map { exprs, projections } => { diff --git a/src/query/sql/src/executor/format.rs b/src/query/sql/src/executor/format.rs index d699da312dedd..bf69fbc770f2a 100644 --- a/src/query/sql/src/executor/format.rs +++ b/src/query/sql/src/executor/format.rs @@ -20,39 +20,39 @@ use common_functions::BUILTIN_FUNCTIONS; use common_profile::SharedProcessorProfiles; use itertools::Itertools; -use super::AggregateExpand; -use super::AggregateFinal; -use super::AggregateFunctionDesc; -use super::AggregatePartial; -use super::CopyIntoTable; -use super::DeletePartial; -use super::EvalScalar; -use super::Exchange; -use super::Filter; -use super::HashJoin; -use super::Lambda; -use super::Limit; -use super::MutationAggregate; -use super::PhysicalPlan; -use super::Project; -use super::ProjectSet; -use super::RowFetch; -use super::Sort; -use super::TableScan; -use super::UnionAll; -use super::WindowFunction; use crate::executor::explain::PlanStatsInfo; +use crate::executor::AggregateExpand; +use crate::executor::AggregateFinal; +use crate::executor::AggregateFunctionDesc; +use crate::executor::AggregatePartial; +use crate::executor::CommitSink; use crate::executor::ConstantTableScan; +use crate::executor::CopyIntoTable; use crate::executor::CteScan; +use crate::executor::DeletePartial; use crate::executor::DistributedInsertSelect; +use crate::executor::EvalScalar; +use crate::executor::Exchange; use crate::executor::ExchangeSink; use crate::executor::ExchangeSource; +use crate::executor::Filter; use crate::executor::FragmentKind; +use crate::executor::HashJoin; +use crate::executor::Lambda; +use crate::executor::Limit; use crate::executor::MaterializedCte; +use crate::executor::PhysicalPlan; +use crate::executor::Project; +use crate::executor::ProjectSet; use crate::executor::RangeJoin; use crate::executor::RangeJoinType; +use crate::executor::RowFetch; use crate::executor::RuntimeFilterSource; +use crate::executor::Sort; +use crate::executor::TableScan; +use crate::executor::UnionAll; use crate::executor::Window; +use crate::executor::WindowFunction; use crate::planner::Metadata; use crate::planner::MetadataRef; use crate::planner::DUMMY_TABLE_INDEX; @@ -198,9 +198,8 @@ fn to_format_tree( PhysicalPlan::DeletePartial(plan) => { delete_partial_to_format_tree(plan.as_ref(), metadata, profs) } - PhysicalPlan::MutationAggregate(plan) => { - delete_final_to_format_tree(plan.as_ref(), metadata, profs) - } + PhysicalPlan::CompactPartial(_) => Ok(FormatTreeNode::new("CompactPartial".to_string())), + PhysicalPlan::CommitSink(plan) => commit_sink_to_format_tree(plan, metadata, profs), PhysicalPlan::ProjectSet(plan) => project_set_to_format_tree(plan, metadata, profs), PhysicalPlan::Lambda(plan) => lambda_to_format_tree(plan, metadata, profs), PhysicalPlan::RuntimeFilterSource(plan) => { @@ -218,7 +217,6 @@ fn to_format_tree( materialized_cte_to_format_tree(plan, metadata, profs) } PhysicalPlan::ConstantTableScan(plan) => constant_table_scan_to_format_tree(plan, metadata), - PhysicalPlan::FinalCommit(_) => Ok(FormatTreeNode::new("FinalCommit".to_string())), } } @@ -1066,14 +1064,14 @@ fn delete_partial_to_format_tree( Ok(FormatTreeNode::new("DeletePartial".to_string())) } -fn delete_final_to_format_tree( - plan: &MutationAggregate, +fn commit_sink_to_format_tree( + plan: &CommitSink, metadata: &Metadata, prof_span_set: &SharedProcessorProfiles, ) -> Result> { let children = vec![to_format_tree(&plan.input, metadata, prof_span_set)?]; Ok(FormatTreeNode::with_children( - "DeleteFinal".to_string(), + "CommitSink".to_string(), children, )) } diff --git a/src/query/sql/src/executor/physical_plan.rs b/src/query/sql/src/executor/physical_plan.rs index 8423b0015b5c5..7a0cc6af5c8f8 100644 --- a/src/query/sql/src/executor/physical_plan.rs +++ b/src/query/sql/src/executor/physical_plan.rs @@ -14,8 +14,10 @@ use std::collections::BTreeMap; use std::collections::HashMap; +use std::collections::HashSet; use std::fmt::Display; use std::fmt::Formatter; +use std::sync::Arc; use common_catalog::plan::DataSourcePlan; use common_catalog::plan::InternalColumn; @@ -871,7 +873,7 @@ pub struct DeletePartial { pub catalog_info: CatalogInfo, pub col_indices: Vec, pub query_row_id_col: bool, - pub snapshot: TableSnapshot, + pub snapshot: Arc, } impl DeletePartial { @@ -880,7 +882,7 @@ impl DeletePartial { } } -impl MutationAggregate { +impl CommitSink { pub fn output_schema(&self) -> Result { Ok(DataSchemaRef::default()) } @@ -888,22 +890,23 @@ impl MutationAggregate { // TODO(sky): make TableMutationAggregator distributed #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct MutationAggregate { +pub struct CommitSink { pub input: Box, - pub snapshot: TableSnapshot, + pub snapshot: Arc, pub table_info: TableInfo, pub catalog_info: CatalogInfo, pub mutation_kind: MutationKind, + pub merge_meta: bool, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize, Copy)] -/// This is used by TableMutationAggregator, so no compact here. pub enum MutationKind { Delete, Update, Replace, Recluster, Insert, + Compact, } impl Display for MutationKind { @@ -914,6 +917,7 @@ impl Display for MutationKind { MutationKind::Recluster => write!(f, "Recluster"), MutationKind::Update => write!(f, "Update"), MutationKind::Replace => write!(f, "Replace"), + MutationKind::Compact => write!(f, "Compact"), } } } @@ -964,11 +968,11 @@ pub struct ReplaceInto { } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] -pub struct FinalCommit { - pub input: Box, - pub catalog_info: CatalogInfo, +pub struct CompactPartial { + pub parts: Partitions, pub table_info: TableInfo, - pub snapshot: TableSnapshot, + pub catalog_info: CatalogInfo, + pub column_ids: HashSet, } #[derive(Clone, Debug, serde::Serialize, serde::Deserialize)] @@ -1027,17 +1031,18 @@ pub enum PhysicalPlan { /// Delete DeletePartial(Box), - MutationAggregate(Box), /// Copy into table CopyIntoTable(Box), /// Replace AsyncSourcer(AsyncSourcerPlan), Deduplicate(Deduplicate), ReplaceInto(ReplaceInto), - FinalCommit(Box), // MergeInto MergeIntoSource(MergeIntoSource), MergeInto(MergeInto), + /// Compact + CompactPartial(CompactPartial), + CommitSink(CommitSink), } impl PhysicalPlan { @@ -1079,12 +1084,12 @@ impl PhysicalPlan { PhysicalPlan::DeletePartial(_) | PhysicalPlan::MergeInto(_) | PhysicalPlan::MergeIntoSource(_) - | PhysicalPlan::MutationAggregate(_) + | PhysicalPlan::CommitSink(_) | PhysicalPlan::CopyIntoTable(_) | PhysicalPlan::AsyncSourcer(_) | PhysicalPlan::Deduplicate(_) | PhysicalPlan::ReplaceInto(_) - | PhysicalPlan::FinalCommit(_) => { + | PhysicalPlan::CompactPartial(_) => { unreachable!() } } @@ -1113,7 +1118,7 @@ impl PhysicalPlan { PhysicalPlan::ProjectSet(plan) => plan.output_schema(), PhysicalPlan::RuntimeFilterSource(plan) => plan.output_schema(), PhysicalPlan::DeletePartial(plan) => plan.output_schema(), - PhysicalPlan::MutationAggregate(plan) => plan.output_schema(), + PhysicalPlan::CommitSink(plan) => plan.output_schema(), PhysicalPlan::RangeJoin(plan) => plan.output_schema(), PhysicalPlan::CopyIntoTable(plan) => plan.output_schema(), PhysicalPlan::CteScan(plan) => plan.output_schema(), @@ -1123,8 +1128,8 @@ impl PhysicalPlan { PhysicalPlan::AsyncSourcer(_) | PhysicalPlan::MergeInto(_) | PhysicalPlan::Deduplicate(_) - | PhysicalPlan::FinalCommit(_) - | PhysicalPlan::ReplaceInto(_) => Ok(DataSchemaRef::default()), + | PhysicalPlan::ReplaceInto(_) + | PhysicalPlan::CompactPartial(_) => Ok(DataSchemaRef::default()), } } @@ -1150,8 +1155,9 @@ impl PhysicalPlan { PhysicalPlan::ExchangeSink(_) => "Exchange Sink".to_string(), PhysicalPlan::ProjectSet(_) => "Unnest".to_string(), PhysicalPlan::RuntimeFilterSource(_) => "RuntimeFilterSource".to_string(), + PhysicalPlan::CompactPartial(_) => "CompactBlock".to_string(), PhysicalPlan::DeletePartial(_) => "DeletePartial".to_string(), - PhysicalPlan::MutationAggregate(_) => "MutationAggregate".to_string(), + PhysicalPlan::CommitSink(_) => "CommitSink".to_string(), PhysicalPlan::RangeJoin(_) => "RangeJoin".to_string(), PhysicalPlan::CopyIntoTable(_) => "CopyIntoTable".to_string(), PhysicalPlan::AsyncSourcer(_) => "AsyncSourcer".to_string(), @@ -1162,7 +1168,6 @@ impl PhysicalPlan { PhysicalPlan::CteScan(_) => "PhysicalCteScan".to_string(), PhysicalPlan::MaterializedCte(_) => "PhysicalMaterializedCte".to_string(), PhysicalPlan::ConstantTableScan(_) => "PhysicalConstantTableScan".to_string(), - PhysicalPlan::FinalCommit(_) => "FinalCommit".to_string(), } } @@ -1194,8 +1199,9 @@ impl PhysicalPlan { PhysicalPlan::DistributedInsertSelect(plan) => { Box::new(std::iter::once(plan.input.as_ref())) } + PhysicalPlan::CompactPartial(_) => Box::new(std::iter::empty()), PhysicalPlan::DeletePartial(_plan) => Box::new(std::iter::empty()), - PhysicalPlan::MutationAggregate(plan) => Box::new(std::iter::once(plan.input.as_ref())), + PhysicalPlan::CommitSink(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::ProjectSet(plan) => Box::new(std::iter::once(plan.input.as_ref())), PhysicalPlan::RuntimeFilterSource(plan) => Box::new( std::iter::once(plan.left_side.as_ref()) @@ -1213,7 +1219,6 @@ impl PhysicalPlan { PhysicalPlan::MaterializedCte(plan) => Box::new( std::iter::once(plan.left.as_ref()).chain(std::iter::once(plan.right.as_ref())), ), - PhysicalPlan::FinalCommit(plan) => Box::new(std::iter::once(plan.input.as_ref())), } } @@ -1242,8 +1247,9 @@ impl PhysicalPlan { | PhysicalPlan::AggregateExpand(_) | PhysicalPlan::AggregateFinal(_) | PhysicalPlan::AggregatePartial(_) + | PhysicalPlan::CompactPartial(_) | PhysicalPlan::DeletePartial(_) - | PhysicalPlan::MutationAggregate(_) + | PhysicalPlan::CommitSink(_) | PhysicalPlan::CopyIntoTable(_) | PhysicalPlan::AsyncSourcer(_) | PhysicalPlan::Deduplicate(_) @@ -1251,7 +1257,6 @@ impl PhysicalPlan { | PhysicalPlan::MergeInto(_) | PhysicalPlan::MergeIntoSource(_) | PhysicalPlan::ConstantTableScan(_) - | PhysicalPlan::FinalCommit(_) | PhysicalPlan::CteScan(_) => None, } } diff --git a/src/query/sql/src/executor/physical_plan_display.rs b/src/query/sql/src/executor/physical_plan_display.rs index bda90581c9813..e594e4b1d9eeb 100644 --- a/src/query/sql/src/executor/physical_plan_display.rs +++ b/src/query/sql/src/executor/physical_plan_display.rs @@ -18,23 +18,18 @@ use std::fmt::Formatter; use common_functions::BUILTIN_FUNCTIONS; use itertools::Itertools; -use super::AggregateExpand; -use super::AsyncSourcerPlan; -use super::CopyIntoTable; -use super::Deduplicate; -use super::DeletePartial; -use super::DistributedInsertSelect; -use super::FinalCommit; -use super::MergeInto; -use super::MergeIntoSource; -use super::MutationAggregate; -use super::ProjectSet; -use super::ReplaceInto; -use super::RowFetch; +use crate::executor::AggregateExpand; use crate::executor::AggregateFinal; use crate::executor::AggregatePartial; +use crate::executor::AsyncSourcerPlan; +use crate::executor::CommitSink; +use crate::executor::CompactPartial; use crate::executor::ConstantTableScan; +use crate::executor::CopyIntoTable; use crate::executor::CteScan; +use crate::executor::Deduplicate; +use crate::executor::DeletePartial; +use crate::executor::DistributedInsertSelect; use crate::executor::EvalScalar; use crate::executor::Exchange; use crate::executor::ExchangeSink; @@ -44,9 +39,14 @@ use crate::executor::HashJoin; use crate::executor::Lambda; use crate::executor::Limit; use crate::executor::MaterializedCte; +use crate::executor::MergeInto; +use crate::executor::MergeIntoSource; use crate::executor::PhysicalPlan; use crate::executor::Project; +use crate::executor::ProjectSet; use crate::executor::RangeJoin; +use crate::executor::ReplaceInto; +use crate::executor::RowFetch; use crate::executor::RuntimeFilterSource; use crate::executor::Sort; use crate::executor::TableScan; @@ -87,8 +87,9 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> { PhysicalPlan::ExchangeSink(sink) => write!(f, "{}", sink)?, PhysicalPlan::UnionAll(union_all) => write!(f, "{}", union_all)?, PhysicalPlan::DistributedInsertSelect(insert_select) => write!(f, "{}", insert_select)?, + PhysicalPlan::CompactPartial(compact_partial) => write!(f, "{}", compact_partial)?, PhysicalPlan::DeletePartial(delete) => write!(f, "{}", delete)?, - PhysicalPlan::MutationAggregate(mutation) => write!(f, "{}", mutation)?, + PhysicalPlan::CommitSink(commit) => write!(f, "{}", commit)?, PhysicalPlan::ProjectSet(unnest) => write!(f, "{}", unnest)?, PhysicalPlan::Lambda(lambda) => write!(f, "{}", lambda)?, PhysicalPlan::RuntimeFilterSource(plan) => write!(f, "{}", plan)?, @@ -102,7 +103,6 @@ impl<'a> Display for PhysicalPlanIndentFormatDisplay<'a> { PhysicalPlan::CteScan(cte_scan) => write!(f, "{}", cte_scan)?, PhysicalPlan::MaterializedCte(plan) => write!(f, "{}", plan)?, PhysicalPlan::ConstantTableScan(scan) => write!(f, "{}", scan)?, - PhysicalPlan::FinalCommit(plan) => write!(f, "{}", plan)?, } for node in self.node.children() { @@ -400,15 +400,21 @@ impl Display for DistributedInsertSelect { } } +impl Display for CompactPartial { + fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { + write!(f, "CompactPartial") + } +} + impl Display for DeletePartial { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { write!(f, "DeletePartial") } } -impl Display for MutationAggregate { +impl Display for CommitSink { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "MutationAggregate") + write!(f, "CommitSink") } } impl Display for CopyIntoTable { @@ -469,12 +475,6 @@ impl Display for MergeIntoSource { } } -impl Display for FinalCommit { - fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { - write!(f, "FinalCommit") - } -} - impl Display for Lambda { fn fmt(&self, f: &mut Formatter<'_>) -> std::fmt::Result { let scalars = self diff --git a/src/query/sql/src/executor/physical_plan_visitor.rs b/src/query/sql/src/executor/physical_plan_visitor.rs index 6386601dca527..a0d99825b13b2 100644 --- a/src/query/sql/src/executor/physical_plan_visitor.rs +++ b/src/query/sql/src/executor/physical_plan_visitor.rs @@ -14,39 +14,40 @@ use common_exception::Result; -use super::AggregateExpand; -use super::AggregateFinal; -use super::AggregatePartial; -use super::AsyncSourcerPlan; -use super::CopyIntoTable; -use super::CopyIntoTableSource; -use super::Deduplicate; -use super::DeletePartial; -use super::DistributedInsertSelect; -use super::EvalScalar; -use super::Exchange; -use super::ExchangeSink; -use super::ExchangeSource; -use super::Filter; -use super::HashJoin; -use super::Lambda; -use super::Limit; -use super::MergeInto; -use super::MergeIntoSource; -use super::MutationAggregate; -use super::PhysicalPlan; -use super::Project; -use super::ProjectSet; -use super::QuerySource; -use super::ReplaceInto; -use super::RowFetch; -use super::Sort; -use super::TableScan; +use crate::executor::AggregateExpand; +use crate::executor::AggregateFinal; +use crate::executor::AggregatePartial; +use crate::executor::AsyncSourcerPlan; +use crate::executor::CommitSink; +use crate::executor::CompactPartial; use crate::executor::ConstantTableScan; +use crate::executor::CopyIntoTable; +use crate::executor::CopyIntoTableSource; use crate::executor::CteScan; +use crate::executor::Deduplicate; +use crate::executor::DeletePartial; +use crate::executor::DistributedInsertSelect; +use crate::executor::EvalScalar; +use crate::executor::Exchange; +use crate::executor::ExchangeSink; +use crate::executor::ExchangeSource; +use crate::executor::Filter; +use crate::executor::HashJoin; +use crate::executor::Lambda; +use crate::executor::Limit; use crate::executor::MaterializedCte; +use crate::executor::MergeInto; +use crate::executor::MergeIntoSource; +use crate::executor::PhysicalPlan; +use crate::executor::Project; +use crate::executor::ProjectSet; +use crate::executor::QuerySource; use crate::executor::RangeJoin; +use crate::executor::ReplaceInto; +use crate::executor::RowFetch; use crate::executor::RuntimeFilterSource; +use crate::executor::Sort; +use crate::executor::TableScan; use crate::executor::UnionAll; use crate::executor::Window; @@ -74,8 +75,9 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::ProjectSet(plan) => self.replace_project_set(plan), PhysicalPlan::Lambda(plan) => self.replace_lambda(plan), PhysicalPlan::RuntimeFilterSource(plan) => self.replace_runtime_filter_source(plan), + PhysicalPlan::CompactPartial(plan) => self.replace_compact_partial(plan), PhysicalPlan::DeletePartial(plan) => self.replace_delete_partial(plan), - PhysicalPlan::MutationAggregate(plan) => self.replace_delete_final(plan), + PhysicalPlan::CommitSink(plan) => self.replace_commit_sink(plan), PhysicalPlan::RangeJoin(plan) => self.replace_range_join(plan), PhysicalPlan::CopyIntoTable(plan) => self.replace_copy_into_table(plan), PhysicalPlan::AsyncSourcer(plan) => self.replace_async_sourcer(plan), @@ -85,7 +87,6 @@ pub trait PhysicalPlanReplacer { PhysicalPlan::MergeIntoSource(plan) => self.replace_merge_into_source(plan), PhysicalPlan::MaterializedCte(plan) => self.replace_materialized_cte(plan), PhysicalPlan::ConstantTableScan(plan) => self.replace_constant_table_scan(plan), - PhysicalPlan::FinalCommit(plan) => self.replace_final_commit(plan), } } @@ -365,18 +366,20 @@ pub trait PhysicalPlanReplacer { ))) } + fn replace_compact_partial(&mut self, plan: &CompactPartial) -> Result { + Ok(PhysicalPlan::CompactPartial(plan.clone())) + } + fn replace_delete_partial(&mut self, plan: &DeletePartial) -> Result { Ok(PhysicalPlan::DeletePartial(Box::new(plan.clone()))) } - fn replace_delete_final(&mut self, plan: &MutationAggregate) -> Result { + fn replace_commit_sink(&mut self, plan: &CommitSink) -> Result { let input = self.replace(&plan.input)?; - Ok(PhysicalPlan::MutationAggregate(Box::new( - MutationAggregate { - input: Box::new(input), - ..plan.clone() - }, - ))) + Ok(PhysicalPlan::CommitSink(CommitSink { + input: Box::new(input), + ..plan.clone() + })) } fn replace_async_sourcer(&mut self, plan: &AsyncSourcerPlan) -> Result { @@ -399,19 +402,6 @@ pub trait PhysicalPlanReplacer { })) } - fn replace_final_commit( - &mut self, - plan: &crate::executor::FinalCommit, - ) -> Result { - let input = self.replace(&plan.input)?; - Ok(PhysicalPlan::FinalCommit(Box::new( - crate::executor::FinalCommit { - input: Box::new(input), - ..plan.clone() - }, - ))) - } - fn replace_merge_into(&mut self, plan: &MergeInto) -> Result { let input = self.replace(&plan.input)?; Ok(PhysicalPlan::MergeInto(MergeInto { @@ -547,8 +537,9 @@ impl PhysicalPlan { Self::traverse(&plan.left, pre_visit, visit, post_visit); Self::traverse(&plan.right, pre_visit, visit, post_visit); } + PhysicalPlan::CompactPartial(_) => {} PhysicalPlan::DeletePartial(_) => {} - PhysicalPlan::MutationAggregate(plan) => { + PhysicalPlan::CommitSink(plan) => { Self::traverse(&plan.input, pre_visit, visit, post_visit); } PhysicalPlan::Deduplicate(plan) => { @@ -567,9 +558,6 @@ impl PhysicalPlan { Self::traverse(&plan.left, pre_visit, visit, post_visit); Self::traverse(&plan.right, pre_visit, visit, post_visit); } - PhysicalPlan::FinalCommit(plan) => { - Self::traverse(&plan.input, pre_visit, visit, post_visit) - } } post_visit(plan); } diff --git a/src/query/sql/src/executor/profile.rs b/src/query/sql/src/executor/profile.rs index 22b6dc13d8eeb..031ac6a90424e 100644 --- a/src/query/sql/src/executor/profile.rs +++ b/src/query/sql/src/executor/profile.rs @@ -505,14 +505,14 @@ fn flatten_plan_node_profile( } PhysicalPlan::MaterializedCte(_) => todo!(), PhysicalPlan::DeletePartial(_) - | PhysicalPlan::MutationAggregate(_) + | PhysicalPlan::CommitSink(_) | PhysicalPlan::CopyIntoTable(_) | PhysicalPlan::AsyncSourcer(_) | PhysicalPlan::MergeInto(_) | PhysicalPlan::MergeIntoSource(_) | PhysicalPlan::Deduplicate(_) - | PhysicalPlan::ReplaceInto(_) => unreachable!(), - PhysicalPlan::FinalCommit(_) => unreachable!(), + | PhysicalPlan::ReplaceInto(_) + | PhysicalPlan::CompactPartial(_) => unreachable!(), } Ok(()) diff --git a/src/query/sql/src/planner/plans/plan.rs b/src/query/sql/src/planner/plans/plan.rs index 2e2400a46a2df..dca1a37a69d94 100644 --- a/src/query/sql/src/planner/plans/plan.rs +++ b/src/query/sql/src/planner/plans/plan.rs @@ -23,47 +23,27 @@ use common_expression::DataSchema; use common_expression::DataSchemaRef; use common_expression::DataSchemaRefExt; -use super::data_mask::CreateDatamaskPolicyPlan; -use super::CopyIntoTableMode; -use super::CreateIndexPlan; -use super::CreateShareEndpointPlan; -use super::DescDatamaskPolicyPlan; -use super::DropDatamaskPolicyPlan; -use super::DropIndexPlan; -use super::DropShareEndpointPlan; -use super::MergeInto; -use super::ModifyTableColumnPlan; -use super::RenameTableColumnPlan; -use super::SetOptionsPlan; -use super::VacuumDropTablePlan; -use super::VacuumTablePlan; use crate::optimizer::SExpr; -use crate::plans::copy::CopyPlan; -use crate::plans::insert::Insert; -use crate::plans::presign::PresignPlan; -use crate::plans::recluster_table::ReclusterTablePlan; -use crate::plans::share::AlterShareTenantsPlan; -use crate::plans::share::CreateSharePlan; -use crate::plans::share::DescSharePlan; -use crate::plans::share::DropSharePlan; -use crate::plans::share::GrantShareObjectPlan; -use crate::plans::share::RevokeShareObjectPlan; -use crate::plans::share::ShowGrantTenantsOfSharePlan; -use crate::plans::share::ShowObjectGrantPrivilegesPlan; -use crate::plans::share::ShowSharesPlan; use crate::plans::AddTableColumnPlan; use crate::plans::AlterNetworkPolicyPlan; +use crate::plans::AlterShareTenantsPlan; use crate::plans::AlterTableClusterKeyPlan; use crate::plans::AlterUDFPlan; use crate::plans::AlterUserPlan; use crate::plans::AlterViewPlan; use crate::plans::AlterVirtualColumnPlan; use crate::plans::AnalyzeTablePlan; +use crate::plans::CopyIntoTableMode; +use crate::plans::CopyPlan; use crate::plans::CreateCatalogPlan; use crate::plans::CreateDatabasePlan; +use crate::plans::CreateDatamaskPolicyPlan; use crate::plans::CreateFileFormatPlan; +use crate::plans::CreateIndexPlan; use crate::plans::CreateNetworkPolicyPlan; use crate::plans::CreateRolePlan; +use crate::plans::CreateShareEndpointPlan; +use crate::plans::CreateSharePlan; use crate::plans::CreateStagePlan; use crate::plans::CreateTablePlan; use crate::plans::CreateUDFPlan; @@ -71,13 +51,19 @@ use crate::plans::CreateUserPlan; use crate::plans::CreateViewPlan; use crate::plans::CreateVirtualColumnPlan; use crate::plans::DeletePlan; +use crate::plans::DescDatamaskPolicyPlan; use crate::plans::DescNetworkPolicyPlan; +use crate::plans::DescSharePlan; use crate::plans::DescribeTablePlan; use crate::plans::DropCatalogPlan; use crate::plans::DropDatabasePlan; +use crate::plans::DropDatamaskPolicyPlan; use crate::plans::DropFileFormatPlan; +use crate::plans::DropIndexPlan; use crate::plans::DropNetworkPolicyPlan; use crate::plans::DropRolePlan; +use crate::plans::DropShareEndpointPlan; +use crate::plans::DropSharePlan; use crate::plans::DropStagePlan; use crate::plans::DropTableClusterKeyPlan; use crate::plans::DropTableColumnPlan; @@ -89,33 +75,47 @@ use crate::plans::DropVirtualColumnPlan; use crate::plans::ExistsTablePlan; use crate::plans::GrantPrivilegePlan; use crate::plans::GrantRolePlan; +use crate::plans::GrantShareObjectPlan; +use crate::plans::Insert; use crate::plans::KillPlan; +use crate::plans::MergeInto; +use crate::plans::ModifyTableColumnPlan; use crate::plans::OptimizeTablePlan; +use crate::plans::PresignPlan; +use crate::plans::ReclusterTablePlan; use crate::plans::RefreshIndexPlan; use crate::plans::RefreshVirtualColumnPlan; use crate::plans::RemoveStagePlan; use crate::plans::RenameDatabasePlan; +use crate::plans::RenameTableColumnPlan; use crate::plans::RenameTablePlan; use crate::plans::Replace; use crate::plans::RevertTablePlan; use crate::plans::RevokePrivilegePlan; use crate::plans::RevokeRolePlan; +use crate::plans::RevokeShareObjectPlan; +use crate::plans::SetOptionsPlan; use crate::plans::SetRolePlan; use crate::plans::SettingPlan; use crate::plans::ShowCreateCatalogPlan; use crate::plans::ShowCreateDatabasePlan; use crate::plans::ShowCreateTablePlan; use crate::plans::ShowFileFormatsPlan; +use crate::plans::ShowGrantTenantsOfSharePlan; use crate::plans::ShowGrantsPlan; use crate::plans::ShowNetworkPoliciesPlan; +use crate::plans::ShowObjectGrantPrivilegesPlan; use crate::plans::ShowRolesPlan; use crate::plans::ShowShareEndpointPlan; +use crate::plans::ShowSharesPlan; use crate::plans::TruncateTablePlan; use crate::plans::UnSettingPlan; use crate::plans::UndropDatabasePlan; use crate::plans::UndropTablePlan; use crate::plans::UpdatePlan; use crate::plans::UseDatabasePlan; +use crate::plans::VacuumDropTablePlan; +use crate::plans::VacuumTablePlan; use crate::BindContext; use crate::MetadataRef; diff --git a/src/query/storages/common/table-meta/src/meta/format.rs b/src/query/storages/common/table-meta/src/meta/format.rs index ae80595fdfd0f..913648e65aa32 100644 --- a/src/query/storages/common/table-meta/src/meta/format.rs +++ b/src/query/storages/common/table-meta/src/meta/format.rs @@ -39,7 +39,7 @@ pub const NUM_BLOCK_ID_BITS: usize = 11; pub const MAX_SEGMENT_BLOCK_NUMBER: usize = 1 << NUM_BLOCK_ID_BITS; #[repr(u8)] -#[derive(Default, Debug, Clone)] +#[derive(serde::Serialize, serde::Deserialize, Default, Debug, Clone, PartialEq)] pub enum MetaCompression { None = 0, #[default] @@ -107,7 +107,7 @@ pub fn decompress(compression: &MetaCompression, data: Vec) -> Result, pub encoding: MetaEncoding, pub compression: MetaCompression, } -#[derive(Clone)] +#[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone)] pub struct CompactSegmentInfo { pub format_version: FormatVersion, pub summary: Statistics, diff --git a/src/query/storages/fuse/src/fuse_table.rs b/src/query/storages/fuse/src/fuse_table.rs index 40ac5219ea795..7aec5d6c3df7a 100644 --- a/src/query/storages/fuse/src/fuse_table.rs +++ b/src/query/storages/fuse/src/fuse_table.rs @@ -27,7 +27,6 @@ use common_catalog::plan::PushDownInfo; use common_catalog::statistics::BasicColumnStatistics; use common_catalog::table::AppendMode; use common_catalog::table::ColumnStatisticsProvider; -use common_catalog::table::CompactTarget; use common_catalog::table::NavigationDescriptor; use common_catalog::table_context::TableContext; use common_exception::ErrorCode; @@ -681,14 +680,21 @@ impl Table for FuseTable { } #[async_backtrace::framed] - async fn compact( + async fn compact_segments( &self, ctx: Arc, - target: CompactTarget, limit: Option, - pipeline: &mut Pipeline, ) -> Result<()> { - self.do_compact(ctx, target, limit, pipeline).await + self.do_compact_segments(ctx, limit).await + } + + #[async_backtrace::framed] + async fn compact_blocks( + &self, + ctx: Arc, + limit: Option, + ) -> Result)>> { + self.do_compact_blocks(ctx, limit).await } #[async_backtrace::framed] diff --git a/src/query/storages/fuse/src/operations/common/mutation_log.rs b/src/query/storages/fuse/src/operations/common/mutation_log.rs index 552a6af99f356..2cb87a797f4ae 100644 --- a/src/query/storages/fuse/src/operations/common/mutation_log.rs +++ b/src/query/storages/fuse/src/operations/common/mutation_log.rs @@ -24,10 +24,13 @@ use storages_common_table_meta::meta::BlockMeta; use storages_common_table_meta::meta::FormatVersion; use storages_common_table_meta::meta::Statistics; -use super::ConflictResolveContext; -use super::SnapshotChanges; use crate::operations::common::AbortOperation; -use crate::operations::mutation::MutationDeletedSegment; +use crate::operations::common::ConflictResolveContext; +use crate::operations::common::SnapshotChanges; +use crate::operations::mutation::BlockIndex; +use crate::operations::mutation::CompactExtraInfo; +use crate::operations::mutation::DeletedSegmentInfo; +use crate::operations::mutation::SegmentIndex; use crate::statistics::merge_statistics; #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Default)] @@ -47,19 +50,22 @@ pub enum MutationLogEntry { index: BlockMetaIndex, }, DeletedSegment { - deleted_segment: MutationDeletedSegment, + deleted_segment: DeletedSegmentInfo, }, ReplacedBlock { index: BlockMetaIndex, block_meta: Arc, }, + CompactExtras { + extras: CompactExtraInfo, + }, DoNothing, } #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, Default, PartialEq)] pub struct BlockMetaIndex { - pub segment_idx: usize, - pub block_idx: usize, + pub segment_idx: SegmentIndex, + pub block_idx: BlockIndex, // range is unused for now. // pub range: Option>, } diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs index 28dd5bf9f0362..e073d2510eb0b 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_mutation_aggregator.rs @@ -12,6 +12,7 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::hash_map::Entry; use std::collections::BTreeMap; use std::collections::HashMap; use std::sync::Arc; @@ -25,8 +26,9 @@ use common_expression::BlockMetaInfoPtr; use common_expression::BlockThresholds; use common_expression::DataBlock; use common_expression::TableSchemaRef; -use common_pipeline_transforms::processors::transforms::transform_accumulating_async::AsyncAccumulatingTransform; +use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform; use common_sql::executor::MutationKind; +use itertools::Itertools; use log::debug; use log::info; use opendal::Operator; @@ -47,7 +49,6 @@ use crate::operations::common::MutationLogs; use crate::operations::common::SnapshotChanges; use crate::operations::common::SnapshotMerged; use crate::operations::mutation::BlockIndex; -use crate::operations::mutation::MutationDeletedSegment; use crate::operations::mutation::SegmentIndex; use crate::statistics::reducers::merge_statistics_mut; use crate::statistics::reducers::reduce_block_metas; @@ -64,8 +65,9 @@ pub struct TableMutationAggregator { mutations: HashMap, appended_segments: Vec, - deleted_segments: Vec, appended_statistics: Statistics, + removed_segment_indexes: Vec, + removed_statistics: Statistics, abort_operation: AbortOperation, kind: MutationKind, @@ -117,7 +119,8 @@ impl TableMutationAggregator { base_segments, abort_operation: AbortOperation::default(), appended_statistics: Statistics::default(), - deleted_segments: vec![], + removed_segment_indexes: vec![], + removed_statistics: Statistics::default(), kind, finished_tasks: 0, start_time: Instant::now(), @@ -142,14 +145,15 @@ impl TableMutationAggregator { pub fn accumulate_log_entry(&mut self, log_entry: MutationLogEntry) { match log_entry { MutationLogEntry::ReplacedBlock { index, block_meta } => { - self.mutations - .entry(index.segment_idx) - .and_modify(|v| v.push_replaced(index.block_idx, block_meta.clone())) - .or_insert(BlockMutations::new_replacement( - index.block_idx, - block_meta.clone(), - )); self.abort_operation.add_block(&block_meta); + match self.mutations.entry(index.segment_idx) { + Entry::Occupied(mut v) => { + v.get_mut().push_replaced(index.block_idx, block_meta); + } + Entry::Vacant(v) => { + v.insert(BlockMutations::new_replacement(index.block_idx, block_meta)); + } + } } MutationLogEntry::DeletedBlock { index } => { self.mutations @@ -158,7 +162,12 @@ impl TableMutationAggregator { .or_insert(BlockMutations::new_deletion(index.block_idx)); } MutationLogEntry::DeletedSegment { deleted_segment } => { - self.deleted_segments.push(deleted_segment) + self.removed_segment_indexes.push(deleted_segment.index); + merge_statistics_mut( + &mut self.removed_statistics, + &deleted_segment.summary, + self.default_cluster_key_id, + ); } MutationLogEntry::DoNothing => (), MutationLogEntry::AppendSegment { @@ -177,6 +186,29 @@ impl TableMutationAggregator { self.appended_segments .push((segment_location, format_version)) } + MutationLogEntry::CompactExtras { extras } => { + match self.mutations.entry(extras.segment_index) { + Entry::Occupied(mut v) => { + v.get_mut() + .replaced_blocks + .extend(extras.unchanged_blocks.into_iter()); + } + Entry::Vacant(v) => { + v.insert(BlockMutations { + replaced_blocks: extras.unchanged_blocks, + deleted_blocks: vec![], + }); + } + } + + self.removed_segment_indexes + .extend(extras.removed_segment_indexes); + merge_statistics_mut( + &mut self.removed_statistics, + &extras.removed_segment_summary, + self.default_cluster_key_id, + ); + } } } @@ -195,17 +227,6 @@ impl TableMutationAggregator { let start = Instant::now(); let mut count = 0; - let mut removed_segment_indexes = vec![]; - let mut removed_statistics = Statistics::default(); - for s in &self.deleted_segments { - removed_segment_indexes.push(s.deleted_segment.index); - merge_statistics_mut( - &mut removed_statistics, - &s.deleted_segment.segment_info.1, - self.default_cluster_key_id, - ); - } - let mut replaced_segments = HashMap::new(); let mut merged_statistics = Statistics::default(); let chunk_size = self.ctx.get_settings().get_max_storage_io_requests()? as usize; @@ -224,21 +245,24 @@ impl TableMutationAggregator { replaced_segments .insert(result.index, (location, SegmentInfo::VERSION)); } else { - removed_segment_indexes.push(result.index); + self.removed_segment_indexes.push(result.index); } - merge_statistics_mut( - &mut removed_statistics, - &result.origin_summary, - self.default_cluster_key_id, - ); + if let Some(origin_summary) = result.origin_summary { + merge_statistics_mut( + &mut self.removed_statistics, + &origin_summary, + self.default_cluster_key_id, + ); + } } // Refresh status { count += chunk.len(); let status = format!( - "mutation: generate new segment files:{}/{}, cost:{} sec", + "{}: generate new segment files:{}/{}, cost:{} sec", + self.kind, count, segment_indices.len(), start.elapsed().as_secs() @@ -247,7 +271,7 @@ impl TableMutationAggregator { } } - info!("removed_segment_indexes:{:?}", removed_segment_indexes); + info!("removed_segment_indexes:{:?}", self.removed_segment_indexes); merge_statistics_mut( &mut merged_statistics, @@ -258,9 +282,9 @@ impl TableMutationAggregator { ConflictResolveContext::ModifiedSegmentExistsInLatest(SnapshotChanges { appended_segments, replaced_segments, - removed_segment_indexes, + removed_segment_indexes: std::mem::take(&mut self.removed_segment_indexes), merged_statistics, - removed_statistics, + removed_statistics: std::mem::take(&mut self.removed_statistics), }) } }; @@ -278,59 +302,73 @@ impl TableMutationAggregator { let mut tasks = Vec::with_capacity(segment_indices.len()); for index in segment_indices { let segment_mutation = self.mutations.remove(&index).unwrap(); - let location = self.base_segments[index].clone(); + let location = self.base_segments.get(index).cloned(); let schema = self.schema.clone(); let op = self.dal.clone(); let location_gen = self.location_gen.clone(); tasks.push(async move { - // read the old segment - let compact_segment_info = - SegmentsIO::read_compact_segment(op.clone(), location, schema, false).await?; - let mut segment_info = SegmentInfo::try_from(compact_segment_info)?; - - // take away the blocks, they are being mutated - let mut block_editor = BTreeMap::<_, _>::from_iter( - std::mem::take(&mut segment_info.blocks) - .into_iter() - .enumerate(), - ); - for (idx, new_meta) in segment_mutation.replaced_blocks { - block_editor.insert(idx, new_meta); - } - for idx in segment_mutation.deleted_blocks { - block_editor.remove(&idx); - } + let (new_blocks, origin_summary) = if let Some(loc) = location { + // read the old segment + let compact_segment_info = + SegmentsIO::read_compact_segment(op.clone(), loc, schema, false).await?; + let mut segment_info = SegmentInfo::try_from(compact_segment_info)?; + + // take away the blocks, they are being mutated + let mut block_editor = BTreeMap::<_, _>::from_iter( + std::mem::take(&mut segment_info.blocks) + .into_iter() + .enumerate(), + ); + for (idx, new_meta) in segment_mutation.replaced_blocks { + block_editor.insert(idx, new_meta); + } + for idx in segment_mutation.deleted_blocks { + block_editor.remove(&idx); + } + + if block_editor.is_empty() { + return Ok(SegmentLite { + index, + new_segment_info: None, + origin_summary: Some(segment_info.summary), + }); + } - if !block_editor.is_empty() { // assign back the mutated blocks to segment let new_blocks = block_editor.into_values().collect::>(); - // re-calculate the segment statistics - let new_summary = - reduce_block_metas(&new_blocks, thresholds, default_cluster_key_id); - // create new segment info - let new_segment = SegmentInfo::new(new_blocks, new_summary.clone()); - - // write the segment info. - let location = location_gen.gen_segment_info_location(); - let serialized_segment = SerializedSegment { - path: location.clone(), - segment: Arc::new(new_segment), - }; - SegmentsIO::write_segment(op, serialized_segment).await?; - - Ok(SegmentLite { - index, - new_segment_info: Some((location, new_summary)), - origin_summary: segment_info.summary, - }) + (new_blocks, Some(segment_info.summary)) } else { - Ok(SegmentLite { - index, - new_segment_info: None, - origin_summary: segment_info.summary, - }) - } + // use by compact. + assert!(segment_mutation.deleted_blocks.is_empty()); + let new_blocks = segment_mutation + .replaced_blocks + .into_iter() + .sorted_by(|a, b| a.0.cmp(&b.0)) + .map(|(_, meta)| meta) + .collect::>(); + (new_blocks, None) + }; + + // re-calculate the segment statistics + let new_summary = + reduce_block_metas(&new_blocks, thresholds, default_cluster_key_id); + // create new segment info + let new_segment = SegmentInfo::new(new_blocks, new_summary.clone()); + + // write the segment info. + let location = location_gen.gen_segment_info_location(); + let serialized_segment = SerializedSegment { + path: location.clone(), + segment: Arc::new(new_segment), + }; + SegmentsIO::write_segment(op, serialized_segment).await?; + + Ok(SegmentLite { + index, + new_segment_info: Some((location, new_summary)), + origin_summary, + }) }); } @@ -384,5 +422,5 @@ struct SegmentLite { // new segment location and summary. new_segment_info: Option<(String, Statistics)>, // origin segment summary. - origin_summary: Statistics, + origin_summary: Option, } diff --git a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs index c2c2a8727be91..0782c5c8b098a 100644 --- a/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs +++ b/src/query/storages/fuse/src/operations/common/processors/transform_serialize_block.rs @@ -171,26 +171,40 @@ impl Processor for TransformSerializeBlock { if let Some(meta) = meta { let meta = SerializeDataMeta::downcast_from(meta).ok_or(ErrorCode::Internal("It's a bug"))?; - if let Some(deleted_segment) = meta.deleted_segment { - // delete a whole segment, segment level - let data_block = - Self::mutation_logs(MutationLogEntry::DeletedSegment { deleted_segment }); - self.output.push_data(Ok(data_block)); - Ok(Event::NeedConsume) - } else if input_data.is_empty() { - // delete a whole block, block level - let data_block = - Self::mutation_logs(MutationLogEntry::DeletedBlock { index: meta.index }); - self.output.push_data(Ok(data_block)); - Ok(Event::NeedConsume) - } else { - // replace the old block - self.state = State::NeedSerialize { - block: input_data, - stats_type: meta.stats_type, - index: Some(meta.index), - }; - Ok(Event::Sync) + match meta { + SerializeDataMeta::DeletedSegment(deleted_segment) => { + // delete a whole segment, segment level + let data_block = + Self::mutation_logs(MutationLogEntry::DeletedSegment { deleted_segment }); + self.output.push_data(Ok(data_block)); + Ok(Event::NeedConsume) + } + SerializeDataMeta::SerializeBlock(serialize_block) => { + if input_data.is_empty() { + // delete a whole block, block level + let data_block = Self::mutation_logs(MutationLogEntry::DeletedBlock { + index: serialize_block.index, + }); + self.output.push_data(Ok(data_block)); + Ok(Event::NeedConsume) + } else { + // replace the old block + self.state = State::NeedSerialize { + block: input_data, + stats_type: serialize_block.stats_type, + index: Some(serialize_block.index), + }; + Ok(Event::Sync) + } + } + SerializeDataMeta::CompactExtras(compact_extras) => { + // compact extras + let data_block = Self::mutation_logs(MutationLogEntry::CompactExtras { + extras: compact_extras, + }); + self.output.push_data(Ok(data_block)); + Ok(Event::NeedConsume) + } } } else if input_data.is_empty() { // do nothing diff --git a/src/query/storages/fuse/src/operations/common/snapshot_generator.rs b/src/query/storages/fuse/src/operations/common/snapshot_generator.rs index daabbe108d967..356db74c29e83 100644 --- a/src/query/storages/fuse/src/operations/common/snapshot_generator.rs +++ b/src/query/storages/fuse/src/operations/common/snapshot_generator.rs @@ -33,11 +33,9 @@ use storages_common_table_meta::meta::Statistics; use storages_common_table_meta::meta::TableSnapshot; use uuid::Uuid; -use crate::metrics::metrics_inc_commit_mutation_latest_snapshot_append_only; use crate::metrics::metrics_inc_commit_mutation_modified_segment_exists_in_latest; use crate::metrics::metrics_inc_commit_mutation_unresolvable_conflict; use crate::statistics::merge_statistics; -use crate::statistics::reducers::deduct_statistics; use crate::statistics::reducers::deduct_statistics_mut; use crate::statistics::reducers::merge_statistics_mut; @@ -111,7 +109,6 @@ pub struct SnapshotMerged { #[derive(Clone, serde::Serialize, serde::Deserialize, Debug, PartialEq)] pub enum ConflictResolveContext { AppendOnly((SnapshotMerged, TableSchemaRef)), - LatestSnapshotAppendOnly(SnapshotMerged), ModifiedSegmentExistsInLatest(SnapshotChanges), } @@ -238,42 +235,6 @@ impl SnapshotGenerator for MutationGenerator { "conflict_resolve_ctx should not be AppendOnly in MutationGenerator", )); } - ConflictResolveContext::LatestSnapshotAppendOnly(ctx) => { - if let Some(range_of_newly_append) = - ConflictResolveContext::is_latest_snapshot_append_only( - &self.base_snapshot, - &previous, - ) - { - info!("resolvable conflicts detected"); - metrics_inc_commit_mutation_latest_snapshot_append_only(); - let append_segments = &previous.segments[range_of_newly_append]; - let append_statistics = - deduct_statistics(&previous.summary, &self.base_snapshot.summary); - - let new_segments = append_segments - .iter() - .chain(ctx.merged_segments.iter()) - .cloned() - .collect::>(); - let new_summary = merge_statistics( - &ctx.merged_statistics, - &append_statistics, - default_cluster_key_id, - ); - let new_snapshot = TableSnapshot::new( - Uuid::new_v4(), - &previous.timestamp, - Some((previous.snapshot_id, previous.format_version)), - schema, - new_summary, - new_segments, - cluster_key_meta, - previous.table_statistics_location.clone(), - ); - return Ok(new_snapshot); - } - } ConflictResolveContext::ModifiedSegmentExistsInLatest(ctx) => { if let Some((removed, replaced)) = ConflictResolveContext::is_modified_segments_exists_in_latest( diff --git a/src/query/storages/fuse/src/operations/compact.rs b/src/query/storages/fuse/src/operations/compact.rs index f9e2be64bc53c..4aa9bdf208afd 100644 --- a/src/query/storages/fuse/src/operations/compact.rs +++ b/src/query/storages/fuse/src/operations/compact.rs @@ -12,20 +12,25 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::collections::HashSet; use std::sync::Arc; +use common_base::runtime::Runtime; +use common_catalog::plan::Partitions; +use common_catalog::plan::PartitionsShuffleKind; use common_catalog::plan::Projection; -use common_catalog::table::CompactTarget; +use common_exception::ErrorCode; use common_exception::Result; +use common_expression::ColumnId; use common_pipeline_core::processors::processor::ProcessorPtr; use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer; +use common_sql::executor::MutationKind; use storages_common_table_meta::meta::TableSnapshot; -use crate::operations::common::CommitSink; -use crate::operations::common::MutationGenerator; +use crate::operations::common::TableMutationAggregator; use crate::operations::common::TransformSerializeBlock; use crate::operations::mutation::BlockCompactMutator; -use crate::operations::mutation::CompactAggregator; +use crate::operations::mutation::CompactLazyPartInfo; use crate::operations::mutation::CompactSource; use crate::operations::mutation::SegmentCompactMutator; use crate::pipelines::Pipeline; @@ -45,50 +50,20 @@ pub struct CompactOptions { impl FuseTable { #[async_backtrace::framed] - pub(crate) async fn do_compact( + pub(crate) async fn do_compact_segments( &self, ctx: Arc, - target: CompactTarget, limit: Option, - pipeline: &mut Pipeline, ) -> Result<()> { - let snapshot_opt = self.read_table_snapshot().await?; - let base_snapshot = if let Some(val) = snapshot_opt { - val + let compact_options = if let Some(v) = self.compact_options(limit).await? { + v } else { - // no snapshot, no compaction. - return Ok(()); - }; - - if base_snapshot.summary.block_count <= 1 { return Ok(()); - } - - let block_per_seg = - self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT); - - let compact_params = CompactOptions { - base_snapshot, - block_per_seg, - limit, }; - match target { - CompactTarget::Blocks => self.compact_blocks(ctx, pipeline, compact_params).await, - CompactTarget::Segments => self.compact_segments(ctx, pipeline, compact_params).await, - } - } - - #[async_backtrace::framed] - async fn compact_segments( - &self, - ctx: Arc, - _pipeline: &mut Pipeline, - options: CompactOptions, - ) -> Result<()> { let mut segment_mutator = SegmentCompactMutator::try_create( ctx.clone(), - options, + compact_options, self.meta_location_generator().clone(), self.operator.clone(), self.cluster_key_id(), @@ -101,46 +76,91 @@ impl FuseTable { segment_mutator.try_commit(Arc::new(self.clone())).await } - /// The flow of Pipeline is as follows: - /// +-------------+ +-----------------------+ - /// |CompactSource| ---> |TransformSerializeBlock| ------ - /// +-------------+ +-----------------------+ | +-----------------+ +----------+ - /// | ... | ---> | ... | ... | ---> |CompactAggregator| ---> |CommitSink| - /// +-------------+ +-----------------------+ | +-----------------+ +----------+ - /// |CompactSource| ---> |TransformSerializeBlock| ------ - /// +-------------+ +-----------------------+ #[async_backtrace::framed] - async fn compact_blocks( + pub(crate) async fn do_compact_blocks( &self, ctx: Arc, - pipeline: &mut Pipeline, - options: CompactOptions, - ) -> Result<()> { - let thresholds = self.get_block_thresholds(); + limit: Option, + ) -> Result)>> { + let compact_options = if let Some(v) = self.compact_options(limit).await? { + v + } else { + return Ok(None); + }; + let thresholds = self.get_block_thresholds(); let mut mutator = BlockCompactMutator::new( ctx.clone(), thresholds, - options, + compact_options, self.operator.clone(), - self.cluster_key_meta.as_ref().map(|k| k.0), + self.cluster_key_id(), ); - mutator.target_select().await?; - if mutator.compact_tasks.is_empty() { - return Ok(()); + + let partitions = mutator.target_select().await?; + if partitions.is_empty() { + return Ok(None); } - // Status. - ctx.set_status_info("compact: begin to run compact tasks"); - ctx.set_partitions(mutator.compact_tasks.clone())?; + Ok(Some(( + partitions, + mutator.compact_params.base_snapshot.clone(), + ))) + } + + pub fn build_compact_partial( + &self, + ctx: Arc, + parts: Partitions, + column_ids: HashSet, + pipeline: &mut Pipeline, + ) -> Result<()> { + let is_lazy = parts.is_lazy; + let thresholds = self.get_block_thresholds(); + let cluster_key_id = self.cluster_key_id(); + let mut max_threads = ctx.get_settings().get_max_threads()? as usize; + if is_lazy { + let query_ctx = ctx.clone(); + + let lazy_parts = parts + .partitions + .into_iter() + .map(|v| { + v.as_any() + .downcast_ref::() + .unwrap() + .clone() + }) + .collect::>(); + + pipeline.set_on_init(move || { + let ctx = query_ctx.clone(); + let column_ids = column_ids.clone(); + let partitions = Runtime::with_worker_threads(2, None)?.block_on(async move { + let partitions = BlockCompactMutator::build_compact_tasks( + ctx.clone(), + column_ids, + cluster_key_id, + thresholds, + lazy_parts, + ) + .await?; + + Result::<_, ErrorCode>::Ok(partitions) + })?; + + let partitions = Partitions::create_nolazy(PartitionsShuffleKind::Mod, partitions); + query_ctx.set_partitions(partitions)?; + Ok(()) + }); + } else { + max_threads = max_threads.min(parts.len()).max(1); + ctx.set_partitions(parts)?; + } let all_column_indices = self.all_column_indices(); let projection = Projection::Columns(all_column_indices); let block_reader = self.create_block_reader(projection, false, ctx.clone())?; - let max_threads = std::cmp::min( - ctx.get_settings().get_max_threads()? as usize, - mutator.compact_tasks.len(), - ); // Add source pipe. pipeline.add_source( |output| { @@ -154,11 +174,8 @@ impl FuseTable { max_threads, )?; - let block_thresholds = self.get_block_thresholds(); // sort - let cluster_stats_gen = - self.cluster_gen_for_append(ctx.clone(), pipeline, block_thresholds)?; - + let cluster_stats_gen = self.cluster_gen_for_append(ctx.clone(), pipeline, thresholds)?; pipeline.add_transform( |input: Arc, output| { let proc = TransformSerializeBlock::try_create( @@ -172,35 +189,42 @@ impl FuseTable { }, )?; - pipeline.try_resize(1)?; - - pipeline.add_transform(|input, output| { - let compact_aggregator = CompactAggregator::new( - self.operator.clone(), - self.meta_location_generator().clone(), - mutator.clone(), - ); - Ok(ProcessorPtr::create(AsyncAccumulatingTransformer::create( - input, - output, - compact_aggregator, - ))) - })?; - - let snapshot_gen = MutationGenerator::new(mutator.compact_params.base_snapshot); - pipeline.add_sink(|input| { - CommitSink::try_create( - self, - ctx.clone(), - None, - snapshot_gen.clone(), - input, - None, - true, - None, - ) - })?; - + if is_lazy { + pipeline.try_resize(1)?; + pipeline.add_transform(|input, output| { + let mutation_aggregator = + TableMutationAggregator::new(self, ctx.clone(), vec![], MutationKind::Compact); + Ok(ProcessorPtr::create(AsyncAccumulatingTransformer::create( + input, + output, + mutation_aggregator, + ))) + })?; + } Ok(()) } + + #[async_backtrace::framed] + async fn compact_options(&self, limit: Option) -> Result> { + let snapshot_opt = self.read_table_snapshot().await?; + let base_snapshot = if let Some(val) = snapshot_opt { + val + } else { + // no snapshot, no compaction. + return Ok(None); + }; + + if base_snapshot.summary.block_count <= 1 { + return Ok(None); + } + + let block_per_seg = + self.get_option(FUSE_OPT_KEY_BLOCK_PER_SEGMENT, DEFAULT_BLOCK_PER_SEGMENT); + + Ok(Some(CompactOptions { + base_snapshot, + block_per_seg, + limit, + })) + } } diff --git a/src/query/storages/fuse/src/operations/delete.rs b/src/query/storages/fuse/src/operations/delete.rs index bbb732851b67b..f471f284992a0 100644 --- a/src/query/storages/fuse/src/operations/delete.rs +++ b/src/query/storages/fuse/src/operations/delete.rs @@ -54,7 +54,6 @@ use crate::metrics::metrics_inc_deletion_block_range_pruned_whole_block_nums; use crate::metrics::metrics_inc_deletion_segment_range_purned_whole_segment_nums; use crate::operations::mutation::Mutation; use crate::operations::mutation::MutationAction; -use crate::operations::mutation::MutationDeletedSegment; use crate::operations::mutation::MutationPartInfo; use crate::operations::mutation::MutationSource; use crate::pipelines::Pipeline; @@ -76,7 +75,7 @@ impl FuseTable { filters: Option, col_indices: Vec, query_row_id_col: bool, - ) -> Result> { + ) -> Result)>> { let snapshot_opt = self.read_table_snapshot().await?; // check if table is empty @@ -147,7 +146,7 @@ impl FuseTable { if partitions.is_empty() { return Ok(None); } - Ok(Some((partitions, snapshot.as_ref().clone()))) + Ok(Some((partitions, snapshot.clone()))) } pub fn try_eval_const( @@ -242,9 +241,9 @@ impl FuseTable { projection.sort_by_key(|&i| source_col_indices[i]); let ops = vec![BlockOperator::Project { projection }]; - let max_threads = - std::cmp::min(ctx.get_settings().get_max_threads()? as usize, total_tasks); - let max_threads = std::cmp::max(max_threads, 1); + let max_threads = (ctx.get_settings().get_max_threads()? as usize) + .min(total_tasks) + .max(1); // Add source pipe. pipeline.add_source( |output| { @@ -366,22 +365,21 @@ impl FuseTable { block_metas .into_iter() .zip(inner_parts.partitions.into_iter()) - .map(|((block_meta_index, block_meta), c)| { + .map(|((index, block_meta), inner_part)| { let cluster_stats = if with_origin { block_meta.cluster_stats.clone() } else { None }; - let key = (block_meta_index.segment_idx, block_meta_index.block_idx); - let whole_block_deletion = whole_block_deletions.contains(&key); - let part_info_ptr: PartInfoPtr = Arc::new(Box::new( - Mutation::MutationPartInfo(MutationPartInfo::create( - block_meta_index, + let key = (index.segment_idx, index.block_idx); + let whole_block_mutation = whole_block_deletions.contains(&key); + let part_info_ptr: PartInfoPtr = + Arc::new(Box::new(Mutation::MutationPartInfo(MutationPartInfo { + index, cluster_stats, - c, - whole_block_deletion, - )), - )); + inner_part, + whole_block_mutation, + }))); part_info_ptr }) .collect(), @@ -392,12 +390,12 @@ impl FuseTable { let segment_num = pruner.deleted_segments.len(); // now try to add deleted_segment for deleted_segment in pruner.deleted_segments { - part_num += deleted_segment.segment_info.1.block_count as usize; - num_whole_block_mutation += deleted_segment.segment_info.1.block_count as usize; + part_num += deleted_segment.summary.block_count as usize; + num_whole_block_mutation += deleted_segment.summary.block_count as usize; parts .partitions .push(Arc::new(Box::new(Mutation::MutationDeletedSegment( - MutationDeletedSegment::create(deleted_segment), + deleted_segment, )))); } diff --git a/src/query/storages/fuse/src/operations/merge_into/processors/transform_matched_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/merge_into/processors/transform_matched_mutation_aggregator.rs index 7dc70d903f47f..ca18f7a0f18da 100644 --- a/src/query/storages/fuse/src/operations/merge_into/processors/transform_matched_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/merge_into/processors/transform_matched_mutation_aggregator.rs @@ -18,7 +18,7 @@ use common_pipeline_core::pipe::PipeItem; use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::ProcessorPtr; -use common_pipeline_transforms::processors::transforms::transform_accumulating_async::AsyncAccumulatingTransform; +use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform; use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer; use crate::operations::merge_into::mutator::MatchedAggregator; diff --git a/src/query/storages/fuse/src/operations/mod.rs b/src/query/storages/fuse/src/operations/mod.rs index af19fb2d6804f..4d63d9fc8f533 100644 --- a/src/query/storages/fuse/src/operations/mod.rs +++ b/src/query/storages/fuse/src/operations/mod.rs @@ -41,6 +41,7 @@ pub use common::TransformSerializeBlock; pub use compact::CompactOptions; pub use mutation::BlockCompactMutator; pub use mutation::CompactPartInfo; +pub use mutation::DeletedSegmentInfo; pub use mutation::Mutation; pub use mutation::ReclusterMutator; pub use mutation::SegmentCompactMutator; diff --git a/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs b/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs index fc9e850a2434c..52e0fa18104e7 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/block_compact_mutator.rs @@ -12,31 +12,39 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::BTreeMap; -use std::collections::HashMap; use std::collections::HashSet; use std::collections::VecDeque; use std::sync::Arc; use std::time::Instant; use std::vec; +use common_base::base::tokio::sync::OwnedSemaphorePermit; +use common_base::base::tokio::sync::Semaphore; +use common_base::runtime::GlobalIORuntime; +use common_base::runtime::TrySpawn; +use common_catalog::plan::PartInfoPtr; use common_catalog::plan::Partitions; use common_catalog::plan::PartitionsShuffleKind; +use common_exception::ErrorCode; use common_exception::Result; use common_expression::BlockThresholds; use common_expression::ColumnId; use opendal::Operator; use storages_common_table_meta::meta::BlockMeta; -use storages_common_table_meta::meta::Location; -use storages_common_table_meta::meta::SegmentInfo; +use storages_common_table_meta::meta::CompactSegmentInfo; use storages_common_table_meta::meta::Statistics; use crate::io::SegmentsIO; use crate::operations::common::BlockMetaIndex; -use crate::operations::mutation::CompactPartInfo; +use crate::operations::mutation::compact::compact_part::CompactExtraInfo; +use crate::operations::mutation::compact::compact_part::CompactLazyPartInfo; +use crate::operations::mutation::compact::compact_part::CompactPartInfo; +use crate::operations::mutation::BlockIndex; +use crate::operations::mutation::CompactTaskInfo; +use crate::operations::mutation::SegmentIndex; use crate::operations::mutation::MAX_BLOCK_COUNT; use crate::operations::CompactOptions; -use crate::statistics::reducers::deduct_statistics_mut; +use crate::statistics::reducers::merge_statistics_mut; use crate::statistics::sort_by_cluster_stats; use crate::TableContext; @@ -47,18 +55,7 @@ pub struct BlockCompactMutator { pub thresholds: BlockThresholds, pub compact_params: CompactOptions, - pub column_ids: HashSet, pub cluster_key_id: Option, - - // A set of Parts. - pub compact_tasks: Partitions, - pub unchanged_blocks_map: HashMap>>, - // locations all the unchanged segments. - pub unchanged_segments_map: BTreeMap, - // summarised statistics of all the unchanged segments - pub unchanged_segment_statistics: Statistics, - - compacted_segment_cnt: usize, } impl BlockCompactMutator { @@ -69,35 +66,23 @@ impl BlockCompactMutator { operator: Operator, cluster_key_id: Option, ) -> Self { - let column_ids = compact_params.base_snapshot.schema.to_leaf_column_id_set(); - let unchanged_segment_statistics = compact_params.base_snapshot.summary.clone(); Self { ctx, operator, thresholds, compact_params, - column_ids, cluster_key_id, - unchanged_blocks_map: HashMap::new(), - compact_tasks: Partitions::create_nolazy(PartitionsShuffleKind::Mod, vec![]), - unchanged_segments_map: BTreeMap::new(), - unchanged_segment_statistics, - compacted_segment_cnt: 0, } } #[async_backtrace::framed] - pub async fn target_select(&mut self) -> Result<()> { + pub async fn target_select(&mut self) -> Result { let start = Instant::now(); let snapshot = self.compact_params.base_snapshot.clone(); let segment_locations = &snapshot.segments; let number_segments = segment_locations.len(); let limit = self.compact_params.limit.unwrap_or(number_segments); - let mut segment_idx = 0; - let mut compacted_block_cnt = 0; - let mut checked_end_at = 0; - // Status. self.ctx .set_status_info("compact: begin to build compact tasks"); @@ -107,25 +92,36 @@ impl BlockCompactMutator { self.operator.clone(), Arc::new(self.compact_params.base_snapshot.schema.clone()), ); - let mut checker = SegmentCompactChecker::new(self.compact_params.block_per_seg as u64); - let chunk_size = self.ctx.get_settings().get_max_threads()? as usize * 4; + let mut checker = SegmentCompactChecker::new( + self.compact_params.block_per_seg as u64, + self.cluster_key_id, + ); + + let mut segment_idx = 0; let mut is_end = false; + let mut parts = Vec::new(); + let chunk_size = self.ctx.get_settings().get_max_threads()? as usize * 4; for chunk in segment_locations.chunks(chunk_size) { // Read the segments information in parallel. let mut segment_infos = segments_io - .read_segments::(chunk, false) + .read_segments::>(chunk, false) .await? .into_iter() - .zip(chunk.iter()) - .map(|(sg, chunk)| sg.map(|v| (v, chunk))) + .map(|sg| { + sg.map(|v| { + let idx = segment_idx; + segment_idx += 1; + (idx, v) + }) + }) .collect::>>()?; if let Some(default_cluster_key) = self.cluster_key_id { // sort descending. segment_infos.sort_by(|a, b| { sort_by_cluster_stats( - &b.0.summary.cluster_stats, - &a.0.summary.cluster_stats, + &b.1.summary.cluster_stats, + &a.1.summary.cluster_stats, default_cluster_key, ) }); @@ -133,48 +129,25 @@ impl BlockCompactMutator { // Check the segment to be compacted. // Size of compacted segment should be in range R == [threshold, 2 * threshold) - for (segment, loc) in segment_infos.into_iter() { - if is_end { - self.unchanged_segments_map.insert(segment_idx, loc.clone()); - segment_idx += 1; - continue; - } - - let segments_vec = checker.add(loc.clone(), segment); + for (segment_idx, compact_segment) in segment_infos.into_iter() { + let segments_vec = checker.add(segment_idx, compact_segment); for segments in segments_vec { - if SegmentCompactChecker::check_for_compact(&segments) { - self.compacted_segment_cnt += segments.len(); - compacted_block_cnt += - segments.iter().fold(0, |acc, x| acc + x.1.blocks.len()); - // build the compact tasks. - self.build_compact_tasks( - segments.into_iter().map(|s| s.1).collect(), - segment_idx, - ); - } else { - self.unchanged_segments_map - .insert(segment_idx, segments[0].0.clone()); - } - segment_idx += 1; + self.generate_part(segments, &mut parts, &mut checker); } - if self.compacted_segment_cnt + checker.segments.len() >= limit - || compacted_block_cnt >= MAX_BLOCK_COUNT + if checker.compacted_segment_cnt + checker.segments.len() >= limit + || checker.compacted_block_cnt >= MAX_BLOCK_COUNT as u64 { - // The remaining segments needs to be pushed into unchanged_map, - // so execute finalize here. - self.finalize(std::mem::take(&mut checker.segments), &mut segment_idx); is_end = true; + break; } } - checked_end_at += chunk.len(); - // Status. { let status = format!( "compact: read segment files:{}/{}, cost:{} sec", - checked_end_at, + segment_idx, number_segments, start.elapsed().as_secs() ); @@ -187,150 +160,210 @@ impl BlockCompactMutator { } // finalize the compaction. - self.finalize(std::mem::take(&mut checker.segments), &mut segment_idx); - - // combine with the unprocessed segments (which are outside of the limit). - for segment_location in segment_locations[checked_end_at..].iter() { - self.unchanged_segments_map - .insert(segment_idx, segment_location.clone()); - segment_idx += 1; - } + self.generate_part( + std::mem::take(&mut checker.segments), + &mut parts, + &mut checker, + ); // Status. self.ctx.set_status_info(&format!( - "compact: end to build compact tasks:{}, segments to be compacted:{}, cost:{} sec", - self.compact_tasks.len(), - self.compacted_segment_cnt, + "compact: end to build lazy compact parts:{}, segments to be compacted:{}, cost:{} sec", + parts.len(), + checker.compacted_segment_cnt, start.elapsed().as_secs() )); - Ok(()) - } - - // Select the row_count >= min_rows_per_block or block_size >= max_bytes_per_block - // as the perfect_block condition(N for short). Gets a set of segments, iterates - // through the blocks, and finds the blocks >= N and blocks < 2N as a task. - fn build_compact_tasks(&mut self, segments: Vec, segment_idx: usize) { - let mut builder = CompactTaskBuilder::new(self.column_ids.clone(), self.cluster_key_id); - let mut tasks = VecDeque::new(); - let mut block_idx = 0; - // Used to identify whether the latest block is unchanged or needs to be compacted. - let mut latest_flag = true; - let mut unchanged_blocks: BTreeMap> = BTreeMap::new(); - let mut blocks = Vec::new(); - // The order of the compact is from old to new. - segments.into_iter().rev().for_each(|s| { - deduct_statistics_mut(&mut self.unchanged_segment_statistics, &s.summary); - blocks.extend(s.blocks); - }); - - if let Some(default_cluster_key) = self.cluster_key_id { - // sort ascending. - blocks.sort_by(|a, b| { - sort_by_cluster_stats(&a.cluster_stats, &b.cluster_stats, default_cluster_key) - }); - } + let cluster = self.ctx.get_cluster(); + let partitions = if cluster.is_empty() || parts.len() < cluster.nodes.len() { + let column_ids = self + .compact_params + .base_snapshot + .schema + .to_leaf_column_id_set(); + let lazy_parts = parts + .into_iter() + .map(|v| { + v.as_any() + .downcast_ref::() + .unwrap() + .clone() + }) + .collect::>(); + Partitions::create_nolazy( + PartitionsShuffleKind::Mod, + BlockCompactMutator::build_compact_tasks( + self.ctx.clone(), + column_ids, + self.cluster_key_id, + self.thresholds, + lazy_parts, + ) + .await?, + ) + } else { + Partitions::create(PartitionsShuffleKind::Mod, parts, true) + }; + Ok(partitions) + } - for block in blocks.iter() { - let (unchanged, need_take) = builder.add(block, self.thresholds); - if need_take { - let blocks = builder.take_blocks(); - latest_flag = - builder.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks); - block_idx += 1; - } - if unchanged { - let blocks = vec![block.clone()]; - latest_flag = - builder.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks); - block_idx += 1; + #[async_backtrace::framed] + pub async fn build_compact_tasks( + ctx: Arc, + column_ids: HashSet, + cluster_key_id: Option, + thresholds: BlockThresholds, + mut lazy_parts: Vec, + ) -> Result> { + let max_concurrency = { + let max_io_requests = ctx.get_settings().get_max_storage_io_requests()? as usize; + // Prevent us from miss-configured max_storage_io_requests setting, e.g. 0 + let v = std::cmp::max(max_io_requests, 10); + if v > max_io_requests { + log::warn!( + "max_storage_io_requests setting is too low {}, increased to {}", + max_io_requests, + v + ) } + v + }; + + // Pruning runtime. + let runtime = GlobalIORuntime::instance(); + let semaphore = Arc::new(Semaphore::new(max_concurrency)); + + let mut remain = lazy_parts.len() % max_concurrency; + let batch_size = lazy_parts.len() / max_concurrency; + let mut works = Vec::with_capacity(max_concurrency); + + while !lazy_parts.is_empty() { + let gap_size = std::cmp::min(1, remain); + let batch_size = batch_size + gap_size; + remain -= gap_size; + + let column_ids = column_ids.clone(); + let semaphore = semaphore.clone(); + + let batch = lazy_parts.drain(0..batch_size).collect::>(); + works.push(runtime.spawn(async_backtrace::location!().frame({ + async move { + let mut res = vec![]; + for lazy_part in batch { + let mut builder = + CompactTaskBuilder::new(column_ids.clone(), cluster_key_id, thresholds); + let parts = builder + .build_tasks( + lazy_part.segment_indices, + lazy_part.compact_segments, + semaphore.clone(), + ) + .await?; + res.extend(parts); + } + Ok::<_, ErrorCode>(res) + } + }))); } - if !builder.is_empty() { - let tail = builder.take_blocks(); - if self.cluster_key_id.is_some() && latest_flag { - // The clustering table cannot compact different level blocks. - builder.build_task(&mut tasks, &mut unchanged_blocks, block_idx, tail); - } else { - let (index, mut blocks) = if latest_flag { - unchanged_blocks - .pop_last() - .map_or((0, vec![]), |(k, v)| (k, vec![v])) - } else { - tasks.pop_back().unwrap_or((0, vec![])) - }; - - blocks.extend(tail); - tasks.push_back((index, blocks)); + match futures::future::try_join_all(works).await { + Err(e) => Err(ErrorCode::StorageOther(format!( + "build compact tasks failure, {}", + e + ))), + Ok(workers) => { + let mut parts = vec![]; + for worker in workers { + let res = worker?; + parts.extend(res); + } + Ok(parts) } } - - let mut partitions = tasks - .into_iter() - .map(|(block_idx, blocks)| { - CompactPartInfo::create(blocks, BlockMetaIndex { - segment_idx, - block_idx, - }) - }) - .collect(); - self.compact_tasks.partitions.append(&mut partitions); - if !unchanged_blocks.is_empty() { - self.unchanged_blocks_map - .insert(segment_idx, unchanged_blocks); - } } - fn finalize(&mut self, segments: Vec<(Location, SegmentInfo)>, segment_idx: &mut usize) { - if !segments.is_empty() { - if SegmentCompactChecker::check_for_compact(&segments) { - self.compacted_segment_cnt += segments.len(); - self.build_compact_tasks(segments.into_iter().map(|s| s.1).collect(), *segment_idx); - } else { - self.unchanged_segments_map - .insert(*segment_idx, segments[0].0.clone()); + fn generate_part( + &mut self, + segments: Vec<(SegmentIndex, Arc)>, + parts: &mut Vec, + checker: &mut SegmentCompactChecker, + ) { + if !segments.is_empty() && checker.check_for_compact(&segments) { + let mut segment_indices = Vec::with_capacity(segments.len()); + let mut compact_segments = Vec::with_capacity(segments.len()); + for (idx, segment) in segments.into_iter() { + segment_indices.push(idx); + compact_segments.push(segment); } - *segment_idx += 1; + + let lazy_part = CompactLazyPartInfo::create(segment_indices, compact_segments); + parts.push(lazy_part); } } } struct SegmentCompactChecker { - segments: Vec<(Location, SegmentInfo)>, + segments: Vec<(SegmentIndex, Arc)>, total_block_count: u64, - threshold: u64, + block_threshold: u64, + cluster_key_id: Option, + + compacted_segment_cnt: usize, + compacted_block_cnt: u64, } impl SegmentCompactChecker { - fn new(threshold: u64) -> Self { + fn new(block_threshold: u64, cluster_key_id: Option) -> Self { Self { - threshold, - total_block_count: 0, segments: vec![], + total_block_count: 0, + block_threshold, + cluster_key_id, + compacted_block_cnt: 0, + compacted_segment_cnt: 0, } } - fn check_for_compact(segments: &Vec<(Location, SegmentInfo)>) -> bool { - segments.len() != 1 - || (segments[0].1.summary.block_count > 1 - && segments[0].1.summary.perfect_block_count != segments[0].1.summary.block_count) + fn check_for_compact( + &mut self, + segments: &Vec<(SegmentIndex, Arc)>, + ) -> bool { + if segments.is_empty() { + return false; + } + + if segments.len() == 1 { + let summary = &segments[0].1.summary; + if (summary.block_count == 1 || summary.perfect_block_count == summary.block_count) + && (self.cluster_key_id.is_none() + || self.cluster_key_id + == summary.cluster_stats.as_ref().map(|v| v.cluster_key_id)) + { + return false; + } + } + + self.compacted_segment_cnt += segments.len(); + self.compacted_block_cnt += segments + .iter() + .fold(0, |acc, x| acc + x.1.summary.block_count); + true } fn add( &mut self, - location: Location, - segment: SegmentInfo, - ) -> Vec> { + idx: SegmentIndex, + segment: Arc, + ) -> Vec)>> { self.total_block_count += segment.summary.block_count; - if self.total_block_count < self.threshold { - self.segments.push((location, segment)); + if self.total_block_count < self.block_threshold { + self.segments.push((idx, segment)); return vec![]; } - if self.total_block_count > 2 * self.threshold { + if self.total_block_count > 2 * self.block_threshold { self.total_block_count = 0; - let trivial = vec![(location, segment)]; + let trivial = vec![(idx, segment)]; if self.segments.is_empty() { return vec![trivial]; } else { @@ -339,7 +372,7 @@ impl SegmentCompactChecker { } self.total_block_count = 0; - self.segments.push((location, segment)); + self.segments.push((idx, segment)); vec![std::mem::take(&mut self.segments)] } } @@ -347,6 +380,7 @@ impl SegmentCompactChecker { struct CompactTaskBuilder { column_ids: HashSet, cluster_key_id: Option, + thresholds: BlockThresholds, blocks: Vec>, total_rows: usize, @@ -354,10 +388,15 @@ struct CompactTaskBuilder { } impl CompactTaskBuilder { - fn new(column_ids: HashSet, cluster_key_id: Option) -> Self { + fn new( + column_ids: HashSet, + cluster_key_id: Option, + thresholds: BlockThresholds, + ) -> Self { Self { column_ids, cluster_key_id, + thresholds, blocks: vec![], total_rows: 0, total_size: 0, @@ -404,13 +443,13 @@ impl CompactTaskBuilder { fn build_task( &self, tasks: &mut VecDeque<(usize, Vec>)>, - unchanged_blocks: &mut BTreeMap>, - block_idx: usize, + unchanged_blocks: &mut Vec<(BlockIndex, Arc)>, + block_idx: BlockIndex, blocks: Vec>, ) -> bool { let mut flag = false; if blocks.len() == 1 && !self.check_compact(&blocks[0]) { - unchanged_blocks.insert(block_idx, blocks[0].clone()); + unchanged_blocks.push((block_idx, blocks[0].clone())); flag = true; } else { tasks.push_back((block_idx, blocks)); @@ -432,4 +471,115 @@ impl CompactTaskBuilder { true } } + + // Select the row_count >= min_rows_per_block or block_size >= max_bytes_per_block + // as the perfect_block condition(N for short). Gets a set of segments, iterates + // through the blocks, and finds the blocks >= N and blocks < 2N as a task. + async fn build_tasks( + &mut self, + segment_indices: Vec, + compact_segments: Vec>, + semaphore: Arc, + ) -> Result> { + let mut block_idx = 0; + // Used to identify whether the latest block is unchanged or needs to be compacted. + let mut latest_flag = true; + let mut unchanged_blocks = Vec::new(); + let mut removed_segment_summary = Statistics::default(); + + let mut iter = compact_segments.into_iter().rev(); + let tasks = std::iter::from_fn(|| { + iter.next().map(|v| { + Box::new(move |permit: OwnedSemaphorePermit| { + Box::pin(async move { + let _permit = permit; + let blocks = v.block_metas()?; + Ok::<_, ErrorCode>((blocks, v.summary.clone())) + }) + }) + }) + }); + + let runtime = GlobalIORuntime::instance(); + let join_handlers = runtime + .try_spawn_batch_with_owned_semaphore(semaphore.clone(), tasks) + .await?; + + let joint = futures::future::try_join_all(join_handlers) + .await + .map_err(|e| ErrorCode::StorageOther(format!("deserialize failure, {}", e)))?; + + let mut blocks = joint + .into_iter() + .collect::>>()? + .into_iter() + .flat_map(|(blocks, summary)| { + merge_statistics_mut(&mut removed_segment_summary, &summary, self.cluster_key_id); + blocks + }) + .collect::>(); + + if let Some(default_cluster_key) = self.cluster_key_id { + // sort ascending. + blocks.sort_by(|a, b| { + sort_by_cluster_stats(&a.cluster_stats, &b.cluster_stats, default_cluster_key) + }); + } + + let mut tasks = VecDeque::new(); + for block in blocks.iter() { + let (unchanged, need_take) = self.add(block, self.thresholds); + if need_take { + let blocks = self.take_blocks(); + latest_flag = self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks); + block_idx += 1; + } + if unchanged { + let blocks = vec![block.clone()]; + latest_flag = self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, blocks); + block_idx += 1; + } + } + + if !self.is_empty() { + let tail = self.take_blocks(); + if self.cluster_key_id.is_some() && latest_flag { + // The clustering table cannot compact different level blocks. + self.build_task(&mut tasks, &mut unchanged_blocks, block_idx, tail); + } else { + let (index, mut blocks) = if latest_flag { + unchanged_blocks + .pop() + .map_or((0, vec![]), |(k, v)| (k, vec![v])) + } else { + tasks.pop_back().unwrap_or((0, vec![])) + }; + + blocks.extend(tail); + tasks.push_back((index, blocks)); + } + } + + let mut removed_segment_indexes = segment_indices; + let segment_idx = removed_segment_indexes.pop().unwrap(); + let mut partitions: Vec = Vec::with_capacity(tasks.len() + 1); + for (block_idx, blocks) in tasks.into_iter() { + partitions.push(Arc::new(Box::new(CompactPartInfo::CompactTaskInfo( + CompactTaskInfo::create(blocks, BlockMetaIndex { + segment_idx, + block_idx, + }), + )))); + } + + partitions.push(Arc::new(Box::new(CompactPartInfo::CompactExtraInfo( + CompactExtraInfo::create( + segment_idx, + unchanged_blocks, + removed_segment_indexes, + removed_segment_summary, + ), + )))); + Ok(partitions) + } } diff --git a/src/query/storages/fuse/src/operations/mutation/compact/compact_aggregator.rs b/src/query/storages/fuse/src/operations/mutation/compact/compact_aggregator.rs deleted file mode 100644 index cb58ff141785b..0000000000000 --- a/src/query/storages/fuse/src/operations/mutation/compact/compact_aggregator.rs +++ /dev/null @@ -1,184 +0,0 @@ -// Copyright 2021 Datafuse Labs -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -use std::collections::BTreeMap; -use std::collections::HashMap; -use std::sync::Arc; -use std::time::Instant; - -use common_catalog::table_context::TableContext; -use common_exception::ErrorCode; -use common_exception::Result; -use common_expression::BlockMetaInfoDowncast; -use common_expression::BlockThresholds; -use common_expression::DataBlock; -use common_expression::TableSchemaRefExt; -use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform; -use opendal::Operator; -use storages_common_table_meta::meta::BlockMeta; -use storages_common_table_meta::meta::Location; -use storages_common_table_meta::meta::SegmentInfo; -use storages_common_table_meta::meta::Statistics; -use storages_common_table_meta::meta::Versioned; - -use crate::io::SegmentsIO; -use crate::io::SerializedSegment; -use crate::io::TableMetaLocationGenerator; -use crate::operations::common::AbortOperation; -use crate::operations::common::CommitMeta; -use crate::operations::common::ConflictResolveContext; -use crate::operations::common::MutationLogEntry; -use crate::operations::common::MutationLogs; -use crate::operations::common::SnapshotMerged; -use crate::operations::mutation::BlockCompactMutator; -use crate::statistics::reducers::merge_statistics_mut; -use crate::statistics::reducers::reduce_block_metas; - -pub struct CompactAggregator { - ctx: Arc, - dal: Operator, - location_gen: TableMetaLocationGenerator, - thresholds: BlockThresholds, - default_cluster_key_id: Option, - - // locations all the merged segments. - merged_segments: BTreeMap, - // summarised statistics of all the merged segments - merged_statistics: Statistics, - // locations all the merged blocks. - merge_blocks: HashMap>>, - abort_operation: AbortOperation, - - start_time: Instant, - total_tasks: usize, -} - -impl CompactAggregator { - pub fn new( - dal: Operator, - location_gen: TableMetaLocationGenerator, - mutator: BlockCompactMutator, - ) -> Self { - Self { - ctx: mutator.ctx.clone(), - dal, - location_gen, - default_cluster_key_id: mutator.cluster_key_id, - merged_segments: mutator.unchanged_segments_map, - merged_statistics: mutator.unchanged_segment_statistics, - merge_blocks: mutator.unchanged_blocks_map, - thresholds: mutator.thresholds, - abort_operation: AbortOperation::default(), - start_time: Instant::now(), - total_tasks: mutator.compact_tasks.len(), - } - } -} - -#[async_trait::async_trait] -impl AsyncAccumulatingTransform for CompactAggregator { - const NAME: &'static str = "CompactAggregator"; - - #[async_backtrace::framed] - async fn transform(&mut self, data: DataBlock) -> Result> { - // gather the input data. - if let Some(meta) = data.get_owned_meta().and_then(MutationLogs::downcast_from) { - for entry in meta.entries.into_iter() { - match entry { - MutationLogEntry::ReplacedBlock { index, block_meta } => { - self.abort_operation.add_block(&block_meta); - self.merge_blocks - .entry(index.segment_idx) - .and_modify(|v| { - v.insert(index.block_idx, block_meta.clone()); - }) - .or_insert(BTreeMap::from([(index.block_idx, block_meta)])); - - // Refresh status - { - let status = format!( - "compact: run compact tasks:{}/{}, cost:{} sec", - self.abort_operation.blocks.len(), - self.total_tasks, - self.start_time.elapsed().as_secs() - ); - self.ctx.set_status_info(&status); - } - } - _ => return Err(ErrorCode::Internal("It's a bug.")), - } - } - } - // no partial output - Ok(None) - } - - #[async_backtrace::framed] - async fn on_finish(&mut self, _output: bool) -> Result> { - let mut serialized_segments = Vec::with_capacity(self.merge_blocks.len()); - for (segment_idx, block_map) in std::mem::take(&mut self.merge_blocks) { - // generate the new segment. - let blocks: Vec<_> = block_map.into_values().collect(); - let new_summary = - reduce_block_metas(&blocks, self.thresholds, self.default_cluster_key_id); - merge_statistics_mut( - &mut self.merged_statistics, - &new_summary, - self.default_cluster_key_id, - ); - let new_segment = SegmentInfo::new(blocks, new_summary); - let location = self.location_gen.gen_segment_info_location(); - self.abort_operation.add_segment(location.clone()); - self.merged_segments - .insert(segment_idx, (location.clone(), SegmentInfo::VERSION)); - serialized_segments.push(SerializedSegment { - path: location, - segment: Arc::new(new_segment), - }); - } - - let start = Instant::now(); - // Refresh status - { - let status = format!( - "compact: begin to write new segments:{}", - serialized_segments.len() - ); - self.ctx.set_status_info(&status); - } - // write segments, schema in segments_io is useless here. - let segments_io = SegmentsIO::create( - self.ctx.clone(), - self.dal.clone(), - TableSchemaRefExt::create(vec![]), - ); - segments_io.write_segments(serialized_segments).await?; - - // Refresh status - self.ctx.set_status_info(&format!( - "compact: end to write new segments, cost:{} sec", - start.elapsed().as_secs() - )); - // gather the all segments. - let merged_segments = std::mem::take(&mut self.merged_segments) - .into_values() - .collect(); - let ctx = ConflictResolveContext::LatestSnapshotAppendOnly(SnapshotMerged { - merged_segments, - merged_statistics: std::mem::take(&mut self.merged_statistics), - }); - let meta = CommitMeta::new(ctx, std::mem::take(&mut self.abort_operation)); - Ok(Some(DataBlock::empty_with_meta(Box::new(meta)))) - } -} diff --git a/src/query/storages/fuse/src/operations/mutation/compact/compact_part.rs b/src/query/storages/fuse/src/operations/mutation/compact/compact_part.rs index 626e9f8f5455c..0a828527f1fae 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/compact_part.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/compact_part.rs @@ -13,6 +13,9 @@ // limitations under the License. use std::any::Any; +use std::collections::hash_map::DefaultHasher; +use std::hash::Hash; +use std::hash::Hasher; use std::sync::Arc; use common_catalog::plan::PartInfo; @@ -20,16 +23,57 @@ use common_catalog::plan::PartInfoPtr; use common_exception::ErrorCode; use common_exception::Result; use storages_common_table_meta::meta::BlockMeta; +use storages_common_table_meta::meta::CompactSegmentInfo; +use storages_common_table_meta::meta::Statistics; use crate::operations::common::BlockMetaIndex; +use crate::operations::mutation::BlockIndex; +use crate::operations::mutation::SegmentIndex; + +#[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone)] +pub struct CompactLazyPartInfo { + pub segment_indices: Vec, + pub compact_segments: Vec>, +} + +#[typetag::serde(name = "compact_lazy")] +impl PartInfo for CompactLazyPartInfo { + fn as_any(&self) -> &dyn Any { + self + } + + fn equals(&self, info: &Box) -> bool { + info.as_any() + .downcast_ref::() + .is_some_and(|other| self == other) + } + + fn hash(&self) -> u64 { + let mut s = DefaultHasher::new(); + self.segment_indices.hash(&mut s); + s.finish() + } +} + +impl CompactLazyPartInfo { + pub fn create( + segment_indices: Vec, + compact_segments: Vec>, + ) -> PartInfoPtr { + Arc::new(Box::new(CompactLazyPartInfo { + segment_indices, + compact_segments, + })) + } +} #[derive(serde::Serialize, serde::Deserialize, PartialEq)] -pub struct CompactPartInfo { - pub blocks: Vec>, - pub index: BlockMetaIndex, +pub enum CompactPartInfo { + CompactExtraInfo(CompactExtraInfo), + CompactTaskInfo(CompactTaskInfo), } -#[typetag::serde(name = "compact")] +#[typetag::serde(name = "compact_part_info")] impl PartInfo for CompactPartInfo { fn as_any(&self) -> &dyn Any { self @@ -42,15 +86,14 @@ impl PartInfo for CompactPartInfo { } fn hash(&self) -> u64 { - 0 + match self { + Self::CompactExtraInfo(extra) => extra.hash(), + Self::CompactTaskInfo(task) => task.hash(), + } } } impl CompactPartInfo { - pub fn create(blocks: Vec>, index: BlockMetaIndex) -> PartInfoPtr { - Arc::new(Box::new(CompactPartInfo { blocks, index })) - } - pub fn from_part(info: &PartInfoPtr) -> Result<&CompactPartInfo> { info.as_any() .downcast_ref::() @@ -59,3 +102,51 @@ impl CompactPartInfo { )) } } + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] +pub struct CompactExtraInfo { + pub segment_index: SegmentIndex, + pub unchanged_blocks: Vec<(BlockIndex, Arc)>, + pub removed_segment_indexes: Vec, + pub removed_segment_summary: Statistics, +} + +impl CompactExtraInfo { + pub fn create( + segment_index: SegmentIndex, + unchanged_blocks: Vec<(BlockIndex, Arc)>, + removed_segment_indexes: Vec, + removed_segment_summary: Statistics, + ) -> Self { + CompactExtraInfo { + segment_index, + unchanged_blocks, + removed_segment_indexes, + removed_segment_summary, + } + } + + fn hash(&self) -> u64 { + let mut s = DefaultHasher::new(); + self.segment_index.hash(&mut s); + s.finish() + } +} + +#[derive(serde::Serialize, serde::Deserialize, PartialEq)] +pub struct CompactTaskInfo { + pub blocks: Vec>, + pub index: BlockMetaIndex, +} + +impl CompactTaskInfo { + pub fn create(blocks: Vec>, index: BlockMetaIndex) -> Self { + CompactTaskInfo { blocks, index } + } + + fn hash(&self) -> u64 { + let mut s = DefaultHasher::new(); + self.blocks[0].location.0.hash(&mut s); + s.finish() + } +} diff --git a/src/query/storages/fuse/src/operations/mutation/compact/compact_source.rs b/src/query/storages/fuse/src/operations/mutation/compact/compact_source.rs index f4452d5f2fd52..d0f70b5e40ddb 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/compact_source.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/compact_source.rs @@ -27,8 +27,9 @@ use common_pipeline_core::processors::processor::ProcessorPtr; use crate::io::BlockReader; use crate::io::ReadSettings; use crate::metrics::*; +use crate::operations::mutation::compact::compact_part::CompactPartInfo; use crate::operations::mutation::mutation_meta::ClusterStatsGenType; -use crate::operations::mutation::CompactPartInfo; +use crate::operations::mutation::mutation_meta::SerializeBlock; use crate::operations::mutation::SerializeDataMeta; use crate::operations::BlockMetaIndex; use crate::pipelines::processors::port::OutputPort; @@ -126,7 +127,10 @@ impl Processor for CompactSource { DataBlock::concat(&blocks)? }; - let meta = SerializeDataMeta::create(index, ClusterStatsGenType::Generally); + let meta = Box::new(SerializeDataMeta::SerializeBlock(SerializeBlock::create( + index, + ClusterStatsGenType::Generally, + ))); let new_block = block.add_meta(Some(meta))?; let progress_values = ProgressValues { @@ -152,33 +156,41 @@ impl Processor for CompactSource { // block read tasks. let mut task_futures = Vec::new(); let part = CompactPartInfo::from_part(&part)?; - let mut stats = Vec::with_capacity(part.blocks.len()); - for block in &part.blocks { - stats.push(block.col_stats.clone()); - - let settings = ReadSettings::from_ctx(&self.ctx)?; - // read block in parallel. - task_futures.push(async move { - // Perf - { - metrics_inc_compact_block_read_nums(1); - metrics_inc_compact_block_read_bytes(block.block_size); + match part { + CompactPartInfo::CompactExtraInfo(extra) => { + let meta = Box::new(SerializeDataMeta::CompactExtras(extra.clone())); + let block = DataBlock::empty_with_meta(meta); + self.state = State::Output(self.ctx.get_partition(), block); + } + CompactPartInfo::CompactTaskInfo(task) => { + for block in &task.blocks { + let settings = ReadSettings::from_ctx(&self.ctx)?; + // read block in parallel. + task_futures.push(async move { + // Perf + { + metrics_inc_compact_block_read_nums(1); + metrics_inc_compact_block_read_bytes(block.block_size); + } + + block_reader + .read_by_meta(&settings, block.as_ref(), &storage_format) + .await + }); } - block_reader - .read_by_meta(&settings, block.as_ref(), &storage_format) - .await - }); - } - - let start = Instant::now(); + let start = Instant::now(); - let blocks = futures::future::try_join_all(task_futures).await?; - // Perf. - { - metrics_inc_compact_block_read_milliseconds(start.elapsed().as_millis() as u64); + let blocks = futures::future::try_join_all(task_futures).await?; + // Perf. + { + metrics_inc_compact_block_read_milliseconds( + start.elapsed().as_millis() as u64, + ); + } + self.state = State::Concat(blocks, task.index.clone()); + } } - self.state = State::Concat(blocks, part.index.clone()); Ok(()) } _ => Err(ErrorCode::Internal("It's a bug.")), diff --git a/src/query/storages/fuse/src/operations/mutation/compact/mod.rs b/src/query/storages/fuse/src/operations/mutation/compact/mod.rs index 8807d1ac63c00..f18e8a114f9ea 100644 --- a/src/query/storages/fuse/src/operations/mutation/compact/mod.rs +++ b/src/query/storages/fuse/src/operations/mutation/compact/mod.rs @@ -13,14 +13,15 @@ // limitations under the License. mod block_compact_mutator; -mod compact_aggregator; mod compact_part; mod compact_source; mod segment_compact_mutator; pub use block_compact_mutator::BlockCompactMutator; -pub use compact_aggregator::CompactAggregator; +pub use compact_part::CompactExtraInfo; +pub use compact_part::CompactLazyPartInfo; pub use compact_part::CompactPartInfo; +pub use compact_part::CompactTaskInfo; pub use compact_source::CompactSource; pub use segment_compact_mutator::SegmentCompactMutator; pub use segment_compact_mutator::SegmentCompactionState; diff --git a/src/query/storages/fuse/src/operations/mutation/mod.rs b/src/query/storages/fuse/src/operations/mutation/mod.rs index 21a9dde863162..5af83bf7266a5 100644 --- a/src/query/storages/fuse/src/operations/mutation/mod.rs +++ b/src/query/storages/fuse/src/operations/mutation/mod.rs @@ -20,16 +20,18 @@ mod recluster_aggregator; mod recluster_mutator; pub use compact::BlockCompactMutator; -pub use compact::CompactAggregator; +pub use compact::CompactExtraInfo; +pub use compact::CompactLazyPartInfo; pub use compact::CompactPartInfo; pub use compact::CompactSource; +pub use compact::CompactTaskInfo; pub use compact::SegmentCompactMutator; pub use compact::SegmentCompactionState; pub use compact::SegmentCompactor; pub use mutation_meta::ClusterStatsGenType; pub use mutation_meta::SerializeDataMeta; +pub use mutation_part::DeletedSegmentInfo; pub use mutation_part::Mutation; -pub use mutation_part::MutationDeletedSegment; pub use mutation_part::MutationPartInfo; pub use mutation_source::MutationAction; pub use mutation_source::MutationSource; diff --git a/src/query/storages/fuse/src/operations/mutation/mutation_meta.rs b/src/query/storages/fuse/src/operations/mutation/mutation_meta.rs index 6e85c143e538e..22dbe1b1cb050 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutation_meta.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutation_meta.rs @@ -14,23 +14,17 @@ use common_expression::BlockMetaInfo; use common_expression::BlockMetaInfoDowncast; -use common_expression::BlockMetaInfoPtr; use storages_common_table_meta::meta::ClusterStatistics; use crate::operations::common::BlockMetaIndex; -use crate::operations::mutation::MutationDeletedSegment; - -#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] -pub enum ClusterStatsGenType { - Generally, - WithOrigin(Option), -} +use crate::operations::mutation::compact::CompactExtraInfo; +use crate::operations::mutation::DeletedSegmentInfo; #[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] -pub struct SerializeDataMeta { - pub index: BlockMetaIndex, - pub stats_type: ClusterStatsGenType, - pub deleted_segment: Option, +pub enum SerializeDataMeta { + SerializeBlock(SerializeBlock), + DeletedSegment(DeletedSegmentInfo), + CompactExtras(CompactExtraInfo), } #[typetag::serde(name = "serialize_data_meta")] @@ -44,22 +38,20 @@ impl BlockMetaInfo for SerializeDataMeta { } } -impl SerializeDataMeta { - pub fn create(index: BlockMetaIndex, stats_type: ClusterStatsGenType) -> BlockMetaInfoPtr { - Box::new(SerializeDataMeta { - index, - stats_type, - deleted_segment: None, - }) - } +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq, Eq)] +pub enum ClusterStatsGenType { + Generally, + WithOrigin(Option), +} + +#[derive(serde::Serialize, serde::Deserialize, Clone, Debug, PartialEq)] +pub struct SerializeBlock { + pub index: BlockMetaIndex, + pub stats_type: ClusterStatsGenType, +} - pub fn create_with_deleted_segment( - deleted_segment: MutationDeletedSegment, - ) -> BlockMetaInfoPtr { - Box::new(SerializeDataMeta { - index: BlockMetaIndex::default(), // default value - stats_type: ClusterStatsGenType::Generally, // default value - deleted_segment: Some(deleted_segment), - }) +impl SerializeBlock { + pub fn create(index: BlockMetaIndex, stats_type: ClusterStatsGenType) -> Self { + SerializeBlock { index, stats_type } } } diff --git a/src/query/storages/fuse/src/operations/mutation/mutation_part.rs b/src/query/storages/fuse/src/operations/mutation/mutation_part.rs index c3f80cfd252b0..269af79dcc2fa 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutation_part.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutation_part.rs @@ -13,6 +13,9 @@ // limitations under the License. use std::any::Any; +use std::collections::hash_map::DefaultHasher; +use std::hash::Hash; +use std::hash::Hasher; use common_catalog::plan::PartInfo; use common_catalog::plan::PartInfoPtr; @@ -20,12 +23,13 @@ use common_exception::ErrorCode; use common_exception::Result; use storages_common_pruner::BlockMetaIndex; use storages_common_table_meta::meta::ClusterStatistics; +use storages_common_table_meta::meta::Statistics; -use crate::pruning::DeletedSegmentInfo; +use crate::operations::mutation::SegmentIndex; #[derive(serde::Serialize, serde::Deserialize, PartialEq)] pub enum Mutation { - MutationDeletedSegment(MutationDeletedSegment), + MutationDeletedSegment(DeletedSegmentInfo), MutationPartInfo(MutationPartInfo), } @@ -36,12 +40,9 @@ impl PartInfo for Mutation { } fn equals(&self, info: &Box) -> bool { - match self { - Self::MutationDeletedSegment(mutation_deleted_segment) => { - mutation_deleted_segment.equals(info) - } - Self::MutationPartInfo(mutation_part_info) => mutation_part_info.equals(info), - } + info.as_any() + .downcast_ref::() + .is_some_and(|other| self == other) } fn hash(&self) -> u64 { @@ -65,30 +66,18 @@ impl Mutation { } #[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone, Debug)] -pub struct MutationDeletedSegment { - pub deleted_segment: DeletedSegmentInfo, +pub struct DeletedSegmentInfo { + /// segment index. + pub index: SegmentIndex, + /// segment statistics. + pub summary: Statistics, } -#[typetag::serde(name = "mutation_delete_segment")] -impl PartInfo for MutationDeletedSegment { - fn as_any(&self) -> &dyn Any { - self - } - - fn equals(&self, info: &Box) -> bool { - info.as_any() - .downcast_ref::() - .is_some_and(|other| self == other) - } - +impl DeletedSegmentInfo { fn hash(&self) -> u64 { - self.deleted_segment.hash() - } -} - -impl MutationDeletedSegment { - pub fn create(deleted_segment: DeletedSegmentInfo) -> Self { - MutationDeletedSegment { deleted_segment } + let mut s = DefaultHasher::new(); + self.index.hash(&mut s); + s.finish() } } @@ -100,35 +89,8 @@ pub struct MutationPartInfo { pub whole_block_mutation: bool, } -#[typetag::serde(name = "mutation")] -impl PartInfo for MutationPartInfo { - fn as_any(&self) -> &dyn Any { - self - } - - fn equals(&self, info: &Box) -> bool { - info.as_any() - .downcast_ref::() - .is_some_and(|other| self == other) - } - +impl MutationPartInfo { fn hash(&self) -> u64 { self.inner_part.hash() } } - -impl MutationPartInfo { - pub fn create( - index: BlockMetaIndex, - cluster_stats: Option, - inner_part: PartInfoPtr, - whole_block_mutation: bool, - ) -> Self { - MutationPartInfo { - index, - cluster_stats, - inner_part, - whole_block_mutation, - } - } -} diff --git a/src/query/storages/fuse/src/operations/mutation/mutation_source.rs b/src/query/storages/fuse/src/operations/mutation/mutation_source.rs index ae7bf2b3cc57d..f5726cb3c5433 100644 --- a/src/query/storages/fuse/src/operations/mutation/mutation_source.rs +++ b/src/query/storages/fuse/src/operations/mutation/mutation_source.rs @@ -35,6 +35,7 @@ use common_expression::ROW_ID_COL_NAME; use common_functions::BUILTIN_FUNCTIONS; use common_sql::evaluator::BlockOperator; +use super::mutation_meta::SerializeBlock; use crate::fuse_part::FusePartInfo; use crate::io::BlockReader; use crate::io::ReadSettings; @@ -239,10 +240,12 @@ impl Processor for MutationSource { MutationAction::Deletion => { if affect_rows == num_rows { // all the rows should be removed. - let meta = SerializeDataMeta::create( - self.index.clone(), - self.stats_type.clone(), - ); + let meta = Box::new(SerializeDataMeta::SerializeBlock( + SerializeBlock::create( + self.index.clone(), + self.stats_type.clone(), + ), + )); self.state = State::Output( self.ctx.get_partition(), DataBlock::empty_with_meta(meta), @@ -329,7 +332,10 @@ impl Processor for MutationSource { .operators .iter() .try_fold(data_block, |input, op| op.execute(&func_ctx, input))?; - let meta = SerializeDataMeta::create(self.index.clone(), self.stats_type.clone()); + let meta = Box::new(SerializeDataMeta::SerializeBlock(SerializeBlock::create( + self.index.clone(), + self.stats_type.clone(), + ))); self.state = State::Output(self.ctx.get_partition(), block.add_meta(Some(meta))?); } _ => return Err(ErrorCode::Internal("It's a bug.")), @@ -345,17 +351,15 @@ impl Processor for MutationSource { match Mutation::from_part(&part)? { Mutation::MutationDeletedSegment(deleted_segment) => { let progress_values = ProgressValues { - rows: deleted_segment.deleted_segment.segment_info.1.row_count as usize, + rows: deleted_segment.summary.row_count as usize, bytes: 0, }; self.ctx.get_write_progress().incr(&progress_values); self.state = State::Output( self.ctx.get_partition(), - DataBlock::empty_with_meta( - SerializeDataMeta::create_with_deleted_segment( - deleted_segment.clone(), - ), - ), + DataBlock::empty_with_meta(Box::new( + SerializeDataMeta::DeletedSegment(deleted_segment.clone()), + )), ) } Mutation::MutationPartInfo(part) => { @@ -380,10 +384,9 @@ impl Processor for MutationSource { bytes: 0, }; self.ctx.get_write_progress().incr(&progress_values); - let meta = SerializeDataMeta::create( - self.index.clone(), - self.stats_type.clone(), - ); + let meta = Box::new(SerializeDataMeta::SerializeBlock( + SerializeBlock::create(self.index.clone(), self.stats_type.clone()), + )); self.state = State::Output( self.ctx.get_partition(), DataBlock::empty_with_meta(meta), diff --git a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs index 9bb601456934b..a77f1a50b83dc 100644 --- a/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs +++ b/src/query/storages/fuse/src/operations/mutation/recluster_mutator.rs @@ -32,7 +32,6 @@ use minitrace::future::FutureExt; use minitrace::Span; use storages_common_table_meta::meta::BlockMeta; use storages_common_table_meta::meta::CompactSegmentInfo; -use storages_common_table_meta::meta::SegmentInfo; use storages_common_table_meta::meta::Statistics; use crate::statistics::reducers::merge_statistics_mut; @@ -259,8 +258,8 @@ impl ReclusterMutator { let tasks = std::iter::from_fn(|| { iter.next().map(|v| { async move { - SegmentInfo::try_from(v) - .map_err(|_| ErrorCode::Internal("Failed to convert compact segment info")) + v.block_metas() + .map_err(|_| ErrorCode::Internal("Failed to get block metas")) } .in_span(Span::enter_with_local_parent("try_from_segments")) }) @@ -268,7 +267,7 @@ impl ReclusterMutator { let thread_nums = self.ctx.get_settings().get_max_threads()? as usize; let permit_nums = self.ctx.get_settings().get_max_storage_io_requests()? as usize; - let segments = execute_futures_in_parallel( + let blocks = execute_futures_in_parallel( tasks, thread_nums, permit_nums, @@ -279,15 +278,13 @@ impl ReclusterMutator { .collect::>>()?; let mut blocks_map: BTreeMap>> = BTreeMap::new(); - for segment in segments.into_iter() { - for block in segment.blocks.into_iter() { - match &block.cluster_stats { - Some(stats) if stats.cluster_key_id == self.cluster_key_id => { - blocks_map.entry(stats.level).or_default().push(block) - } - _ => { - return Ok(BTreeMap::new()); - } + for block in blocks.into_iter().flatten() { + match &block.cluster_stats { + Some(stats) if stats.cluster_key_id == self.cluster_key_id => { + blocks_map.entry(stats.level).or_default().push(block) + } + _ => { + return Ok(BTreeMap::new()); } } } diff --git a/src/query/storages/fuse/src/operations/replace.rs b/src/query/storages/fuse/src/operations/replace.rs index a674d80bea4b1..8562743d79bce 100644 --- a/src/query/storages/fuse/src/operations/replace.rs +++ b/src/query/storages/fuse/src/operations/replace.rs @@ -176,12 +176,40 @@ impl FuseTable { }) } - pub fn chain_commit_sink( + pub fn chain_mutation_pipes( &self, ctx: &Arc, pipeline: &mut Pipeline, base_snapshot: Arc, + mutation_kind: MutationKind, + merge_meta: bool, ) -> Result<()> { + let cluster_key_id = self.cluster_key_id(); + pipeline.try_resize(1)?; + if merge_meta { + pipeline.add_transform(|input, output| { + let merger = TransformMergeCommitMeta::create(cluster_key_id); + Ok(ProcessorPtr::create(AccumulatingTransformer::create( + input, output, merger, + ))) + })?; + } else { + pipeline.add_transform(|input, output| { + let base_segments = if matches!(mutation_kind, MutationKind::Compact) { + vec![] + } else { + base_snapshot.segments.clone() + }; + let mutation_aggregator = + TableMutationAggregator::new(self, ctx.clone(), base_segments, mutation_kind); + Ok(ProcessorPtr::create(AsyncAccumulatingTransformer::create( + input, + output, + mutation_aggregator, + ))) + })?; + } + let snapshot_gen = MutationGenerator::new(base_snapshot); pipeline.add_sink(|input| { CommitSink::try_create( @@ -197,31 +225,6 @@ impl FuseTable { }) } - pub fn chain_mutation_pipes( - &self, - ctx: &Arc, - pipeline: &mut Pipeline, - base_snapshot: Arc, - mutation_kind: MutationKind, - ) -> Result<()> { - self.chain_mutation_aggregator(ctx, pipeline, base_snapshot.clone(), mutation_kind)?; - self.chain_commit_sink(ctx, pipeline, base_snapshot) - } - - pub fn chain_commit_meta_merger( - &self, - pipeline: &mut Pipeline, - default_cluster_key_id: Option, - ) -> Result<()> { - pipeline.try_resize(1)?; - pipeline.add_transform(|input, output| { - let merger = TransformMergeCommitMeta::create(default_cluster_key_id); - Ok(ProcessorPtr::create(AccumulatingTransformer::create( - input, output, merger, - ))) - }) - } - // choose the bloom filter columns (from on-conflict fields). // columns with larger number of number-of-distinct-values, will be kept, is their types // are supported by bloom index. diff --git a/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs b/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs index 0b3f0b24116fa..b057482104ee9 100644 --- a/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs +++ b/src/query/storages/fuse/src/operations/replace_into/processors/transform_merge_into_mutation_aggregator.rs @@ -18,7 +18,7 @@ use common_pipeline_core::pipe::PipeItem; use common_pipeline_core::processors::port::InputPort; use common_pipeline_core::processors::port::OutputPort; use common_pipeline_core::processors::processor::ProcessorPtr; -use common_pipeline_transforms::processors::transforms::transform_accumulating_async::AsyncAccumulatingTransform; +use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransform; use common_pipeline_transforms::processors::transforms::AsyncAccumulatingTransformer; use crate::operations::replace_into::meta::merge_into_operation_meta::MergeIntoOperation; diff --git a/src/query/storages/fuse/src/operations/update.rs b/src/query/storages/fuse/src/operations/update.rs index 729e54a34c370..4a740018cee5e 100644 --- a/src/query/storages/fuse/src/operations/update.rs +++ b/src/query/storages/fuse/src/operations/update.rs @@ -110,7 +110,7 @@ impl FuseTable { proc.into_processor() })?; - self.chain_mutation_pipes(&ctx, pipeline, snapshot, MutationKind::Update) + self.chain_mutation_pipes(&ctx, pipeline, snapshot, MutationKind::Update, false) } #[async_backtrace::framed] diff --git a/src/query/storages/fuse/src/pruning/fuse_pruner.rs b/src/query/storages/fuse/src/pruning/fuse_pruner.rs index efd3408c57f41..aacaca3b1689f 100644 --- a/src/query/storages/fuse/src/pruning/fuse_pruner.rs +++ b/src/query/storages/fuse/src/pruning/fuse_pruner.rs @@ -12,9 +12,6 @@ // See the License for the specific language governing permissions and // limitations under the License. -use std::collections::hash_map::DefaultHasher; -use std::hash::Hash; -use std::hash::Hasher; use std::sync::Arc; use common_base::base::tokio::sync::Semaphore; @@ -45,10 +42,9 @@ use storages_common_pruner::TopNPrunner; use storages_common_table_meta::meta::BlockMeta; use storages_common_table_meta::meta::ClusterKey; use storages_common_table_meta::meta::ColumnStatistics; -use storages_common_table_meta::meta::Location; -use storages_common_table_meta::meta::Statistics; use storages_common_table_meta::meta::StatisticsOfColumns; +use crate::operations::DeletedSegmentInfo; use crate::pruning::segment_pruner::SegmentPruner; use crate::pruning::BlockPruner; use crate::pruning::BloomPruner; @@ -173,23 +169,6 @@ impl PruningContext { } } -#[derive(serde::Serialize, serde::Deserialize, PartialEq, Clone, Debug)] -pub struct DeletedSegmentInfo { - // segment index. - pub index: usize, - // deleted segment location and summary. - // location is used for hash - pub segment_info: (Location, Statistics), -} - -impl DeletedSegmentInfo { - pub fn hash(&self) -> u64 { - let mut s = DefaultHasher::new(); - self.segment_info.0.hash(&mut s); - s.finish() - } -} - pub struct FusePruner { max_concurrency: usize, pub table_schema: TableSchemaRef, @@ -334,10 +313,7 @@ impl FusePruner { { deleted_segments.push(DeletedSegmentInfo { index: segment_location.segment_idx, - segment_info: ( - segment_location.location.clone(), - compact_segment_info.summary.clone(), - ), + summary: compact_segment_info.summary.clone(), }) } else { res.extend( diff --git a/src/query/storages/fuse/src/pruning/mod.rs b/src/query/storages/fuse/src/pruning/mod.rs index 792bdea2f4475..ea0e7b274ae4d 100644 --- a/src/query/storages/fuse/src/pruning/mod.rs +++ b/src/query/storages/fuse/src/pruning/mod.rs @@ -22,7 +22,6 @@ mod segment_pruner; pub use block_pruner::BlockPruner; pub use bloom_pruner::BloomPruner; pub use bloom_pruner::BloomPrunerCreator; -pub use fuse_pruner::DeletedSegmentInfo; pub use fuse_pruner::FusePruner; pub use fuse_pruner::PruningContext; pub use pruner_location::create_segment_location_vector; diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table index 461296e1c2e5c..c238f4d509816 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0008_fuse_optimize_table @@ -521,6 +521,12 @@ insert into t9 values(-3) statement ok optimize table t9 compact +query II +select segment_count, block_count from fuse_snapshot('db_09_0008', 't9') limit 2 +---- +1 2 +2 2 + statement ok insert into t9 values(6) @@ -528,11 +534,10 @@ statement ok optimize table t9 compact query II -select segment_count, block_count from fuse_snapshot('db_09_0008', 't9') limit 3 +select segment_count, block_count from fuse_snapshot('db_09_0008', 't9') limit 2 ---- 1 2 -3 3 -2 2 +2 3 query I select a from t9 order by a diff --git a/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster b/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster index 7afc95d64e7ed..367af1a8242c3 100644 --- a/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster +++ b/tests/sqllogictests/suites/base/09_fuse_engine/09_0016_remote_alter_recluster @@ -43,7 +43,7 @@ select * from t1 order by a # Fix dead cycles in PR #11762 (https://github.com/datafuselabs/databend/pull/11762). statement ok -create table t2(a int) cluster by(a+1) +create table t2(a int) cluster by(a+1) row_per_block=3 statement ok insert into t2 values(1),(3) diff --git a/tests/sqllogictests/suites/mode/cluster/distributed_compact.sql b/tests/sqllogictests/suites/mode/cluster/distributed_compact.sql new file mode 100644 index 0000000000000..6632148666627 --- /dev/null +++ b/tests/sqllogictests/suites/mode/cluster/distributed_compact.sql @@ -0,0 +1,122 @@ +statement ok +drop table if exists t_compact_0 + +statement ok +create table t_compact_0 (a int not null) row_per_block=5 block_per_segment=5 + +statement ok +insert into t_compact_0 select 50 - number from numbers(100) + +statement ok +insert into t_compact_0 select 50 - number from numbers(100) + +statement ok +insert into t_compact_0 select 50 - number from numbers(100) + +query II +select count(),sum(a) from t_compact_0 +---- +300 150 + +statement ok +alter table t_compact_0 set options(row_per_block=10,block_per_segment=10) + +# lazy compact +# The number of compact segments task is greater than the number of cluster nodes, +# so will build compact blocks task during pipeline init. +# The explain pipeline optimize contain bug (ISSUE-12597), so comment. +# query T +# explain pipeline optimize table t_compact_0 compact +# ---- +# CommitSink × 1 processor +# TransformMergeCommitMeta × 1 processor +# TransformExchangeDeserializer × 1 processor +# Merge (DummyTransform × 3 processors) to (TransformExchangeDeserializer × 1) +# Merge (MutationAggregator × 1 processor) to (Resize × 3) +# MutationAggregator × 1 processor +# Merge (TransformSerializeBlock × 6 processors) to (MutationAggregator × 1) +# TransformSerializeBlock × 6 processors +# CompactSource × 6 processors + +statement ok +optimize table t_compact_0 compact + +query I +select count() from fuse_snapshot('default', 't_compact_0') +---- +4 + +query II +select count(),sum(a) from t_compact_0 +---- +300 150 + +statement ok +alter table t_compact_0 cluster by(abs(a)) + +# test compact and recluster +statement ok +optimize table t_compact_0 compact + +query I +select count() from fuse_snapshot('default', 't_compact_0') +---- +7 + +query II +select count(),sum(a) from t_compact_0 +---- +300 150 + +query I +select row_count from fuse_snapshot('default', 't_compact_0') limit 1 +---- +300 + +statement ok +create table t_compact_1 (a int not null) row_per_block=5 block_per_segment=5 + +statement ok +insert into t_compact_1 select 100 - number from numbers(150) + +query II +select count(),sum(a) from t_compact_1 +---- +150 3825 + +statement ok +alter table t_compact_1 set options(row_per_block=10,block_per_segment=15) + +# nolazy compact +# The number of compact segments task is less than the number of cluster nodes, +# so will build compact blocks task before execute pipeline +# query T +# explain pipeline optimize table t_compact_1 compact +# ---- +# CommitSink × 1 processor +# MutationAggregator × 1 processor +# Merge (TransformExchangeDeserializer × 2 processors) to (MutationAggregator × 1) +# TransformExchangeDeserializer × 6 processors +# Merge (DummyTransform × 8 processors) to (TransformExchangeDeserializer × 6) +# Merge (TransformSerializeBlock × 6 processors) to (Resize × 8) +# TransformSerializeBlock × 6 processors +# CompactSource × 6 processors + +statement ok +optimize table t_compact_1 compact + +query I +select count() from fuse_snapshot('default', 't_compact_1') +---- +2 + +query II +select count(),sum(a) from t_compact_1 +---- +150 3825 + +statement ok +drop table if exists t_compact_0 + +statement ok +drop table if exists t_compact_1