From daf3ea3b23b51dbc9ee52fd7f024e488c6614e56 Mon Sep 17 00:00:00 2001 From: Zehua Zou <41586196+HuaHuaY@users.noreply.github.com> Date: Thu, 10 Nov 2022 18:18:05 +0800 Subject: [PATCH] feat(frontend): add `pg_catalog.pg_views` and support `\dv` (#6295) * add pg_views * support \dv * cargo fmt Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com> --- .../src/catalog/system_catalog/mod.rs | 35 ++++++++-------- .../catalog/system_catalog/pg_catalog/mod.rs | 42 +++++++++++++++++++ .../system_catalog/pg_catalog/pg_views.rs | 29 +++++++++++++ src/frontend/src/lib.rs | 1 + src/meta/src/manager/catalog/database.rs | 14 ++----- src/meta/src/manager/catalog/mod.rs | 15 +++---- 6 files changed, 99 insertions(+), 37 deletions(-) create mode 100644 src/frontend/src/catalog/system_catalog/pg_catalog/pg_views.rs diff --git a/src/frontend/src/catalog/system_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/mod.rs index b5babdc73e7e1..2487d06aa59ef 100644 --- a/src/frontend/src/catalog/system_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/mod.rs @@ -145,13 +145,13 @@ pub fn get_sys_catalogs_in_schema(schema_name: &str) -> Option { + ($( { $schema_name:expr, $catalog_name:ident, $pk:expr, $func:tt $($await:tt)? } ),* $(,)?) => { /// `SYS_CATALOG_MAP` includes all system catalogs. pub(crate) static SYS_CATALOG_MAP: LazyLock>> = LazyLock::new(|| { let mut hash_map: HashMap<&str, Vec> = HashMap::new(); $( paste!{ - let sys_catalog = def_sys_catalog!($catalog_id, [<$catalog_name _TABLE_NAME>], [<$catalog_name _COLUMNS>], $pk); + let sys_catalog = def_sys_catalog!(${index()} + 1, [<$catalog_name _TABLE_NAME>], [<$catalog_name _COLUMNS>], $pk); hash_map.entry([<$schema_name _SCHEMA_NAME>]).or_insert(vec![]).push(sys_catalog); } )* @@ -161,9 +161,9 @@ macro_rules! prepare_sys_catalog { #[async_trait] impl SysCatalogReader for SysCatalogReaderImpl { async fn read_table(&self, table_id: &TableId) -> Result> { - match table_id.table_id { + match table_id.table_id - 1 { $( - $catalog_id => { + ${index()} => { let rows = self.$func(); $(let rows = rows.$await;)? rows @@ -178,17 +178,18 @@ macro_rules! prepare_sys_catalog { // If you added a new system catalog, be sure to add a corresponding entry here. prepare_sys_catalog! { - { 1, PG_CATALOG, PG_TYPE, vec![0], read_types }, - { 2, PG_CATALOG, PG_NAMESPACE, vec![0], read_namespace }, - { 3, PG_CATALOG, PG_CAST, vec![0], read_cast }, - { 4, PG_CATALOG, PG_MATVIEWS_INFO, vec![0], read_mviews_info await }, - { 5, PG_CATALOG, PG_USER, vec![0], read_user_info }, - { 6, PG_CATALOG, PG_CLASS, vec![0], read_class_info }, - { 7, PG_CATALOG, PG_INDEX, vec![0], read_index_info }, - { 8, PG_CATALOG, PG_OPCLASS, vec![0], read_opclass_info }, - { 9, PG_CATALOG, PG_COLLATION, vec![0], read_collation_info }, - { 10, PG_CATALOG, PG_AM, vec![0], read_am_info }, - { 11, PG_CATALOG, PG_OPERATOR, vec![0], read_operator_info }, - { 12, INFORMATION_SCHEMA, COLUMNS, vec![], read_columns_info }, - { 13, INFORMATION_SCHEMA, TABLES, vec![], read_tables_info }, + { PG_CATALOG, PG_TYPE, vec![0], read_types }, + { PG_CATALOG, PG_NAMESPACE, vec![0], read_namespace }, + { PG_CATALOG, PG_CAST, vec![0], read_cast }, + { PG_CATALOG, PG_MATVIEWS_INFO, vec![0], read_mviews_info await }, + { PG_CATALOG, PG_USER, vec![0], read_user_info }, + { PG_CATALOG, PG_CLASS, vec![0], read_class_info }, + { PG_CATALOG, PG_INDEX, vec![0], read_index_info }, + { PG_CATALOG, PG_OPCLASS, vec![0], read_opclass_info }, + { PG_CATALOG, PG_COLLATION, vec![0], read_collation_info }, + { PG_CATALOG, PG_AM, vec![0], read_am_info }, + { PG_CATALOG, PG_OPERATOR, vec![0], read_operator_info }, + { PG_CATALOG, PG_VIEWS, vec![], read_views_info }, + { INFORMATION_SCHEMA, COLUMNS, vec![], read_columns_info }, + { INFORMATION_SCHEMA, TABLES, vec![], read_tables_info }, } diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs index 3e0a58f395dcd..a108641ae9a77 100644 --- a/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs @@ -23,6 +23,7 @@ pub mod pg_opclass; pub mod pg_operator; pub mod pg_type; pub mod pg_user; +pub mod pg_views; use std::collections::HashMap; @@ -38,6 +39,7 @@ pub use pg_opclass::*; pub use pg_operator::*; pub use pg_type::*; pub use pg_user::*; +pub use pg_views::*; use risingwave_common::array::Row; use risingwave_common::error::Result; use risingwave_common::types::ScalarImpl; @@ -261,11 +263,25 @@ impl SysCatalogReaderImpl { }) .collect_vec(); + let views = schema + .iter_view() + .map(|view| { + Row::new(vec![ + Some(ScalarImpl::Int32(view.id as i32)), + Some(ScalarImpl::Utf8(view.name().to_string())), + Some(ScalarImpl::Int32(schema_info.id as i32)), + Some(ScalarImpl::Int32(view.owner as i32)), + Some(ScalarImpl::Utf8("v".to_string())), + ]) + }) + .collect_vec(); + rows.into_iter() .chain(mvs.into_iter()) .chain(indexes.into_iter()) .chain(sources.into_iter()) .chain(sys_tables.into_iter()) + .chain(views.into_iter()) .collect_vec() }) .collect_vec()) @@ -327,4 +343,30 @@ impl SysCatalogReaderImpl { Ok(rows) } + + pub(super) fn read_views_info(&self) -> Result> { + // TODO(zehua): solve the deadlock problem. + // Get two read locks. The order must be the same as + // `FrontendObserverNode::handle_initialization_notification`. + let catalog_reader = self.catalog_reader.read_guard(); + let user_info_reader = self.user_info_reader.read_guard(); + let schemas = catalog_reader.iter_schemas(&self.auth_context.database)?; + + Ok(schemas + .flat_map(|schema| { + schema.iter_view().map(|view| { + Row::new(vec![ + Some(ScalarImpl::Utf8(schema.name())), + Some(ScalarImpl::Utf8(view.name().to_string())), + // TODO(zehua): after fix issue #6080, there must be Some. + user_info_reader + .get_user_name_by_id(view.owner) + .map(ScalarImpl::Utf8), + // TODO(zehua): may be not same as postgresql's "definition" column. + Some(ScalarImpl::Utf8(view.sql.clone())), + ]) + }) + }) + .collect_vec()) + } } diff --git a/src/frontend/src/catalog/system_catalog/pg_catalog/pg_views.rs b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_views.rs new file mode 100644 index 0000000000000..fd55a27b9795d --- /dev/null +++ b/src/frontend/src/catalog/system_catalog/pg_catalog/pg_views.rs @@ -0,0 +1,29 @@ +// Copyright 2022 Singularity Data +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +use risingwave_common::types::DataType; + +use crate::catalog::system_catalog::SystemCatalogColumnsDef; + +/// The view `pg_views` provides access to useful information about each view in the database. +/// Ref: [`https://www.postgresql.org/docs/current/view-pg-views.html`] +/// +/// `pg_views` in RisingWave doesn't contain system catalog. +pub const PG_VIEWS_TABLE_NAME: &str = "pg_views"; +pub const PG_VIEWS_COLUMNS: &[SystemCatalogColumnsDef<'_>] = &[ + (DataType::Varchar, "schemaname"), + (DataType::Varchar, "viewname"), + (DataType::Varchar, "viewowner"), + (DataType::Varchar, "definition"), +]; diff --git a/src/frontend/src/lib.rs b/src/frontend/src/lib.rs index ddd626de4de99..b341eea8670c3 100644 --- a/src/frontend/src/lib.rs +++ b/src/frontend/src/lib.rs @@ -27,6 +27,7 @@ #![feature(box_patterns)] #![feature(once_cell)] #![feature(result_option_inspect)] +#![feature(macro_metavar_expr)] #![recursion_limit = "256"] #[macro_use] diff --git a/src/meta/src/manager/catalog/database.rs b/src/meta/src/manager/catalog/database.rs index 0e8bb43de760d..3e4fc3501b1c1 100644 --- a/src/meta/src/manager/catalog/database.rs +++ b/src/meta/src/manager/catalog/database.rs @@ -14,7 +14,6 @@ use std::collections::hash_map::Entry; use std::collections::{BTreeMap, HashMap, HashSet}; -use std::marker::PhantomData; use itertools::Itertools; use risingwave_pb::catalog::{Database, Index, Schema, Sink, Source, Table, View}; @@ -41,7 +40,7 @@ type RelationKey = (DatabaseId, SchemaId, String); /// [`DatabaseManager`] caches meta catalog information and maintains dependent relationship /// between tables. -pub struct DatabaseManager { +pub struct DatabaseManager { /// Cached database information. pub(super) databases: BTreeMap, /// Cached schema information. @@ -68,15 +67,10 @@ pub struct DatabaseManager { pub(super) in_progress_creation_streaming_job: HashSet, // In-progress creating tables, including internal tables. pub(super) in_progress_creating_tables: HashMap, - - _phantom: PhantomData, } -impl DatabaseManager -where - S: MetaStore, -{ - pub async fn new(env: MetaSrvEnv) -> MetaResult { +impl DatabaseManager { + pub async fn new(env: MetaSrvEnv) -> MetaResult { let databases = Database::list(env.meta_store()).await?; let schemas = Schema::list(env.meta_store()).await?; let sources = Source::list(env.meta_store()).await?; @@ -126,8 +120,6 @@ where in_progress_creation_tracker: HashSet::default(), in_progress_creation_streaming_job: HashSet::default(), in_progress_creating_tables: HashMap::default(), - - _phantom: PhantomData, }) } diff --git a/src/meta/src/manager/catalog/mod.rs b/src/meta/src/manager/catalog/mod.rs index 520add7b18a8d..e512c8c0ad7a4 100644 --- a/src/meta/src/manager/catalog/mod.rs +++ b/src/meta/src/manager/catalog/mod.rs @@ -91,19 +91,16 @@ pub type CatalogManagerRef = Arc>; /// to Meta. pub struct CatalogManager { env: MetaSrvEnv, - core: Mutex>, + core: Mutex, } -pub struct CatalogManagerCore { - pub database: DatabaseManager, +pub struct CatalogManagerCore { + pub database: DatabaseManager, pub user: UserManager, } -impl CatalogManagerCore -where - S: MetaStore, -{ - async fn new(env: MetaSrvEnv) -> MetaResult { +impl CatalogManagerCore { + async fn new(env: MetaSrvEnv) -> MetaResult { let database = DatabaseManager::new(env.clone()).await?; let user = UserManager::new(env).await?; Ok(Self { database, user }) @@ -127,7 +124,7 @@ where Ok(()) } - pub async fn get_catalog_core_guard(&self) -> MutexGuard<'_, CatalogManagerCore> { + pub async fn get_catalog_core_guard(&self) -> MutexGuard<'_, CatalogManagerCore> { self.core.lock().await } }