From 2a76837210b9daa29105fd888904155977a0d5a6 Mon Sep 17 00:00:00 2001 From: August Date: Tue, 4 Apr 2023 15:10:30 +0800 Subject: [PATCH] chore: notify function seperately rather than inside the relation group --- proto/meta.proto | 2 +- .../common_service/src/observer_manager.rs | 6 +++++- src/frontend/src/observer/observer_manager.rs | 20 +++++++++---------- src/meta/src/manager/catalog/mod.rs | 7 ++----- 4 files changed, 18 insertions(+), 17 deletions(-) diff --git a/proto/meta.proto b/proto/meta.proto index cd9f044d4f988..e3a500ad4ac37 100644 --- a/proto/meta.proto +++ b/proto/meta.proto @@ -258,7 +258,6 @@ message Relation { catalog.Sink sink = 3; catalog.Index index = 4; catalog.View view = 5; - catalog.Function function = 6; } } @@ -280,6 +279,7 @@ message SubscribeResponse { oneof info { catalog.Database database = 4; catalog.Schema schema = 5; + catalog.Function function = 6; user.UserInfo user = 11; FragmentParallelUnitMapping parallel_unit_mapping = 12; common.WorkerNode node = 13; diff --git a/src/common/common_service/src/observer_manager.rs b/src/common/common_service/src/observer_manager.rs index 695e3b4d4411a..271fb39675e81 100644 --- a/src/common/common_service/src/observer_manager.rs +++ b/src/common/common_service/src/observer_manager.rs @@ -116,7 +116,11 @@ where }; notification_vec.retain_mut(|notification| match notification.info.as_ref().unwrap() { - Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) | Info::User(_) => { + Info::Database(_) + | Info::Schema(_) + | Info::RelationGroup(_) + | Info::User(_) + | Info::Function(_) => { notification.version > info.version.as_ref().unwrap().catalog_version } Info::ParallelUnitMapping(_) => { diff --git a/src/frontend/src/observer/observer_manager.rs b/src/frontend/src/observer/observer_manager.rs index dbb6044ce1583..db18d2849148b 100644 --- a/src/frontend/src/observer/observer_manager.rs +++ b/src/frontend/src/observer/observer_manager.rs @@ -50,7 +50,7 @@ impl ObserverState for FrontendObserverNode { }; match info.to_owned() { - Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) => { + Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) | Info::Function(_) => { self.handle_catalog_notification(resp); } Info::Node(node) => { @@ -269,18 +269,18 @@ impl FrontendObserverNode { Operation::Update => catalog_guard.update_view(view), _ => panic!("receive an unsupported notify {:?}", resp), }, - RelationInfo::Function(function) => match resp.operation() { - Operation::Add => catalog_guard.create_function(function), - Operation::Delete => catalog_guard.drop_function( - function.database_id, - function.schema_id, - function.id.into(), - ), - _ => panic!("receive an unsupported notify {:?}", resp), - }, } } } + Info::Function(function) => match resp.operation() { + Operation::Add => catalog_guard.create_function(function), + Operation::Delete => catalog_guard.drop_function( + function.database_id, + function.schema_id, + function.id.into(), + ), + _ => panic!("receive an unsupported notify {:?}", resp), + }, _ => unreachable!(), } assert!( diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 5c2b278c28e5f..d84862cc7778d 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -519,10 +519,7 @@ where user_core.increase_ref(function.owner); let version = self - .notify_frontend_relation_info( - Operation::Add, - RelationInfo::Function(function.to_owned()), - ) + .notify_frontend(Operation::Add, Info::Function(function.to_owned())) .await; Ok(version) @@ -552,7 +549,7 @@ where } let version = self - .notify_frontend_relation_info(Operation::Delete, RelationInfo::Function(function)) + .notify_frontend(Operation::Delete, Info::Function(function)) .await; Ok(version)