Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat(meta): support group notification #8741

Merged
merged 3 commits into from
Mar 24, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions proto/meta.proto
Original file line number Diff line number Diff line change
Expand Up @@ -251,6 +251,21 @@ message MetaSnapshot {
SnapshotVersion version = 13;
}

message Relation {
oneof relation_info {
catalog.Table table = 1;
catalog.Source source = 2;
catalog.Sink sink = 3;
catalog.Index index = 4;
catalog.View view = 5;
catalog.Function function = 6;
}
}

message RelationGroup {
repeated Relation relations = 1;
}

message SubscribeResponse {
enum Operation {
UNSPECIFIED = 0;
Expand All @@ -265,12 +280,6 @@ message SubscribeResponse {
oneof info {
catalog.Database database = 4;
catalog.Schema schema = 5;
catalog.Table table = 6;
catalog.Source source = 7;
catalog.Sink sink = 8;
catalog.Index index = 9;
catalog.View view = 10;
catalog.Function function = 18;
user.UserInfo user = 11;
FragmentParallelUnitMapping parallel_unit_mapping = 12;
common.WorkerNode node = 13;
Expand All @@ -280,6 +289,7 @@ message SubscribeResponse {
backup_service.MetaBackupManifestId meta_backup_manifest_id = 17;
SystemParams system_params = 19;
hummock.WriteLimits hummock_write_limits = 20;
RelationGroup relation_group = 21;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I guess we can ignore the compatibility here if we're not going to support progressive upgrades in a cluster. 😁

}
}

Expand Down
10 changes: 1 addition & 9 deletions src/common/common_service/src/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,15 +116,7 @@ where
};

notification_vec.retain_mut(|notification| match notification.info.as_ref().unwrap() {
Info::Database(_)
| Info::Schema(_)
| Info::Table(_)
| Info::Source(_)
| Info::Sink(_)
| Info::Index(_)
| Info::View(_)
| Info::Function(_)
| Info::User(_) => {
Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) | Info::User(_) => {
notification.version > info.version.as_ref().unwrap().catalog_version
}
Info::ParallelUnitMapping(_) => {
Expand Down
124 changes: 71 additions & 53 deletions src/frontend/src/observer/observer_manager.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ use risingwave_common::hash::ParallelUnitMapping;
use risingwave_common::system_param::local_manager::LocalSystemParamsManagerRef;
use risingwave_common_service::observer_manager::{ObserverState, SubscribeFrontend};
use risingwave_pb::common::WorkerNode;
use risingwave_pb::meta::relation::RelationInfo;
use risingwave_pb::meta::subscribe_response::{Info, Operation};
use risingwave_pb::meta::{FragmentParallelUnitMapping, SubscribeResponse};
use tokio::sync::watch::Sender;
Expand Down Expand Up @@ -49,14 +50,7 @@ impl ObserverState for FrontendObserverNode {
};

match info.to_owned() {
Info::Database(_)
| Info::Schema(_)
| Info::Table(_)
| Info::Source(_)
| Info::Index(_)
| Info::Sink(_)
| Info::Function(_)
| Info::View(_) => {
Info::Database(_) | Info::Schema(_) | Info::RelationGroup(_) => {
self.handle_catalog_notification(resp);
}
Info::Node(node) => {
Expand Down Expand Up @@ -193,52 +187,76 @@ impl FrontendObserverNode {
Operation::Delete => catalog_guard.drop_schema(schema.database_id, schema.id),
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::Table(table) => match resp.operation() {
Operation::Add => catalog_guard.create_table(table),
Operation::Delete => {
catalog_guard.drop_table(table.database_id, table.schema_id, table.id.into())
}
Operation::Update => catalog_guard.update_table(table),
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::Source(source) => match resp.operation() {
Operation::Add => catalog_guard.create_source(source),
Operation::Delete => {
catalog_guard.drop_source(source.database_id, source.schema_id, source.id)
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::Sink(sink) => match resp.operation() {
Operation::Add => catalog_guard.create_sink(sink),
Operation::Delete => {
catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::Index(index) => match resp.operation() {
Operation::Add => catalog_guard.create_index(index),
Operation::Delete => {
catalog_guard.drop_index(index.database_id, index.schema_id, index.id.into())
}
Operation::Update => catalog_guard.update_index(index),
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::View(view) => match resp.operation() {
Operation::Add => catalog_guard.create_view(view),
Operation::Delete => {
catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
Info::RelationGroup(relation_group) => {
for relation in &relation_group.relations {
let Some(relation) = relation.relation_info.as_ref() else {
continue;
};
match relation {
RelationInfo::Table(table) => match resp.operation() {
Operation::Add => catalog_guard.create_table(table),
Operation::Delete => catalog_guard.drop_table(
table.database_id,
table.schema_id,
table.id.into(),
),
Operation::Update => {
let old_table =
catalog_guard.get_table_by_id(&table.id.into()).unwrap();
catalog_guard.update_table(table);
assert!(old_table.fragment_id != table.fragment_id);
// FIXME: the frontend node delete its fragment for the update
// operation by itself.
self.worker_node_manager
.remove_fragment_mapping(&old_table.fragment_id);
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Source(source) => match resp.operation() {
Operation::Add => catalog_guard.create_source(source),
Operation::Delete => catalog_guard.drop_source(
source.database_id,
source.schema_id,
source.id,
),
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Sink(sink) => match resp.operation() {
Operation::Add => catalog_guard.create_sink(sink),
Operation::Delete => {
catalog_guard.drop_sink(sink.database_id, sink.schema_id, sink.id)
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Index(index) => match resp.operation() {
Operation::Add => catalog_guard.create_index(index),
Operation::Delete => catalog_guard.drop_index(
index.database_id,
index.schema_id,
index.id.into(),
),
Operation::Update => catalog_guard.update_index(index),
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::View(view) => match resp.operation() {
Operation::Add => catalog_guard.create_view(view),
Operation::Delete => {
catalog_guard.drop_view(view.database_id, view.schema_id, view.id)
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
RelationInfo::Function(function) => match resp.operation() {
Operation::Add => catalog_guard.create_function(function),
Operation::Delete => catalog_guard.drop_function(
function.database_id,
function.schema_id,
function.id.into(),
),
_ => panic!("receive an unsupported notify {:?}", resp),
},
}
}
_ => panic!("receive an unsupported notify {:?}", resp),
},
Info::Function(function) => match resp.operation() {
Operation::Add => catalog_guard.create_function(function),
Operation::Delete => catalog_guard.drop_function(
function.database_id,
function.schema_id,
function.id.into(),
),
_ => panic!("receive an unsupported notify {:?}", resp),
},
}
_ => unreachable!(),
}
assert!(
Expand Down
5 changes: 3 additions & 2 deletions src/meta/src/hummock/manager/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,7 @@ use risingwave_hummock_sdk::table_stats::{
use risingwave_pb::catalog::Table;
use risingwave_pb::hummock::version_update_payload::Payload;
use risingwave_pb::hummock::PbCompactionGroupInfo;
use risingwave_pb::meta::relation::RelationInfo;

/// Acquire write lock of the lock with `lock_name`.
/// The macro will use macro `function_name` to get the name of the function of method that calls
Expand Down Expand Up @@ -1732,11 +1733,11 @@ where
for table in table_catalogs {
self.env
.notification_manager()
.notify_hummock(Operation::Add, Info::Table(table.clone()))
.notify_hummock_relation_info(Operation::Add, RelationInfo::Table(table.clone()))
.await;
self.env
.notification_manager()
.notify_compactor(Operation::Add, Info::Table(table))
.notify_compactor_relation_info(Operation::Add, RelationInfo::Table(table))
.await;
}

Expand Down
7 changes: 5 additions & 2 deletions src/meta/src/manager/catalog/fragment.rs
Original file line number Diff line number Diff line change
Expand Up @@ -279,6 +279,7 @@ where
// FIXME: we use a dummy table ID for new table fragments, so we can drop the old fragments
// with the real table ID, then replace the dummy table ID with the real table ID. This is a
// workaround for not having the version info in the fragment manager.
#[allow(unused_variables)]
let old_table_fragment = table_fragments
.remove(table_id)
.with_context(|| format!("table_fragment not exist: id={}", table_id))?;
Expand Down Expand Up @@ -344,8 +345,10 @@ where
// Commit changes and notify about the changes.
commit_meta!(self, table_fragments)?;

self.notify_fragment_mapping(&old_table_fragment, Operation::Delete)
.await;
// FIXME: Do not notify frontend currently, because frontend nodes might refer to old table
// catalog and need to access the old fragment. Let frontend nodes delete the old fragment
// when they receive table catalog change. self.notify_fragment_mapping(&
// old_table_fragment, Operation::Delete) .await;
self.notify_fragment_mapping(&table_fragment, Operation::Add)
.await;

Expand Down
Loading