Skip to content

Commit

Permalink
feat: implement CloseRegionHandler (#1569)
Browse files Browse the repository at this point in the history
* feat: implement CloseRegionHandler

* feat: register heartbeat response handlers

* test: add tests for heartbeat response handlers

* fix: drop table does not release regions

* chore: apply suggestion from CR

* fix: fix close region issue

* chore: apply suggestion from CR

* chore: apply suggestion from CR

* chore: apply suggestion from CR

* chore: apply suggestion from CR

* chore: apply suggestion from CR

* chore: apply suggestion from CR

* chore: apply suggestion from CR

* chore: modify method name and add log

* refactor: refactor HeartbeatResponseHandler

* chore: apply suggestion from CR

* refactor: remove close method from Region trait

* chore: apply suggestion from CR

* chore: remove PartialEq from CloseTableResult

* chore: apply suggestion from CR
  • Loading branch information
WenyXu authored May 23, 2023
1 parent 7c55783 commit 3dc45f1
Show file tree
Hide file tree
Showing 10 changed files with 589 additions and 42 deletions.
33 changes: 32 additions & 1 deletion src/datanode/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,13 @@ pub enum Error {
source: catalog::error::Error,
},

#[snafu(display("Failed to deregister table: {}, source: {}", table_name, source))]
DeregisterTable {
table_name: String,
#[snafu(backtrace)]
source: catalog::error::Error,
},

#[snafu(display("Failed to open table: {}, source: {}", table_name, source))]
OpenTable {
table_name: String,
Expand All @@ -53,6 +60,19 @@ pub enum Error {
source: catalog::error::Error,
},

#[snafu(display(
"Failed to close regions {:?} in table {}, source: {}",
region_numbers,
table_name,
source
))]
CloseTable {
table_name: String,
region_numbers: Vec<RegionNumber>,
#[snafu(backtrace)]
source: TableError,
},

#[snafu(display("Failed to send message: {err_msg}"))]
SendMessage { err_msg: String, location: Location },

Expand Down Expand Up @@ -108,6 +128,13 @@ pub enum Error {
source: TableError,
},

#[snafu(display("Failed to get table: {}, source: {}", table_name, source))]
GetTable {
table_name: String,
#[snafu(backtrace)]
source: TableError,
},

#[snafu(display("Failed to drop table {}, source: {}", table_name, source))]
DropTable {
table_name: String,
Expand Down Expand Up @@ -446,14 +473,18 @@ impl ErrorExt for Error {
| ExecuteLogicalPlan { source } => source.status_code(),

OpenTable { source, .. } => source.status_code(),
RegisterTable { source, .. } | AccessCatalog { source, .. } => source.status_code(),
RegisterTable { source, .. }
| DeregisterTable { source, .. }
| AccessCatalog { source, .. } => source.status_code(),

DecodeLogicalPlan { source } => source.status_code(),
NewCatalog { source } | RegisterSchema { source } => source.status_code(),
FindTable { source, .. } => source.status_code(),
CreateTable { source, .. } | CheckRegion { source, .. } => source.status_code(),
DropTable { source, .. } => source.status_code(),
FlushTable { source, .. } => source.status_code(),
GetTable { source, .. } => source.status_code(),
CloseTable { source, .. } => source.status_code(),

Insert { source, .. } => source.status_code(),
Delete { source, .. } => source.status_code(),
Expand Down
40 changes: 23 additions & 17 deletions src/datanode/src/heartbeat/handler.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use common_telemetry::error;
use crate::error::Result;
use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef};

pub mod close_region;
pub mod open_region;
pub mod parse_mailbox_message;
#[cfg(test)]
Expand All @@ -32,7 +33,15 @@ pub struct HeartbeatResponseHandlerContext {
pub mailbox: MailboxRef,
pub response: HeartbeatResponse,
pub incoming_message: Option<IncomingMessage>,
is_skip_all: bool,
}

/// HandleControl
///
/// Controls process of handling heartbeat response.
#[derive(PartialEq)]
pub enum HandleControl {
Continue,
Done,
}

impl HeartbeatResponseHandlerContext {
Expand All @@ -41,23 +50,19 @@ impl HeartbeatResponseHandlerContext {
mailbox,
response,
incoming_message: None,
is_skip_all: false,
}
}

pub fn is_skip_all(&self) -> bool {
self.is_skip_all
}

pub fn finish(&mut self) {
self.is_skip_all = true
}
}

/// HeartbeatResponseHandler
///
/// [`HeartbeatResponseHandler::is_acceptable`] returns true if handler can handle incoming [`HeartbeatResponseHandlerContext`].
///
/// [`HeartbeatResponseHandler::handle`] handles all or part of incoming [`HeartbeatResponseHandlerContext`].
pub trait HeartbeatResponseHandler: Send + Sync {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool;

fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<()>;
fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl>;
}

pub trait HeartbeatResponseHandlerExecutor: Send + Sync {
Expand All @@ -77,16 +82,17 @@ impl HandlerGroupExecutor {
impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor {
fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> {
for handler in &self.handlers {
if ctx.is_skip_all() {
break;
}

if !handler.is_acceptable(&ctx) {
continue;
}

if let Err(e) = handler.handle(&mut ctx) {
error!(e;"Error while handling: {:?}", ctx.response);
match handler.handle(&mut ctx) {
Ok(HandleControl::Done) => break,
Ok(HandleControl::Continue) => {}
Err(e) => {
error!(e;"Error while handling: {:?}", ctx.response);
break;
}
}
}
Ok(())
Expand Down
227 changes: 227 additions & 0 deletions src/datanode/src/heartbeat/handler/close_region.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,227 @@
// Copyright 2023 Greptime Team
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use std::sync::Arc;

use catalog::{CatalogManagerRef, DeregisterTableRequest};
use common_catalog::format_full_table_name;
use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply};
use common_telemetry::{error, warn};
use log::info;
use snafu::ResultExt;
use store_api::storage::RegionNumber;
use table::engine::manager::TableEngineManagerRef;
use table::engine::{CloseTableResult, EngineContext, TableReference};
use table::requests::CloseTableRequest;

use crate::error::{self, Result};
use crate::heartbeat::handler::{HandleControl, HeartbeatResponseHandler};
use crate::heartbeat::HeartbeatResponseHandlerContext;

#[derive(Clone)]
pub struct CloseRegionHandler {
catalog_manager: CatalogManagerRef,
table_engine_manager: TableEngineManagerRef,
}

impl HeartbeatResponseHandler for CloseRegionHandler {
fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool {
matches!(
ctx.incoming_message.as_ref(),
Some((_, Instruction::CloseRegion { .. }))
)
}

fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<HandleControl> {
let Some((meta, Instruction::CloseRegion(region_ident))) = ctx.incoming_message.take() else {
unreachable!("CloseRegionHandler: should be guarded by 'is_acceptable'");
};

let mailbox = ctx.mailbox.clone();
let self_ref = Arc::new(self.clone());

let RegionIdent {
engine,
catalog,
schema,
table,
region_number,
..
} = region_ident;

common_runtime::spawn_bg(async move {
let result = self_ref
.close_region_inner(
engine,
&TableReference::full(&catalog, &schema, &table),
vec![region_number],
)
.await;

if let Err(e) = mailbox
.send((meta, CloseRegionHandler::map_result(result)))
.await
{
error!(e;"Failed to send reply to mailbox");
}
});

Ok(HandleControl::Done)
}
}

impl CloseRegionHandler {
pub fn new(
catalog_manager: CatalogManagerRef,
table_engine_manager: TableEngineManagerRef,
) -> Self {
Self {
catalog_manager,
table_engine_manager,
}
}

fn map_result(result: Result<bool>) -> InstructionReply {
result.map_or_else(
|error| {
InstructionReply::CloseRegion(SimpleReply {
result: false,
error: Some(error.to_string()),
})
},
|result| {
InstructionReply::CloseRegion(SimpleReply {
result,
error: None,
})
},
)
}

/// Returns true if a table or target regions have been closed.
async fn regions_closed(
&self,
catalog_name: &str,
schema_name: &str,
table_name: &str,
region_numbers: &[RegionNumber],
) -> Result<bool> {
if let Some(table) = self
.catalog_manager
.table(catalog_name, schema_name, table_name)
.await
.context(error::AccessCatalogSnafu)?
{
for r in region_numbers {
let region_exist =
table
.contains_region(*r)
.with_context(|_| error::CheckRegionSnafu {
table_name: format_full_table_name(
catalog_name,
schema_name,
table_name,
),
region_number: *r,
})?;
if region_exist {
return Ok(false);
}
}
}
// Returns true if table not exist
Ok(true)
}

async fn close_region_inner(
&self,
engine: String,
table_ref: &TableReference<'_>,
region_numbers: Vec<RegionNumber>,
) -> Result<bool> {
let engine =
self.table_engine_manager
.engine(&engine)
.context(error::TableEngineNotFoundSnafu {
engine_name: &engine,
})?;
let ctx = EngineContext::default();

if self
.regions_closed(
table_ref.catalog,
table_ref.schema,
table_ref.table,
&region_numbers,
)
.await?
{
return Ok(true);
}

if engine
.get_table(&ctx, table_ref)
.with_context(|_| error::GetTableSnafu {
table_name: table_ref.to_string(),
})?
.is_some()
{
return match engine
.close_table(
&ctx,
CloseTableRequest {
catalog_name: table_ref.catalog.to_string(),
schema_name: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
region_numbers: region_numbers.clone(),
},
)
.await
.with_context(|_| error::CloseTableSnafu {
table_name: table_ref.to_string(),
region_numbers: region_numbers.clone(),
})? {
CloseTableResult::NotFound | CloseTableResult::Released(_) => {
// Deregister table if The table released.
self.deregister_table(table_ref).await
}
CloseTableResult::PartialClosed(regions) => {
// Requires caller to update the region_numbers
info!(
"Close partial regions: {:?} in table: {}",
regions, table_ref
);
Ok(true)
}
};
}

warn!("Trying to close a non-existing table: {}", table_ref);
// Table doesn't exist
Ok(true)
}

async fn deregister_table(&self, table_ref: &TableReference<'_>) -> Result<bool> {
self.catalog_manager
.deregister_table(DeregisterTableRequest {
catalog: table_ref.catalog.to_string(),
schema: table_ref.schema.to_string(),
table_name: table_ref.table.to_string(),
})
.await
.with_context(|_| error::DeregisterTableSnafu {
table_name: table_ref.to_string(),
})
}
}
Loading

0 comments on commit 3dc45f1

Please sign in to comment.