Skip to content

Commit

Permalink
feat(frontend): add pg_catalog.pg_views and support \dv (#6295)
Browse files Browse the repository at this point in the history
* add pg_views

* support \dv

* cargo fmt

Co-authored-by: mergify[bot] <37929162+mergify[bot]@users.noreply.github.com>
  • Loading branch information
HuaHuaY and mergify[bot] authored Nov 10, 2022
1 parent 2e575da commit daf3ea3
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 37 deletions.
35 changes: 18 additions & 17 deletions src/frontend/src/catalog/system_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,13 +145,13 @@ pub fn get_sys_catalogs_in_schema(schema_name: &str) -> Option<Vec<SystemCatalog
}

macro_rules! prepare_sys_catalog {
($( { $catalog_id:expr, $schema_name:expr, $catalog_name:ident, $pk:expr, $func:tt $($await:tt)? } ),* $(,)?) => {
($( { $schema_name:expr, $catalog_name:ident, $pk:expr, $func:tt $($await:tt)? } ),* $(,)?) => {
/// `SYS_CATALOG_MAP` includes all system catalogs.
pub(crate) static SYS_CATALOG_MAP: LazyLock<HashMap<&str, Vec<SystemCatalog>>> = LazyLock::new(|| {
let mut hash_map: HashMap<&str, Vec<SystemCatalog>> = HashMap::new();
$(
paste!{
let sys_catalog = def_sys_catalog!($catalog_id, [<$catalog_name _TABLE_NAME>], [<$catalog_name _COLUMNS>], $pk);
let sys_catalog = def_sys_catalog!(${index()} + 1, [<$catalog_name _TABLE_NAME>], [<$catalog_name _COLUMNS>], $pk);
hash_map.entry([<$schema_name _SCHEMA_NAME>]).or_insert(vec![]).push(sys_catalog);
}
)*
Expand All @@ -161,9 +161,9 @@ macro_rules! prepare_sys_catalog {
#[async_trait]
impl SysCatalogReader for SysCatalogReaderImpl {
async fn read_table(&self, table_id: &TableId) -> Result<Vec<Row>> {
match table_id.table_id {
match table_id.table_id - 1 {
$(
$catalog_id => {
${index()} => {
let rows = self.$func();
$(let rows = rows.$await;)?
rows
Expand All @@ -178,17 +178,18 @@ macro_rules! prepare_sys_catalog {

// If you added a new system catalog, be sure to add a corresponding entry here.
prepare_sys_catalog! {
{ 1, PG_CATALOG, PG_TYPE, vec![0], read_types },
{ 2, PG_CATALOG, PG_NAMESPACE, vec![0], read_namespace },
{ 3, PG_CATALOG, PG_CAST, vec![0], read_cast },
{ 4, PG_CATALOG, PG_MATVIEWS_INFO, vec![0], read_mviews_info await },
{ 5, PG_CATALOG, PG_USER, vec![0], read_user_info },
{ 6, PG_CATALOG, PG_CLASS, vec![0], read_class_info },
{ 7, PG_CATALOG, PG_INDEX, vec![0], read_index_info },
{ 8, PG_CATALOG, PG_OPCLASS, vec![0], read_opclass_info },
{ 9, PG_CATALOG, PG_COLLATION, vec![0], read_collation_info },
{ 10, PG_CATALOG, PG_AM, vec![0], read_am_info },
{ 11, PG_CATALOG, PG_OPERATOR, vec![0], read_operator_info },
{ 12, INFORMATION_SCHEMA, COLUMNS, vec![], read_columns_info },
{ 13, INFORMATION_SCHEMA, TABLES, vec![], read_tables_info },
{ PG_CATALOG, PG_TYPE, vec![0], read_types },
{ PG_CATALOG, PG_NAMESPACE, vec![0], read_namespace },
{ PG_CATALOG, PG_CAST, vec![0], read_cast },
{ PG_CATALOG, PG_MATVIEWS_INFO, vec![0], read_mviews_info await },
{ PG_CATALOG, PG_USER, vec![0], read_user_info },
{ PG_CATALOG, PG_CLASS, vec![0], read_class_info },
{ PG_CATALOG, PG_INDEX, vec![0], read_index_info },
{ PG_CATALOG, PG_OPCLASS, vec![0], read_opclass_info },
{ PG_CATALOG, PG_COLLATION, vec![0], read_collation_info },
{ PG_CATALOG, PG_AM, vec![0], read_am_info },
{ PG_CATALOG, PG_OPERATOR, vec![0], read_operator_info },
{ PG_CATALOG, PG_VIEWS, vec![], read_views_info },
{ INFORMATION_SCHEMA, COLUMNS, vec![], read_columns_info },
{ INFORMATION_SCHEMA, TABLES, vec![], read_tables_info },
}
42 changes: 42 additions & 0 deletions src/frontend/src/catalog/system_catalog/pg_catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ pub mod pg_opclass;
pub mod pg_operator;
pub mod pg_type;
pub mod pg_user;
pub mod pg_views;

use std::collections::HashMap;

Expand All @@ -38,6 +39,7 @@ pub use pg_opclass::*;
pub use pg_operator::*;
pub use pg_type::*;
pub use pg_user::*;
pub use pg_views::*;
use risingwave_common::array::Row;
use risingwave_common::error::Result;
use risingwave_common::types::ScalarImpl;
Expand Down Expand Up @@ -261,11 +263,25 @@ impl SysCatalogReaderImpl {
})
.collect_vec();

let views = schema
.iter_view()
.map(|view| {
Row::new(vec![
Some(ScalarImpl::Int32(view.id as i32)),
Some(ScalarImpl::Utf8(view.name().to_string())),
Some(ScalarImpl::Int32(schema_info.id as i32)),
Some(ScalarImpl::Int32(view.owner as i32)),
Some(ScalarImpl::Utf8("v".to_string())),
])
})
.collect_vec();

rows.into_iter()
.chain(mvs.into_iter())
.chain(indexes.into_iter())
.chain(sources.into_iter())
.chain(sys_tables.into_iter())
.chain(views.into_iter())
.collect_vec()
})
.collect_vec())
Expand Down Expand Up @@ -327,4 +343,30 @@ impl SysCatalogReaderImpl {

Ok(rows)
}

pub(super) fn read_views_info(&self) -> Result<Vec<Row>> {
// TODO(zehua): solve the deadlock problem.
// Get two read locks. The order must be the same as
// `FrontendObserverNode::handle_initialization_notification`.
let catalog_reader = self.catalog_reader.read_guard();
let user_info_reader = self.user_info_reader.read_guard();
let schemas = catalog_reader.iter_schemas(&self.auth_context.database)?;

Ok(schemas
.flat_map(|schema| {
schema.iter_view().map(|view| {
Row::new(vec![
Some(ScalarImpl::Utf8(schema.name())),
Some(ScalarImpl::Utf8(view.name().to_string())),
// TODO(zehua): after fix issue #6080, there must be Some.
user_info_reader
.get_user_name_by_id(view.owner)
.map(ScalarImpl::Utf8),
// TODO(zehua): may be not same as postgresql's "definition" column.
Some(ScalarImpl::Utf8(view.sql.clone())),
])
})
})
.collect_vec())
}
}
29 changes: 29 additions & 0 deletions src/frontend/src/catalog/system_catalog/pg_catalog/pg_views.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
// Copyright 2022 Singularity Data
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

use risingwave_common::types::DataType;

use crate::catalog::system_catalog::SystemCatalogColumnsDef;

/// The view `pg_views` provides access to useful information about each view in the database.
/// Ref: [`https://www.postgresql.org/docs/current/view-pg-views.html`]
///
/// `pg_views` in RisingWave doesn't contain system catalog.
pub const PG_VIEWS_TABLE_NAME: &str = "pg_views";
pub const PG_VIEWS_COLUMNS: &[SystemCatalogColumnsDef<'_>] = &[
(DataType::Varchar, "schemaname"),
(DataType::Varchar, "viewname"),
(DataType::Varchar, "viewowner"),
(DataType::Varchar, "definition"),
];
1 change: 1 addition & 0 deletions src/frontend/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
#![feature(box_patterns)]
#![feature(once_cell)]
#![feature(result_option_inspect)]
#![feature(macro_metavar_expr)]
#![recursion_limit = "256"]

#[macro_use]
Expand Down
14 changes: 3 additions & 11 deletions src/meta/src/manager/catalog/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@

use std::collections::hash_map::Entry;
use std::collections::{BTreeMap, HashMap, HashSet};
use std::marker::PhantomData;

use itertools::Itertools;
use risingwave_pb::catalog::{Database, Index, Schema, Sink, Source, Table, View};
Expand All @@ -41,7 +40,7 @@ type RelationKey = (DatabaseId, SchemaId, String);

/// [`DatabaseManager`] caches meta catalog information and maintains dependent relationship
/// between tables.
pub struct DatabaseManager<S: MetaStore> {
pub struct DatabaseManager {
/// Cached database information.
pub(super) databases: BTreeMap<DatabaseId, Database>,
/// Cached schema information.
Expand All @@ -68,15 +67,10 @@ pub struct DatabaseManager<S: MetaStore> {
pub(super) in_progress_creation_streaming_job: HashSet<TableId>,
// In-progress creating tables, including internal tables.
pub(super) in_progress_creating_tables: HashMap<TableId, Table>,

_phantom: PhantomData<S>,
}

impl<S> DatabaseManager<S>
where
S: MetaStore,
{
pub async fn new(env: MetaSrvEnv<S>) -> MetaResult<Self> {
impl DatabaseManager {
pub async fn new<S: MetaStore>(env: MetaSrvEnv<S>) -> MetaResult<Self> {
let databases = Database::list(env.meta_store()).await?;
let schemas = Schema::list(env.meta_store()).await?;
let sources = Source::list(env.meta_store()).await?;
Expand Down Expand Up @@ -126,8 +120,6 @@ where
in_progress_creation_tracker: HashSet::default(),
in_progress_creation_streaming_job: HashSet::default(),
in_progress_creating_tables: HashMap::default(),

_phantom: PhantomData,
})
}

Expand Down
15 changes: 6 additions & 9 deletions src/meta/src/manager/catalog/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,19 +91,16 @@ pub type CatalogManagerRef<S> = Arc<CatalogManager<S>>;
/// to Meta.
pub struct CatalogManager<S: MetaStore> {
env: MetaSrvEnv<S>,
core: Mutex<CatalogManagerCore<S>>,
core: Mutex<CatalogManagerCore>,
}

pub struct CatalogManagerCore<S: MetaStore> {
pub database: DatabaseManager<S>,
pub struct CatalogManagerCore {
pub database: DatabaseManager,
pub user: UserManager,
}

impl<S> CatalogManagerCore<S>
where
S: MetaStore,
{
async fn new(env: MetaSrvEnv<S>) -> MetaResult<Self> {
impl CatalogManagerCore {
async fn new<S: MetaStore>(env: MetaSrvEnv<S>) -> MetaResult<Self> {
let database = DatabaseManager::new(env.clone()).await?;
let user = UserManager::new(env).await?;
Ok(Self { database, user })
Expand All @@ -127,7 +124,7 @@ where
Ok(())
}

pub async fn get_catalog_core_guard(&self) -> MutexGuard<'_, CatalogManagerCore<S>> {
pub async fn get_catalog_core_guard(&self) -> MutexGuard<'_, CatalogManagerCore> {
self.core.lock().await
}
}
Expand Down

0 comments on commit daf3ea3

Please sign in to comment.