feat: auto optimize table during execution of replace into statement #12100

Merged · 4 commits · Jul 17, 2023
68 changes: 68 additions & 0 deletions src/query/service/src/interpreters/interpreter_replace.rs
@@ -14,24 +14,31 @@

use std::sync::Arc;

use common_base::runtime::GlobalIORuntime;
use common_catalog::table_context::TableContext;
use common_exception::ErrorCode;
use common_exception::Result;
use common_expression::DataSchema;
use common_expression::DataSchemaRef;
use common_pipeline_sources::AsyncSourcer;
use common_sql::plans::InsertInputSource;
use common_sql::plans::OptimizeTableAction;
use common_sql::plans::OptimizeTablePlan;
use common_sql::plans::Plan;
use common_sql::plans::Replace;
use common_sql::NameResolutionContext;
use tracing::info;

use crate::interpreters::common::check_deduplicate_label;
use crate::interpreters::interpreter_copy::CopyInterpreter;
use crate::interpreters::interpreter_insert::ValueSource;
use crate::interpreters::Interpreter;
use crate::interpreters::InterpreterPtr;
use crate::interpreters::OptimizeTableInterpreter;
use crate::interpreters::SelectInterpreter;
use crate::pipelines::builders::build_fill_missing_columns_pipeline;
use crate::pipelines::executor::ExecutorSettings;
use crate::pipelines::executor::PipelineCompleteExecutor;
use crate::pipelines::processors::TransformCastSchema;
use crate::pipelines::PipelineBuildResult;
use crate::sessions::QueryContext;
@@ -68,6 +75,8 @@ impl Interpreter for ReplaceInterpreter {
            .get_table(&plan.catalog, &plan.database, &plan.table)
            .await?;

        let has_cluster_key = !table.cluster_keys(self.ctx.clone()).is_empty();

        let mut pipeline = self
            .connect_input_source(self.ctx.clone(), &self.plan.source, self.plan.schema())
            .await?;
@@ -91,6 +100,65 @@
                on_conflict_fields,
            )
            .await?;

        if !pipeline.main_pipeline.is_empty()
            && has_cluster_key
            && self.ctx.get_settings().get_enable_auto_reclustering()?
        {
            let ctx = self.ctx.clone();
            let catalog = self.plan.catalog.clone();
            let database = self.plan.database.to_string();
            let table = self.plan.table.to_string();
            // `move` is required: the callback outlives this function and must own
            // ctx/catalog/database/table so the async block can take them.
            pipeline.main_pipeline.set_on_finished(move |err| {
                if err.is_none() {
                    info!("execute replace into finished successfully. running table optimization job.");
                    match GlobalIORuntime::instance().block_on({
                        async move {
                            ctx.evict_table_from_cache(&catalog, &database, &table)?;
                            let optimize_interpreter = OptimizeTableInterpreter::try_create(
                                ctx.clone(),
                                OptimizeTablePlan {
                                    catalog,
                                    database,
                                    table,
                                    action: OptimizeTableAction::CompactBlocks,
                                    limit: None,
                                },
                            )?;

                            let mut build_res = optimize_interpreter.execute2().await?;

                            if build_res.main_pipeline.is_empty() {
                                return Ok(());
                            }

                            let settings = ctx.get_settings();
                            let query_id = ctx.get_id();
                            build_res.set_max_threads(settings.get_max_threads()? as usize);
                            let settings = ExecutorSettings::try_create(&settings, query_id)?;

                            if build_res.main_pipeline.is_complete_pipeline()? {
                                let mut pipelines = build_res.sources_pipelines;
                                pipelines.push(build_res.main_pipeline);

                                let complete_executor =
                                    PipelineCompleteExecutor::from_pipelines(pipelines, settings)?;

                                ctx.set_executor(complete_executor.get_inner())?;
                                complete_executor.execute()?;
                            }
                            Ok(())
                        }
                    }) {
                        Ok(_) => {
                            info!("execute replace into finished successfully. table optimization job finished.");
                        }
                        Err(e) => {
                            info!("execute replace into finished successfully. table optimization job failed. {:?}", e);
                        }
                    }

                    return Ok(());
                }
                Ok(())
            });
        }
        Ok(pipeline)
    }
}
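The net effect: after a successful REPLACE INTO on a table that has a cluster key (and with enable_auto_reclustering left on), the interpreter evicts the cached table meta and kicks off a CompactBlocks optimization job on the global I/O runtime. A minimal sketch of a statement that takes this path (names are illustrative; uses the REPLACE INTO ... ON (<conflict key>) form):

```sql
-- t has a cluster key, so a successful REPLACE INTO now triggers an
-- automatic compact + recluster job once the statement's pipeline finishes.
CREATE TABLE t (id INT, val INT) CLUSTER BY (id);
REPLACE INTO t ON (id) VALUES (1, 10), (2, 20);
```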
57 changes: 51 additions & 6 deletions src/query/service/src/interpreters/interpreter_table_optimize.rs
@@ -17,6 +17,7 @@ use std::time::SystemTime;

use common_base::runtime::GlobalIORuntime;
use common_catalog::table::CompactTarget;
use common_catalog::table::Table;
use common_catalog::table::TableExt;
use common_exception::ErrorCode;
use common_exception::Result;
@@ -71,6 +72,44 @@ impl Interpreter for OptimizeTableInterpreter {
}

impl OptimizeTableInterpreter {
    pub async fn build_compact_pipeline(
        ctx: &Arc<QueryContext>,
        mut table: Arc<dyn Table>,
        target: CompactTarget,
    ) -> Result<PipelineBuildResult> {
        let need_recluster = !table.cluster_keys(ctx.clone()).is_empty();
        let mut pipeline = Pipeline::create();
        table
            .compact(ctx.clone(), target, None, &mut pipeline)
            .await?;

        let mut build_res = PipelineBuildResult::create();
        let settings = ctx.get_settings();
        if need_recluster {
            if !pipeline.is_empty() {
                pipeline.set_max_threads(settings.get_max_threads()? as usize);

                let query_id = ctx.get_id();
                let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
                let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;

                ctx.set_executor(executor.get_inner())?;
                executor.execute()?;

                // refresh table.
                table = table.as_ref().refresh(ctx.as_ref()).await?;
            }

            table
                .recluster(ctx.clone(), None, None, &mut build_res.main_pipeline)
                .await?;
        } else {
            build_res.main_pipeline = pipeline;
        }

        Ok(build_res)
    }

    async fn build_pipeline(
        &self,
        target: CompactTarget,
@@ -94,21 +133,27 @@
            )));
        }

-       let mut pipeline = Pipeline::create();
+       let mut compact_pipeline = Pipeline::create();
        table
-           .compact(self.ctx.clone(), target, self.plan.limit, &mut pipeline)
+           .compact(
+               self.ctx.clone(),
+               target,
+               self.plan.limit,
+               &mut compact_pipeline,
+           )
            .await?;

        let mut build_res = PipelineBuildResult::create();
        let settings = self.ctx.get_settings();
        let mut reclustered_block_count = 0;
        if need_recluster {
-           if !pipeline.is_empty() {
-               pipeline.set_max_threads(settings.get_max_threads()? as usize);
+           if !compact_pipeline.is_empty() {
+               compact_pipeline.set_max_threads(settings.get_max_threads()? as usize);

                let query_id = self.ctx.get_id();
                let executor_settings = ExecutorSettings::try_create(&settings, query_id)?;
-               let executor = PipelineCompleteExecutor::try_create(pipeline, executor_settings)?;
+               let executor =
+                   PipelineCompleteExecutor::try_create(compact_pipeline, executor_settings)?;

                self.ctx.set_executor(executor.get_inner())?;
                executor.execute()?;
@@ -126,7 +171,7 @@
                )
                .await?;
        } else {
-           build_res.main_pipeline = pipeline;
+           build_res.main_pipeline = compact_pipeline;
        }

        let ctx = self.ctx.clone();
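The new build_compact_pipeline chains the same work that the SQL surface exposes separately: block compaction first (run to completion, then the table handle refreshed so reclustering plans against the post-compaction snapshot), followed by a recluster pass for clustered tables. Roughly the manual equivalent, as a sketch using Databend's OPTIMIZE/RECLUSTER statement forms:

```sql
-- What the automatic job amounts to for a clustered table t:
OPTIMIZE TABLE t COMPACT;
ALTER TABLE t RECLUSTER;
```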
4 changes: 4 additions & 0 deletions src/query/service/src/sessions/query_ctx.rs
@@ -240,6 +240,10 @@
    pub fn get_created_time(&self) -> SystemTime {
        self.shared.created_time
    }

    pub fn evict_table_from_cache(&self, catalog: &str, database: &str, table: &str) -> Result<()> {
        self.shared.evict_table_from_cache(catalog, database, table)
    }
}

#[async_trait::async_trait]
7 changes: 7 additions & 0 deletions src/query/service/src/sessions/query_ctx_shared.rs
@@ -278,6 +278,13 @@ impl QueryContextShared {
        }
    }

    pub fn evict_table_from_cache(&self, catalog: &str, database: &str, table: &str) -> Result<()> {
        let table_meta_key = (catalog.to_string(), database.to_string(), table.to_string());
        let mut tables_refs = self.tables_refs.lock();
        tables_refs.remove(&table_meta_key);
        Ok(())
    }

    /// Init the runtime on first access.
    pub fn try_get_runtime(&self) -> Result<Arc<Runtime>> {
        let mut query_runtime = self.runtime.write();
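Note that eviction only drops the query-scoped cached handle for the (catalog, database, table) key in tables_refs: the optimization job's subsequent table lookup re-resolves the table and therefore plans against the snapshot just committed by the REPLACE INTO, rather than the copy cached when the statement began.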
@@ -7,6 +7,7 @@ DB.Table: 'system'.'settings', Table: settings-table_id:1, ver:0, Engine: SystemSettings
| 'collation' | 'binary' | 'binary' | 'SESSION' | 'Sets the character collation. Available values include "binary" and "utf8".' | 'String' |
| 'efficiently_memory_group_by' | '0' | '0' | 'SESSION' | 'Memory is used efficiently, but this may cause performance degradation.' | 'UInt64' |
| 'enable_aggregating_index_scan' | '1' | '1' | 'SESSION' | 'Enable scanning aggregating index data while querying.' | 'UInt64' |
| 'enable_auto_reclustering' | '1' | '1' | 'SESSION' | 'Enables auto re-clustering.' | 'UInt64' |
| 'enable_bushy_join' | '0' | '0' | 'SESSION' | 'Enables generating a bushy join plan with the optimizer.' | 'UInt64' |
| 'enable_cbo' | '1' | '1' | 'SESSION' | 'Enables cost-based optimization.' | 'UInt64' |
| 'enable_distributed_copy_into' | '0' | '0' | 'SESSION' | 'Enable distributed execution of copy into.' | 'UInt64' |
6 changes: 6 additions & 0 deletions src/query/settings/src/settings_default.rs
@@ -328,6 +328,12 @@
                possible_values: None,
                display_in_show_settings: true,
            }),
            ("enable_auto_reclustering", DefaultSettingValue {
                value: UserSettingValue::UInt64(1),
                desc: "Enables auto re-clustering.",
                possible_values: None,
                display_in_show_settings: true,
            }),
        ]);

        Ok(Arc::new(DefaultSettings {
8 changes: 8 additions & 0 deletions src/query/settings/src/settings_getter_setter.rs
@@ -386,4 +386,12 @@
    pub fn set_enable_aggregating_index_scan(&self, val: bool) -> Result<()> {
        self.try_set_u64("enable_aggregating_index_scan", u64::from(val))
    }

    pub fn get_enable_auto_reclustering(&self) -> Result<bool> {
        Ok(self.try_get_u64("enable_auto_reclustering")? != 0)
    }

    pub fn set_enable_auto_reclustering(&self, val: bool) -> Result<()> {
        self.try_set_u64("enable_auto_reclustering", u64::from(val))
    }
}
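Both accessors ride on the generic u64 plumbing, so the setting can be flipped per session from SQL; a sketch, assuming the usual SET / system.settings interface:

```sql
-- Opt out of the post-REPLACE optimization for the current session.
SET enable_auto_reclustering = 0;

-- Confirm the effective value.
SELECT name, value FROM system.settings
WHERE name = 'enable_auto_reclustering';
```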
1 change: 1 addition & 0 deletions src/query/storages/fuse/src/io/write/block_writer.rs
@@ -109,6 +109,7 @@ pub struct BloomIndexState {
    pub(crate) data: Vec<u8>,
    pub(crate) size: u64,
    pub(crate) location: Location,
    #[allow(dead_code)]
    pub(crate) column_distinct_count: HashMap<FieldIndex, usize>,
}

31 changes: 31 additions & 0 deletions src/query/storages/fuse/src/metrics/fuse_metrics.rs
@@ -197,3 +197,34 @@ pub fn metrics_inc_deletion_block_range_pruned_whole_block_nums(c: u64) {
        c as f64
    );
}

pub fn metrics_inc_replace_block_number_after_pruning(c: u64) {
    increment_gauge!(key!("replace_into_block_number_after_pruning"), c as f64);
}

pub fn metrics_inc_replace_row_number_after_pruning(c: u64) {
    increment_gauge!(key!("replace_into_row_number_after_pruning"), c as f64);
}

pub fn metrics_inc_replace_block_number_totally_loaded(c: u64) {
    increment_gauge!(key!("replace_into_block_number_totally_loaded"), c as f64);
}

pub fn metrics_inc_replace_row_number_write(c: u64) {
    increment_gauge!(key!("replace_into_row_number_write"), c as f64);
}

pub fn metrics_inc_replace_block_number_write(c: u64) {
    increment_gauge!(key!("replace_into_block_number_write"), c as f64);
}

pub fn metrics_inc_replace_row_number_totally_loaded(c: u64) {
    increment_gauge!(key!("replace_into_row_number_totally_loaded"), c as f64);
}

pub fn metrics_inc_replace_whole_block_deletion(c: u64) {
    increment_gauge!(key!("replace_into_whole_block_deletion"), c as f64);
}

pub fn metrics_inc_replace_block_of_zero_row_deleted(c: u64) {
    increment_gauge!(key!("replace_into_block_of_zero_row_deleted"), c as f64);
}
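The new counters are plain gauges registered through increment_gauge!, so they surface alongside the other fuse metrics; a sketch of watching them from SQL (assumes the system.metrics table; the stored names may carry whatever prefix the key! macro adds):

```sql
-- Inspect replace-into bookkeeping after running a REPLACE INTO statement.
SELECT metric, value FROM system.metrics
WHERE metric LIKE '%replace_into%';
```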