Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: support update for memory catalog #1002

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
141 changes: 124 additions & 17 deletions crates/catalog/memory/src/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,30 @@ impl MemoryCatalog {
warehouse_location,
}
}

fn new_metadata_location(&self, location: &str) -> String {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This should include version increment, and be in the format as defined in the spec.

format!("{}/metadata/{}.metadata.json", location, Uuid::new_v4())
}

/// Writes `next_metadata` to a freshly named metadata file and repoints the table at it.
///
/// The table's metadata directory is looked up in the namespace state (which fails if
/// the table is not registered), the metadata is serialized as JSON into a new file
/// under that directory, and the table's recorded metadata location is then replaced
/// with the new file's location.
async fn commit_table(
    &self,
    table_ident: &TableIdent,
    next_metadata: TableMetadata,
) -> Result<()> {
    // The guard lives until this function returns, so the lookup, the file write and
    // the location swap all happen under one lock acquisition.
    let mut namespace_state = self.root_namespace_state.lock().await;

    let new_location = {
        let metadata_dir = namespace_state.get_existing_table_metadata_dir(table_ident)?;
        self.new_metadata_location(metadata_dir)
    };

    // Open the output first and serialize second, so errors surface in that order.
    let output = self.file_io.new_output(&new_location)?;
    let serialized = serde_json::to_vec(&next_metadata)?;
    output.write(serialized.into()).await?;

    namespace_state.update_table(table_ident, new_location)?;
    Ok(())
}
}

#[async_trait]
Expand Down Expand Up @@ -197,19 +221,14 @@ impl Catalog for MemoryCatalog {
let metadata = TableMetadataBuilder::from_table_creation(table_creation)?
.build()?
.metadata;
let metadata_location = format!(
"{}/metadata/{}-{}.metadata.json",
&location,
0,
Uuid::new_v4()
);
let metadata_location = self.new_metadata_location(&location);

self.file_io
.new_output(&metadata_location)?
.write(serde_json::to_vec(&metadata)?.into())
.await?;

root_namespace_state.insert_new_table(&table_ident, metadata_location.clone())?;
root_namespace_state.insert_new_table(&table_ident, location, metadata_location.clone())?;

Table::builder()
.file_io(self.file_io.clone())
Expand Down Expand Up @@ -263,19 +282,39 @@ impl Catalog for MemoryCatalog {
let metadata_location = new_root_namespace_state
.get_existing_table_location(src_table_ident)?
.clone();
let metadata_dir = new_root_namespace_state
.get_existing_table_metadata_dir(src_table_ident)?
.clone();
new_root_namespace_state.remove_existing_table(src_table_ident)?;
new_root_namespace_state.insert_new_table(dst_table_ident, metadata_location)?;
new_root_namespace_state.insert_new_table(
dst_table_ident,
metadata_dir,
metadata_location,
)?;
*root_namespace_state = new_root_namespace_state;

Ok(())
}

/// Update a table to the catalog.
async fn update_table(&self, _commit: TableCommit) -> Result<Table> {
Err(Error::new(
ErrorKind::FeatureUnsupported,
"MemoryCatalog does not currently support updating tables.",
))
/// Applies the requirements and updates carried by `commit` to the identified table,
/// stores the resulting metadata, and returns the table as loaded afterwards.
async fn update_table(&self, mut commit: TableCommit) -> Result<Table> {
    // Start from the metadata the catalog currently holds for the table.
    // NOTE(review): loading and committing are separate steps here — confirm whether
    // another update can interleave between them.
    let current_table = self.load_table(commit.identifier()).await?;
    let requirements = commit.take_requirements();
    let updates = commit.take_updates();
    let current_metadata = current_table.metadata().clone();

    // Every requirement must hold against the current metadata; the first failure aborts.
    requirements
        .into_iter()
        .try_for_each(|requirement| requirement.check(Some(&current_metadata)))?;

    // Thread the builder through each update in order, stopping at the first error.
    // NOTE(review): `None` is passed to `into_builder` — confirm whether the table's
    // current metadata location should be supplied instead.
    let next_metadata = updates
        .into_iter()
        .try_fold(current_metadata.into_builder(None), |builder, update| {
            update.apply(builder)
        })?
        .build()?
        .metadata;

    self.commit_table(commit.identifier(), next_metadata).await?;

    // Reload so the returned table reflects what was just committed.
    self.load_table(commit.identifier()).await
}
}

Expand All @@ -287,6 +326,7 @@ mod tests {

use iceberg::io::FileIOBuilder;
use iceberg::spec::{NestedField, PartitionSpec, PrimitiveType, Schema, SortOrder, Type};
use iceberg::transaction::Transaction;
use regex::Regex;
use tempfile::TempDir;

Expand Down Expand Up @@ -1035,7 +1075,7 @@ mod tests {
let table_name = "tbl1";
let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
let expected_table_metadata_location_regex = format!(
"^{}/tbl1/metadata/0-{}.metadata.json$",
"^{}/tbl1/metadata/{}.metadata.json$",
namespace_location, UUID_REGEX_STR,
);

Expand Down Expand Up @@ -1088,7 +1128,7 @@ mod tests {
let expected_table_ident =
TableIdent::new(nested_namespace_ident.clone(), table_name.into());
let expected_table_metadata_location_regex = format!(
"^{}/tbl1/metadata/0-{}.metadata.json$",
"^{}/tbl1/metadata/{}.metadata.json$",
nested_namespace_location, UUID_REGEX_STR,
);

Expand Down Expand Up @@ -1129,7 +1169,7 @@ mod tests {
let table_name = "tbl1";
let expected_table_ident = TableIdent::new(namespace_ident.clone(), table_name.into());
let expected_table_metadata_location_regex = format!(
"^{}/a/tbl1/metadata/0-{}.metadata.json$",
"^{}/a/tbl1/metadata/{}.metadata.json$",
warehouse_location, UUID_REGEX_STR
);

Expand Down Expand Up @@ -1177,7 +1217,7 @@ mod tests {
let expected_table_ident =
TableIdent::new(nested_namespace_ident.clone(), table_name.into());
let expected_table_metadata_location_regex = format!(
"^{}/a/b/tbl1/metadata/0-{}.metadata.json$",
"^{}/a/b/tbl1/metadata/{}.metadata.json$",
warehouse_location, UUID_REGEX_STR
);

Expand Down Expand Up @@ -1678,4 +1718,71 @@ mod tests {
),
);
}

/// A committed `set_properties` transaction is visible the next time the table is loaded.
#[tokio::test]
async fn test_update_table() {
    let catalog = new_memory_catalog();
    let namespace_ident = NamespaceIdent::new("n1".into());
    create_namespace(&catalog, &namespace_ident).await;
    let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
    create_table(&catalog, &table_ident).await;

    // A freshly created table starts without properties.
    let before = catalog.load_table(&table_ident).await.unwrap();
    assert!(before.metadata().properties().is_empty());

    let expected_properties: HashMap<String, String> =
        HashMap::from([("k".to_string(), "v".to_string())]);

    Transaction::new(&before)
        .set_properties(expected_properties.clone())
        .unwrap()
        .commit(&catalog)
        .await
        .unwrap();

    let after = catalog.load_table(&table_ident).await.unwrap();
    assert_eq!(after.metadata().properties(), &expected_properties);
}

/// Updates keep working after a rename: properties set before and after the rename
/// are both present on the renamed table.
#[tokio::test]
async fn test_update_rename_table() {
    let catalog = new_memory_catalog();
    let namespace_ident = NamespaceIdent::new("n1".into());
    create_namespace(&catalog, &namespace_ident).await;
    let table_ident = TableIdent::new(namespace_ident.clone(), "tbl1".into());
    create_table(&catalog, &table_ident).await;

    // A freshly created table starts without properties.
    let original = catalog.load_table(&table_ident).await.unwrap();
    assert!(original.metadata().properties().is_empty());

    // First update, made under the original name.
    Transaction::new(&original)
        .set_properties(HashMap::from([("k".to_string(), "v".to_string())]))
        .unwrap()
        .commit(&catalog)
        .await
        .unwrap();

    let dst_table_ident = TableIdent::new(namespace_ident.clone(), "tbl2".into());
    catalog
        .rename_table(&table_ident, &dst_table_ident)
        .await
        .unwrap();

    // Second update, made under the new name.
    let renamed = catalog.load_table(&dst_table_ident).await.unwrap();
    Transaction::new(&renamed)
        .set_properties(HashMap::from([("k1".to_string(), "v2".to_string())]))
        .unwrap()
        .commit(&catalog)
        .await
        .unwrap();

    let expected_properties: HashMap<String, String> = HashMap::from([
        ("k".to_string(), "v".to_string()),
        ("k1".to_string(), "v2".to_string()),
    ]);
    let reloaded = catalog.load_table(&dst_table_ident).await.unwrap();
    assert_eq!(reloaded.metadata().properties(), &expected_properties);
}
}
45 changes: 44 additions & 1 deletion crates/catalog/memory/src/namespace_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ pub(crate) struct NamespaceState {
namespaces: HashMap<String, NamespaceState>,
// Mapping of tables to metadata locations in this namespace
table_metadata_locations: HashMap<String, String>,
// Mapping of tables to metadata dir locations in this namespace
table_metadata_dirs: HashMap<String, String>,
}

fn no_such_namespace_err<T>(namespace_ident: &NamespaceIdent) -> Result<T> {
Expand Down Expand Up @@ -175,6 +177,7 @@ impl NamespaceState {
properties,
namespaces: HashMap::new(),
table_metadata_locations: HashMap::new(),
table_metadata_dirs: HashMap::new(),
});

Ok(())
Expand Down Expand Up @@ -266,6 +269,7 @@ impl NamespaceState {
pub(crate) fn insert_new_table(
&mut self,
table_ident: &TableIdent,
table_metadata_dir: String,
metadata_location: String,
) -> Result<()> {
let namespace = self.get_mut_namespace(table_ident.namespace())?;
Expand All @@ -277,9 +281,45 @@ impl NamespaceState {
hash_map::Entry::Occupied(_) => table_already_exists_err(table_ident),
hash_map::Entry::Vacant(entry) => {
let _ = entry.insert(metadata_location);
let dir = namespace
.table_metadata_dirs
.insert(table_ident.name().to_string(), table_metadata_dir);
// New table should not have a metadata dir.
assert_eq!(dir, None);
Ok(())
}
}
}

/// Replaces the recorded metadata location of an already registered table.
///
/// Fails if the table's namespace cannot be resolved, or with the error produced by
/// `no_such_table_err` if the namespace has no entry for the table. The table's
/// metadata directory entry is not touched.
pub(crate) fn update_table(
    &mut self,
    table_ident: &TableIdent,
    metadata_location: String,
) -> Result<()> {
    let namespace = self.get_mut_namespace(table_ident.namespace())?;

    // Only overwrite an existing entry; a missing table is never created here.
    if let Some(current_location) = namespace
        .table_metadata_locations
        .get_mut(table_ident.name())
    {
        *current_location = metadata_location;
        Ok(())
    } else {
        no_such_table_err(table_ident)
    }
}

/// Returns the metadata directory recorded for the given table, or an error if the
/// table's namespace cannot be resolved or the table has no entry in it.
pub(crate) fn get_existing_table_metadata_dir(
    &self,
    table_ident: &TableIdent,
) -> Result<&String> {
    let namespace = self.get_namespace(table_ident.namespace())?;

    if let Some(metadata_dir) = namespace.table_metadata_dirs.get(table_ident.name()) {
        Ok(metadata_dir)
    } else {
        no_such_table_err(table_ident)
    }
}

Expand All @@ -292,7 +332,10 @@ impl NamespaceState {
.remove(table_ident.name())
{
None => no_such_table_err(table_ident),
Some(metadata_location) => Ok(metadata_location),
Some(metadata_location) => {
let _ = namespace.table_metadata_dirs.remove(table_ident.name());
Ok(metadata_location)
}
}
}
}
Loading