diff --git a/src/datanode/src/error.rs b/src/datanode/src/error.rs index 47eef74ffb82..39c4983d053f 100644 --- a/src/datanode/src/error.rs +++ b/src/datanode/src/error.rs @@ -39,6 +39,13 @@ pub enum Error { source: catalog::error::Error, }, + #[snafu(display("Failed to deregister table: {}, source: {}", table_name, source))] + DeregisterTable { + table_name: String, + #[snafu(backtrace)] + source: catalog::error::Error, + }, + #[snafu(display("Failed to open table: {}, source: {}", table_name, source))] OpenTable { table_name: String, @@ -53,6 +60,19 @@ pub enum Error { source: catalog::error::Error, }, + #[snafu(display( + "Failed to close regions {:?} in table {}, source: {}", + region_numbers, + table_name, + source + ))] + CloseTable { + table_name: String, + region_numbers: Vec, + #[snafu(backtrace)] + source: TableError, + }, + #[snafu(display("Failed to send message: {err_msg}"))] SendMessage { err_msg: String, location: Location }, @@ -108,6 +128,13 @@ pub enum Error { source: TableError, }, + #[snafu(display("Failed to get table: {}, source: {}", table_name, source))] + GetTable { + table_name: String, + #[snafu(backtrace)] + source: TableError, + }, + #[snafu(display("Failed to drop table {}, source: {}", table_name, source))] DropTable { table_name: String, @@ -446,7 +473,9 @@ impl ErrorExt for Error { | ExecuteLogicalPlan { source } => source.status_code(), OpenTable { source, .. } => source.status_code(), - RegisterTable { source, .. } | AccessCatalog { source, .. } => source.status_code(), + RegisterTable { source, .. } + | DeregisterTable { source, .. } + | AccessCatalog { source, .. } => source.status_code(), DecodeLogicalPlan { source } => source.status_code(), NewCatalog { source } | RegisterSchema { source } => source.status_code(), @@ -454,6 +483,8 @@ impl ErrorExt for Error { CreateTable { source, .. } | CheckRegion { source, .. } => source.status_code(), DropTable { source, .. } => source.status_code(), FlushTable { source, .. } => source.status_code(), + GetTable { source, .. } => source.status_code(), + CloseTable { source, .. } => source.status_code(), Insert { source, .. } => source.status_code(), Delete { source, .. } => source.status_code(), diff --git a/src/datanode/src/heartbeat/handler.rs b/src/datanode/src/heartbeat/handler.rs index 34f4724edfc7..b41b8fcaed4f 100644 --- a/src/datanode/src/heartbeat/handler.rs +++ b/src/datanode/src/heartbeat/handler.rs @@ -20,6 +20,7 @@ use common_telemetry::error; use crate::error::Result; use crate::heartbeat::mailbox::{IncomingMessage, MailboxRef}; +pub mod close_region; pub mod open_region; pub mod parse_mailbox_message; #[cfg(test)] @@ -32,7 +33,15 @@ pub struct HeartbeatResponseHandlerContext { pub mailbox: MailboxRef, pub response: HeartbeatResponse, pub incoming_message: Option, - is_skip_all: bool, +} + +/// HandleControl +/// +/// Controls process of handling heartbeat response. +#[derive(PartialEq)] +pub enum HandleControl { + Continue, + Done, } impl HeartbeatResponseHandlerContext { @@ -41,23 +50,19 @@ impl HeartbeatResponseHandlerContext { mailbox, response, incoming_message: None, - is_skip_all: false, } } - - pub fn is_skip_all(&self) -> bool { - self.is_skip_all - } - - pub fn finish(&mut self) { - self.is_skip_all = true - } } +/// HeartbeatResponseHandler +/// +/// [`HeartbeatResponseHandler::is_acceptable`] returns true if handler can handle incoming [`HeartbeatResponseHandlerContext`]. +/// +/// [`HeartbeatResponseHandler::handle`] handles all or part of incoming [`HeartbeatResponseHandlerContext`]. pub trait HeartbeatResponseHandler: Send + Sync { fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool; - fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<()>; + fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result; } pub trait HeartbeatResponseHandlerExecutor: Send + Sync { @@ -77,16 +82,17 @@ impl HandlerGroupExecutor { impl HeartbeatResponseHandlerExecutor for HandlerGroupExecutor { fn handle(&self, mut ctx: HeartbeatResponseHandlerContext) -> Result<()> { for handler in &self.handlers { - if ctx.is_skip_all() { - break; - } - if !handler.is_acceptable(&ctx) { continue; } - if let Err(e) = handler.handle(&mut ctx) { - error!(e;"Error while handling: {:?}", ctx.response); + match handler.handle(&mut ctx) { + Ok(HandleControl::Done) => break, + Ok(HandleControl::Continue) => {} + Err(e) => { + error!(e;"Error while handling: {:?}", ctx.response); + break; + } } } Ok(()) diff --git a/src/datanode/src/heartbeat/handler/close_region.rs b/src/datanode/src/heartbeat/handler/close_region.rs new file mode 100644 index 000000000000..810912d49135 --- /dev/null +++ b/src/datanode/src/heartbeat/handler/close_region.rs @@ -0,0 +1,227 @@ +// Copyright 2023 Greptime Team +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use std::sync::Arc; + +use catalog::{CatalogManagerRef, DeregisterTableRequest}; +use common_catalog::format_full_table_name; +use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply}; +use common_telemetry::{error, warn}; +use log::info; +use snafu::ResultExt; +use store_api::storage::RegionNumber; +use table::engine::manager::TableEngineManagerRef; +use table::engine::{CloseTableResult, EngineContext, TableReference}; +use table::requests::CloseTableRequest; + +use crate::error::{self, Result}; +use crate::heartbeat::handler::{HandleControl, HeartbeatResponseHandler}; +use crate::heartbeat::HeartbeatResponseHandlerContext; + +#[derive(Clone)] +pub struct CloseRegionHandler { + catalog_manager: CatalogManagerRef, + table_engine_manager: TableEngineManagerRef, +} + +impl HeartbeatResponseHandler for CloseRegionHandler { + fn is_acceptable(&self, ctx: &HeartbeatResponseHandlerContext) -> bool { + matches!( + ctx.incoming_message.as_ref(), + Some((_, Instruction::CloseRegion { .. })) + ) + } + + fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result { + let Some((meta, Instruction::CloseRegion(region_ident))) = ctx.incoming_message.take() else { + unreachable!("CloseRegionHandler: should be guarded by 'is_acceptable'"); + }; + + let mailbox = ctx.mailbox.clone(); + let self_ref = Arc::new(self.clone()); + + let RegionIdent { + engine, + catalog, + schema, + table, + region_number, + .. + } = region_ident; + + common_runtime::spawn_bg(async move { + let result = self_ref + .close_region_inner( + engine, + &TableReference::full(&catalog, &schema, &table), + vec![region_number], + ) + .await; + + if let Err(e) = mailbox + .send((meta, CloseRegionHandler::map_result(result))) + .await + { + error!(e;"Failed to send reply to mailbox"); + } + }); + + Ok(HandleControl::Done) + } +} + +impl CloseRegionHandler { + pub fn new( + catalog_manager: CatalogManagerRef, + table_engine_manager: TableEngineManagerRef, + ) -> Self { + Self { + catalog_manager, + table_engine_manager, + } + } + + fn map_result(result: Result) -> InstructionReply { + result.map_or_else( + |error| { + InstructionReply::CloseRegion(SimpleReply { + result: false, + error: Some(error.to_string()), + }) + }, + |result| { + InstructionReply::CloseRegion(SimpleReply { + result, + error: None, + }) + }, + ) + } + + /// Returns true if a table or target regions have been closed. + async fn regions_closed( + &self, + catalog_name: &str, + schema_name: &str, + table_name: &str, + region_numbers: &[RegionNumber], + ) -> Result { + if let Some(table) = self + .catalog_manager + .table(catalog_name, schema_name, table_name) + .await + .context(error::AccessCatalogSnafu)? + { + for r in region_numbers { + let region_exist = + table + .contains_region(*r) + .with_context(|_| error::CheckRegionSnafu { + table_name: format_full_table_name( + catalog_name, + schema_name, + table_name, + ), + region_number: *r, + })?; + if region_exist { + return Ok(false); + } + } + } + // Returns true if table not exist + Ok(true) + } + + async fn close_region_inner( + &self, + engine: String, + table_ref: &TableReference<'_>, + region_numbers: Vec, + ) -> Result { + let engine = + self.table_engine_manager + .engine(&engine) + .context(error::TableEngineNotFoundSnafu { + engine_name: &engine, + })?; + let ctx = EngineContext::default(); + + if self + .regions_closed( + table_ref.catalog, + table_ref.schema, + table_ref.table, + ®ion_numbers, + ) + .await? + { + return Ok(true); + } + + if engine + .get_table(&ctx, table_ref) + .with_context(|_| error::GetTableSnafu { + table_name: table_ref.to_string(), + })? + .is_some() + { + return match engine + .close_table( + &ctx, + CloseTableRequest { + catalog_name: table_ref.catalog.to_string(), + schema_name: table_ref.schema.to_string(), + table_name: table_ref.table.to_string(), + region_numbers: region_numbers.clone(), + }, + ) + .await + .with_context(|_| error::CloseTableSnafu { + table_name: table_ref.to_string(), + region_numbers: region_numbers.clone(), + })? { + CloseTableResult::NotFound | CloseTableResult::Released(_) => { + // Deregister table if The table released. + self.deregister_table(table_ref).await + } + CloseTableResult::PartialClosed(regions) => { + // Requires caller to update the region_numbers + info!( + "Close partial regions: {:?} in table: {}", + regions, table_ref + ); + Ok(true) + } + }; + } + + warn!("Trying to close a non-existing table: {}", table_ref); + // Table doesn't exist + Ok(true) + } + + async fn deregister_table(&self, table_ref: &TableReference<'_>) -> Result { + self.catalog_manager + .deregister_table(DeregisterTableRequest { + catalog: table_ref.catalog.to_string(), + schema: table_ref.schema.to_string(), + table_name: table_ref.table.to_string(), + }) + .await + .with_context(|_| error::DeregisterTableSnafu { + table_name: table_ref.to_string(), + }) + } +} diff --git a/src/datanode/src/heartbeat/handler/open_region.rs b/src/datanode/src/heartbeat/handler/open_region.rs index 3c2273880ba7..95f21823f01f 100644 --- a/src/datanode/src/heartbeat/handler/open_region.rs +++ b/src/datanode/src/heartbeat/handler/open_region.rs @@ -26,7 +26,7 @@ use table::engine::EngineContext; use table::requests::OpenTableRequest; use crate::error::{self, Result}; -use crate::heartbeat::handler::HeartbeatResponseHandler; +use crate::heartbeat::handler::{HandleControl, HeartbeatResponseHandler}; use crate::heartbeat::HeartbeatResponseHandlerContext; #[derive(Clone)] @@ -43,12 +43,11 @@ impl HeartbeatResponseHandler for OpenRegionHandler { ) } - fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<()> { + fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result { let Some((meta, Instruction::OpenRegion(region_ident))) = ctx.incoming_message.take() else { unreachable!("OpenRegionHandler: should be guarded by 'is_acceptable'"); }; - ctx.finish(); let mailbox = ctx.mailbox.clone(); let self_ref = Arc::new(self.clone()); @@ -62,7 +61,7 @@ impl HeartbeatResponseHandler for OpenRegionHandler { error!(e; "Failed to send reply to mailbox"); } }); - Ok(()) + Ok(HandleControl::Done) } } @@ -117,8 +116,8 @@ impl OpenRegionHandler { ) } - /// Returns true if table has been opened. - async fn check_table( + /// Returns true if a table or target regions have been opened. + async fn regions_opened( &self, catalog_name: &str, schema_name: &str, @@ -152,8 +151,9 @@ impl OpenRegionHandler { return Ok(false); } } + return Ok(true); } - Ok(true) + Ok(false) } async fn open_region_inner(&self, engine: String, request: OpenTableRequest) -> Result { @@ -173,7 +173,7 @@ impl OpenRegionHandler { let ctx = EngineContext::default(); if self - .check_table(catalog_name, schema_name, table_name, region_numbers) + .regions_opened(catalog_name, schema_name, table_name, region_numbers) .await? { return Ok(true); @@ -207,6 +207,8 @@ impl OpenRegionHandler { // Therefore, we won't meet this case, in theory. // Case 2: The target region was not found in table meta + + // Case 3: The table not exist Ok(false) } } diff --git a/src/datanode/src/heartbeat/handler/parse_mailbox_message.rs b/src/datanode/src/heartbeat/handler/parse_mailbox_message.rs index fc0ee46a5aaf..bc7044011517 100644 --- a/src/datanode/src/heartbeat/handler/parse_mailbox_message.rs +++ b/src/datanode/src/heartbeat/handler/parse_mailbox_message.rs @@ -13,7 +13,9 @@ // limitations under the License. use crate::error::Result; -use crate::heartbeat::handler::{HeartbeatResponseHandler, HeartbeatResponseHandlerContext}; +use crate::heartbeat::handler::{ + HandleControl, HeartbeatResponseHandler, HeartbeatResponseHandlerContext, +}; use crate::heartbeat::utils::mailbox_message_to_incoming_message; #[derive(Default)] @@ -24,7 +26,7 @@ impl HeartbeatResponseHandler for ParseMailboxMessageHandler { true } - fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result<()> { + fn handle(&self, ctx: &mut HeartbeatResponseHandlerContext) -> Result { if let Some(message) = &ctx.response.mailbox_message { if message.payload.is_some() { // mailbox_message_to_incoming_message will raise an error if payload is none @@ -32,6 +34,6 @@ impl HeartbeatResponseHandler for ParseMailboxMessageHandler { } } - Ok(()) + Ok(HandleControl::Continue) } } diff --git a/src/datanode/src/instance.rs b/src/datanode/src/instance.rs index 18111b1bd0c7..8ac6ae38b5b3 100644 --- a/src/datanode/src/instance.rs +++ b/src/datanode/src/instance.rs @@ -58,6 +58,7 @@ use crate::error::{ NewCatalogSnafu, OpenLogStoreSnafu, RecoverProcedureSnafu, Result, ShutdownInstanceSnafu, StartProcedureManagerSnafu, StopProcedureManagerSnafu, }; +use crate::heartbeat::handler::close_region::CloseRegionHandler; use crate::heartbeat::handler::open_region::OpenRegionHandler; use crate::heartbeat::handler::parse_mailbox_message::ParseMailboxMessageHandler; use crate::heartbeat::handler::HandlerGroupExecutor; @@ -199,6 +200,18 @@ impl Instance { let factory = QueryEngineFactory::new(catalog_manager.clone(), false); let query_engine = factory.query_engine(); + let handlers_executor = HandlerGroupExecutor::new(vec![ + Arc::new(ParseMailboxMessageHandler::default()), + Arc::new(OpenRegionHandler::new( + catalog_manager.clone(), + engine_manager.clone(), + )), + Arc::new(CloseRegionHandler::new( + catalog_manager.clone(), + engine_manager.clone(), + )), + ]); + let heartbeat_task = match opts.mode { Mode::Standalone => None, Mode::Distributed => Some(HeartbeatTask::new( @@ -207,13 +220,7 @@ impl Instance { opts.rpc_hostname.clone(), meta_client.as_ref().unwrap().clone(), catalog_manager.clone(), - Arc::new(HandlerGroupExecutor::new(vec![ - Arc::new(ParseMailboxMessageHandler::default()), - Arc::new(OpenRegionHandler::new( - catalog_manager.clone(), - engine_manager.clone(), - )), - ])), + Arc::new(handlers_executor), )), }; diff --git a/src/datanode/src/sql.rs b/src/datanode/src/sql.rs index f5fd04978ece..611bfe7f56fa 100644 --- a/src/datanode/src/sql.rs +++ b/src/datanode/src/sql.rs @@ -98,8 +98,12 @@ impl SqlHandler { Ok(table) } - pub fn table_engine_manager(&self) -> TableEngineManagerRef { - self.table_engine_manager.clone() + pub fn table_engine_manager(&self) -> &TableEngineManagerRef { + &self.table_engine_manager + } + + pub fn catalog_manager(&self) -> &CatalogManagerRef { + &self.catalog_manager } pub fn table_engine(&self, table: TableRef) -> Result { diff --git a/src/datanode/src/tests.rs b/src/datanode/src/tests.rs index 10c7ca845992..aad0e6e6743f 100644 --- a/src/datanode/src/tests.rs +++ b/src/datanode/src/tests.rs @@ -12,4 +12,270 @@ // See the License for the specific language governing permissions and // limitations under the License. +use std::assert_matches::assert_matches; +use std::sync::Arc; + +use api::v1::greptime_request::Request as GrpcRequest; +use api::v1::meta::HeartbeatResponse; +use api::v1::query_request::Query; +use api::v1::QueryRequest; +use catalog::CatalogManagerRef; +use common_meta::instruction::{Instruction, InstructionReply, RegionIdent, SimpleReply}; +use common_query::Output; +use datatypes::prelude::ConcreteDataType; +use servers::query_handler::grpc::GrpcQueryHandler; +use session::context::QueryContext; +use table::engine::manager::TableEngineManagerRef; +use test_util::MockInstance; +use tokio::sync::mpsc::{self, Receiver}; + +use crate::heartbeat::handler::close_region::CloseRegionHandler; +use crate::heartbeat::handler::open_region::OpenRegionHandler; +use crate::heartbeat::handler::{ + HandlerGroupExecutor, HeartbeatResponseHandlerContext, HeartbeatResponseHandlerExecutor, +}; +use crate::heartbeat::mailbox::{HeartbeatMailbox, MessageMeta}; +use crate::instance::Instance; + pub(crate) mod test_util; + +struct HandlerTestGuard { + instance: MockInstance, + mailbox: Arc, + rx: Receiver<(MessageMeta, InstructionReply)>, + engine_manager_ref: TableEngineManagerRef, + catalog_manager_ref: CatalogManagerRef, +} + +#[tokio::test] +async fn test_close_region_handler() { + let HandlerTestGuard { + instance, + mailbox, + mut rx, + engine_manager_ref, + catalog_manager_ref, + .. + } = parepare_handler_test("test_close_region_handler").await; + + let executor = Arc::new(HandlerGroupExecutor::new(vec![Arc::new( + CloseRegionHandler::new(catalog_manager_ref.clone(), engine_manager_ref.clone()), + )])); + + parepare_table(instance.inner()).await; + + // Closes demo table + handle_instruction( + executor.clone(), + mailbox.clone(), + close_region_instruction(), + ); + let (_, reply) = rx.recv().await.unwrap(); + assert_matches!( + reply, + InstructionReply::CloseRegion(SimpleReply { result: true, .. }) + ); + + assert_test_table_not_found(instance.inner()).await; + + // Closes demo table again + handle_instruction( + executor.clone(), + mailbox.clone(), + close_region_instruction(), + ); + let (_, reply) = rx.recv().await.unwrap(); + assert_matches!( + reply, + InstructionReply::CloseRegion(SimpleReply { result: true, .. }) + ); + + // Closes non-exist table + handle_instruction( + executor.clone(), + mailbox.clone(), + Instruction::CloseRegion(RegionIdent { + catalog: "greptime".to_string(), + schema: "public".to_string(), + table: "non-exist".to_string(), + table_id: 1025, + engine: "mito".to_string(), + region_number: 0, + cluster_id: 1, + datanode_id: 2, + }), + ); + let (_, reply) = rx.recv().await.unwrap(); + assert_matches!( + reply, + InstructionReply::CloseRegion(SimpleReply { result: true, .. }) + ); +} + +#[tokio::test] +async fn test_open_region_handler() { + let HandlerTestGuard { + instance, + mailbox, + mut rx, + engine_manager_ref, + catalog_manager_ref, + .. + } = parepare_handler_test("test_open_region_handler").await; + + let executor = Arc::new(HandlerGroupExecutor::new(vec![ + Arc::new(OpenRegionHandler::new( + catalog_manager_ref.clone(), + engine_manager_ref.clone(), + )), + Arc::new(CloseRegionHandler::new( + catalog_manager_ref.clone(), + engine_manager_ref.clone(), + )), + ])); + + parepare_table(instance.inner()).await; + + // Opens a opened table + handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()); + let (_, reply) = rx.recv().await.unwrap(); + assert_matches!( + reply, + InstructionReply::OpenRegion(SimpleReply { result: true, .. }) + ); + + // Opens a non-exist table + handle_instruction( + executor.clone(), + mailbox.clone(), + Instruction::OpenRegion(RegionIdent { + catalog: "greptime".to_string(), + schema: "public".to_string(), + table: "non-exist".to_string(), + table_id: 2024, + engine: "mito".to_string(), + region_number: 0, + cluster_id: 1, + datanode_id: 2, + }), + ); + let (_, reply) = rx.recv().await.unwrap(); + assert_matches!( + reply, + InstructionReply::OpenRegion(SimpleReply { result: false, .. }) + ); + + // Closes demo table + handle_instruction( + executor.clone(), + mailbox.clone(), + close_region_instruction(), + ); + let (_, reply) = rx.recv().await.unwrap(); + assert_matches!( + reply, + InstructionReply::CloseRegion(SimpleReply { result: true, .. }) + ); + assert_test_table_not_found(instance.inner()).await; + + // Opens demo table + handle_instruction(executor.clone(), mailbox.clone(), open_region_instruction()); + let (_, reply) = rx.recv().await.unwrap(); + assert_matches!( + reply, + InstructionReply::OpenRegion(SimpleReply { result: true, .. }) + ); + assert_test_table_found(instance.inner()).await; +} + +async fn parepare_handler_test(name: &str) -> HandlerTestGuard { + let mock_instance = MockInstance::new(name).await; + let instance = mock_instance.inner(); + let engine_manager = instance.sql_handler().table_engine_manager().clone(); + let catalog_manager = instance.sql_handler().catalog_manager().clone(); + let (tx, rx) = mpsc::channel(8); + let mailbox = Arc::new(HeartbeatMailbox::new(tx)); + + HandlerTestGuard { + instance: mock_instance, + mailbox, + rx, + engine_manager_ref: engine_manager, + catalog_manager_ref: catalog_manager, + } +} + +fn handle_instruction( + executor: Arc, + mailbox: Arc, + instruction: Instruction, +) { + let response = HeartbeatResponse::default(); + let mut ctx: HeartbeatResponseHandlerContext = + HeartbeatResponseHandlerContext::new(mailbox, response); + ctx.incoming_message = Some((MessageMeta::new_test(1, "hi", "foo", "bar"), instruction)); + executor.handle(ctx).unwrap(); +} + +fn close_region_instruction() -> Instruction { + Instruction::CloseRegion(RegionIdent { + catalog: "greptime".to_string(), + schema: "public".to_string(), + table: "demo".to_string(), + table_id: 1024, + engine: "mito".to_string(), + region_number: 0, + cluster_id: 1, + datanode_id: 2, + }) +} + +fn open_region_instruction() -> Instruction { + Instruction::OpenRegion(RegionIdent { + catalog: "greptime".to_string(), + schema: "public".to_string(), + table: "demo".to_string(), + table_id: 1024, + engine: "mito".to_string(), + region_number: 0, + cluster_id: 1, + datanode_id: 2, + }) +} + +async fn parepare_table(instance: &Instance) { + test_util::create_test_table(instance, ConcreteDataType::timestamp_millisecond_datatype()) + .await + .unwrap(); +} + +async fn assert_test_table_not_found(instance: &Instance) { + let query = GrpcRequest::Query(QueryRequest { + query: Some(Query::Sql( + "INSERT INTO demo(host, cpu, memory, ts) VALUES \ + ('host1', 66.6, 1024, 1672201025000),\ + ('host2', 88.8, 333.3, 1672201026000)" + .to_string(), + )), + }); + let output = instance + .do_query(query, QueryContext::arc()) + .await + .unwrap_err(); + + assert_eq!(output.to_string(), "Failed to execute sql, source: Failure during query execution, source: Table not found: greptime.public.demo"); +} + +async fn assert_test_table_found(instance: &Instance) { + let query = GrpcRequest::Query(QueryRequest { + query: Some(Query::Sql( + "INSERT INTO demo(host, cpu, memory, ts) VALUES \ + ('host1', 66.6, 1024, 1672201025000),\ + ('host2', 88.8, 333.3, 1672201026000)" + .to_string(), + )), + }); + let output = instance.do_query(query, QueryContext::arc()).await.unwrap(); + + assert!(matches!(output, Output::AffectedRows(2))); +} diff --git a/src/file-table-engine/src/table/immutable.rs b/src/file-table-engine/src/table/immutable.rs index 3f80253b0fdc..980c4d9b02a6 100644 --- a/src/file-table-engine/src/table/immutable.rs +++ b/src/file-table-engine/src/table/immutable.rs @@ -104,10 +104,6 @@ impl Table for ImmutableFileTable { // nothing to flush Ok(()) } - - async fn close(&self, _region_number: &[RegionNumber]) -> TableResult<()> { - Ok(()) - } } impl ImmutableFileTable { diff --git a/src/mito/src/engine.rs b/src/mito/src/engine.rs index a13877053c30..f559c90d20e7 100644 --- a/src/mito/src/engine.rs +++ b/src/mito/src/engine.rs @@ -739,6 +739,12 @@ impl MitoEngineInner { if table.is_releasable() { self.tables.remove(&table_ref.to_string()); + + logging::info!( + "Mito engine closed table: {} in schema: {}", + table_ref.table, + table_ref.schema, + ); return Ok(CloseTableResult::Released(removed_regions)); }