Skip to content

Commit

Permalink
Merge pull request #3982 from benesch/coord-refactor
Browse files Browse the repository at this point in the history
coord: major cleanup
  • Loading branch information
benesch authored Aug 19, 2020
2 parents 2dbf323 + 8dd1ca8 commit 3623229
Show file tree
Hide file tree
Showing 11 changed files with 874 additions and 1,089 deletions.
112 changes: 104 additions & 8 deletions src/coord/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

use std::collections::{BTreeMap, HashMap, HashSet};
use std::iter;
use std::path::Path;
use std::sync::{Arc, Mutex, MutexGuard};
use std::time::SystemTime;

Expand Down Expand Up @@ -72,7 +73,7 @@ pub const MZ_CATALOG_SCHEMA: &str = "mz_catalog";
pub struct Catalog {
by_name: BTreeMap<String, Database>,
by_id: BTreeMap<GlobalId, CatalogEntry>,
indexes: HashMap<GlobalId, Vec<Vec<ScalarExpr>>>,
indexes: HashMap<GlobalId, Vec<(GlobalId, Vec<ScalarExpr>)>>,
ambient_schemas: BTreeMap<String, Schema>,
temporary_schemas: HashMap<u32, Schema>,
storage: Arc<Mutex<storage::Connection>>,
Expand Down Expand Up @@ -739,12 +740,19 @@ impl Catalog {
}
}

if let CatalogItem::Index(index) = entry.item() {
self.indexes
.entry(index.on)
.or_insert_with(Vec::new)
.push(index.keys.clone());
match entry.item() {
CatalogItem::Table(_) | CatalogItem::Source(_) | CatalogItem::View(_) => {
self.indexes.insert(id, vec![]);
}
CatalogItem::Index(index) => {
self.indexes
.get_mut(&index.on)
.unwrap()
.push((id, index.keys.clone()));
}
CatalogItem::Sink(_) => (),
}

let conn_id = entry.item().conn_id().unwrap_or(SYSTEM_CONN_ID);
self.get_schema_mut(&entry.name.database, &entry.name.schema, conn_id)
.expect("catalog out of sync")
Expand Down Expand Up @@ -1115,9 +1123,11 @@ impl Catalog {
.expect("catalog out of sync");
let i = indexes
.iter()
.position(|keys| keys == &index.keys)
.position(|(idx_id, _keys)| *idx_id == id)
.expect("catalog out of sync");
indexes.remove(i);
} else {
self.indexes.remove(&id);
}
OpStatus::DroppedItem(metadata)
}
Expand Down Expand Up @@ -1254,10 +1264,73 @@ impl Catalog {

/// Returns a mapping that indicates all indices that are available for
/// each item in the catalog.
pub fn indexes(&self) -> &HashMap<GlobalId, Vec<Vec<ScalarExpr>>> {
pub fn indexes(&self) -> &HashMap<GlobalId, Vec<(GlobalId, Vec<ScalarExpr>)>> {
&self.indexes
}

/// Finds the nearest indexes that can satisfy the views or sources whose
/// identifiers are listed in `ids`.
///
/// Returns the identifiers of all discovered indexes, along with a boolean
/// indicating whether the set of indexes is complete. If incomplete, then
/// one of the provided identifiers transitively depends on an
/// unmaterialized source.
pub fn nearest_indexes(&self, ids: &[GlobalId]) -> (Vec<GlobalId>, bool) {
fn inner(
catalog: &Catalog,
id: GlobalId,
indexes: &mut Vec<GlobalId>,
complete: &mut bool,
) {
// If an index exists for `id`, record it in the output set and stop
// searching.
if let Some((index_id, _)) = catalog.indexes[&id].first() {
indexes.push(*index_id);
return;
}

match catalog.get_by_id(&id).item() {
view @ CatalogItem::View(_) => {
// Unmaterialized view. Recursively search its dependencies.
for id in view.uses() {
inner(catalog, id, indexes, complete)
}
}
CatalogItem::Source(_) => {
// Unmaterialized source. Record that we are missing at
// least one index.
*complete = false;
}
CatalogItem::Table(_) => {
unreachable!("tables always have at least one index");
}
CatalogItem::Sink(_) | CatalogItem::Index(_) => {
unreachable!("sinks and indexes cannot be depended upon");
}
}
}

let mut indexes = vec![];
let mut complete = true;
for id in ids {
inner(self, *id, &mut indexes, &mut complete)
}
indexes.sort();
indexes.dedup();
(indexes, complete)
}

pub fn uses_tables(&self, id: GlobalId) -> bool {
match self.get_by_id(&id).item() {
CatalogItem::Table(_) => true,
CatalogItem::Source(_) => false,
item @ CatalogItem::View(_) => item.uses().into_iter().any(|id| self.uses_tables(id)),
CatalogItem::Sink(_) | CatalogItem::Index(_) => {
unreachable!("sinks and indexes cannot be depended upon");
}
}
}

pub fn dump(&self) -> String {
serde_json::to_string(&self.by_name).expect("serialization cannot fail")
}
Expand Down Expand Up @@ -1346,6 +1419,20 @@ impl From<PlanContext> for SerializedPlanContext {
}
}

/// Loads the catalog stored at `path` and returns its serialized state.
///
/// There are no guarantees about the format of the serialized state, except
/// that the serialized state for two identical catalogs will compare
/// identically.
pub fn dump(path: &Path) -> Result<String, anyhow::Error> {
let catalog = Catalog::open(Config {
path: Some(path),
enable_logging: true,
experimental_mode: None,
})?;
Ok(catalog.dump())
}

impl sql::catalog::Catalog for ConnCatalog<'_> {
fn startup_time(&self) -> SystemTime {
self.catalog.startup_time
Expand Down Expand Up @@ -1436,6 +1523,15 @@ impl sql::catalog::Catalog for ConnCatalog<'_> {
fn experimental_mode(&self) -> bool {
self.catalog.experimental_mode
}

fn is_queryable(&self, id: GlobalId) -> bool {
let (_, complete) = self.catalog.nearest_indexes(&[id]);
complete
}

fn is_materialized(&self, id: GlobalId) -> bool {
!self.catalog.indexes[&id].is_empty()
}
}

impl sql::catalog::CatalogItem for CatalogEntry {
Expand Down
7 changes: 0 additions & 7 deletions src/coord/src/catalog/builtin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -456,11 +456,4 @@ impl BUILTINS {
_ => None,
})
}

pub fn tables(&self) -> impl Iterator<Item = &'static BuiltinTable> + '_ {
self.values().filter_map(|b| match b {
Builtin::Table(table) => Some(*table),
_ => None,
})
}
}
Loading

0 comments on commit 3623229

Please sign in to comment.