Skip to content

Commit

Permalink
feat: extend configuration handling (#1206)
Browse files Browse the repository at this point in the history
# Description

This PR refactors and extends the table configuration handling. The
approach is analogous to the way both this crate and `object_store`
handle configuration via storage properties. The idea is to provide a
somewhat typed layer over the untyped configuration keys.

There was one surprising thing along the way. From what I can tell, we
may have been omitting the `delta.` prefix on the config keys we parse.
If so, this is definitely a breaking change, since we no longer
recognize the un-prefixed keys we were parsing before. We can in
principle handle aliases for keys quite easily, but I was not sure what
the desired behaviour is.

cc @rtyler @xianwill - This change would probably affect
`kafka-delta-ingest`, so especially interested in your opinions!

# Related Issue(s)

part of #632

# Documentation

<!---
Share links to useful documentation
--->
  • Loading branch information
roeap authored Mar 6, 2023
1 parent 901292c commit 01896f5
Show file tree
Hide file tree
Showing 7 changed files with 380 additions and 229 deletions.
446 changes: 331 additions & 115 deletions rust/src/delta_config.rs

Large diffs are not rendered by default.

1 change: 0 additions & 1 deletion rust/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,7 +91,6 @@ pub mod operations;
pub mod partitions;
pub mod schema;
pub mod storage;
pub mod table_properties;
pub mod table_state;
pub mod time_utils;

Expand Down
21 changes: 14 additions & 7 deletions rust/src/operations/create.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ use super::transaction::commit;
use super::{MAX_SUPPORTED_READER_VERSION, MAX_SUPPORTED_WRITER_VERSION};
use crate::action::{Action, DeltaOperation, MetaData, Protocol, SaveMode};
use crate::builder::ensure_table_uri;
use crate::delta_config::DeltaConfigKey;
use crate::schema::{SchemaDataType, SchemaField, SchemaTypeStruct};
use crate::storage::DeltaObjectStore;
use crate::{DeltaResult, DeltaTable, DeltaTableError};
Expand Down Expand Up @@ -152,19 +153,25 @@ impl CreateBuilder {
}

/// Set configuration on created table
pub fn with_configuration(mut self, configuration: HashMap<String, Option<String>>) -> Self {
self.configuration = configuration;
pub fn with_configuration(
mut self,
configuration: HashMap<DeltaConfigKey, Option<impl Into<String>>>,
) -> Self {
self.configuration = configuration
.into_iter()
.map(|(k, v)| (k.as_ref().into(), v.map(|s| s.into())))
.collect();
self
}

/// Specify a table property in the table configuration
pub fn with_configuration_property(
mut self,
key: impl Into<String>,
key: DeltaConfigKey,
value: Option<impl Into<String>>,
) -> Self {
self.configuration
.insert(key.into(), value.map(|v| v.into()));
.insert(key.as_ref().into(), value.map(|v| v.into()));
self
}

Expand Down Expand Up @@ -311,8 +318,8 @@ impl std::future::IntoFuture for CreateBuilder {
#[cfg(all(test, feature = "parquet"))]
mod tests {
use super::*;
use crate::delta_config::DeltaConfigKey;
use crate::operations::DeltaOps;
use crate::table_properties::APPEND_ONLY;
use crate::writer::test_utils::get_delta_schema;
use tempdir::TempDir;

Expand Down Expand Up @@ -396,14 +403,14 @@ mod tests {
let table = CreateBuilder::new()
.with_location("memory://")
.with_columns(schema.get_fields().clone())
.with_configuration_property(APPEND_ONLY, Some("true"))
.with_configuration_property(DeltaConfigKey::AppendOnly, Some("true"))
.await
.unwrap();
let append = table
.get_metadata()
.unwrap()
.configuration
.get(APPEND_ONLY)
.get(DeltaConfigKey::AppendOnly.as_ref())
.unwrap()
.as_ref()
.unwrap()
Expand Down
26 changes: 3 additions & 23 deletions rust/src/operations/optimize.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ use crate::storage::ObjectStoreRef;
use crate::table_state::DeltaTableState;
use crate::writer::utils::arrow_schema_without_partitions;
use crate::writer::utils::PartitionPath;
use crate::{table_properties, DeltaDataTypeVersion};
use crate::DeltaDataTypeVersion;
use crate::{
DeltaDataTypeLong, DeltaResult, DeltaTable, DeltaTableError, ObjectMeta, PartitionFilter,
};
use arrow::datatypes::{Schema as ArrowSchema, SchemaRef as ArrowSchemaRef};
use futures::future::BoxFuture;
use futures::StreamExt;
use log::{debug, error};
use log::debug;
use parquet::arrow::async_reader::{ParquetObjectReader, ParquetRecordBatchStreamBuilder};
use parquet::file::properties::WriterProperties;
use serde::{Deserialize, Serialize};
Expand Down Expand Up @@ -380,34 +380,14 @@ impl MergePlan {
}
}

/// Resolve the target file size to use when optimizing the table in `snapshot`.
///
/// Looks up `table_properties::TARGET_FILE_SIZE` in the configuration of the
/// snapshot's current metadata and parses the value as an `i64`.
///
/// Returns the default of 268_435_456 (256 MiB) when the snapshot has no
/// metadata, the key is absent, the key maps to `None`, or the value does not
/// parse. The last two cases log an error; a missing key is silent.
fn get_target_file_size(snapshot: &DeltaTableState) -> DeltaDataTypeLong {
// Default used whenever no valid override is found.
let mut target_size = 268_435_456;
if let Some(meta) = snapshot.current_metadata() {
let config_str = meta.configuration.get(table_properties::TARGET_FILE_SIZE);
// Outer `Some`: the key exists in the configuration map.
if let Some(s) = config_str {
// Inner `Some`: the key has an actual value attached.
if let Some(s) = s {
let r = s.parse::<i64>();
if let Ok(size) = r {
target_size = size;
} else {
error!("Unable to parse value of 'delta.targetFileSize'. Using default value");
}
} else {
// Key present but mapped to `None`.
error!("Check your configuration of 'delta.targetFileSize'. Using default value");
}
}
}
target_size
}

/// Build a Plan on which files to merge together. See [OptimizeBuilder]
pub fn create_merge_plan(
snapshot: &DeltaTableState,
filters: &[PartitionFilter<'_, &str>],
target_size: Option<DeltaDataTypeLong>,
writer_properties: WriterProperties,
) -> Result<MergePlan, DeltaTableError> {
let target_size = target_size.unwrap_or_else(|| get_target_file_size(snapshot));
let target_size = target_size.unwrap_or_else(|| snapshot.table_config().target_file_size());
let mut candidates = HashMap::new();
let mut operations: HashMap<PartitionPath, PartitionMergePlan> = HashMap::new();
let mut metrics = Metrics::default();
Expand Down
69 changes: 0 additions & 69 deletions rust/src/table_properties.rs

This file was deleted.

34 changes: 25 additions & 9 deletions rust/src/table_state.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,16 @@
//! The module for delta table state.
use crate::action::{self, Action, Add};
use crate::delta_config;
use crate::delta_config::TableConfig;
use crate::partitions::{DeltaTablePartition, PartitionFilter};
use crate::schema::SchemaDataType;
use crate::Schema;
use crate::{
ApplyLogError, DeltaDataTypeLong, DeltaDataTypeVersion, DeltaTable, DeltaTableError,
DeltaTableMetaData,
};
use chrono::Utc;
use lazy_static::lazy_static;
use object_store::{path::Path, ObjectStore};
use serde::{Deserialize, Serialize};
use serde_json::{Map, Value};
Expand Down Expand Up @@ -228,6 +230,22 @@ impl DeltaTableState {
self.current_metadata.as_ref()
}

/// The table schema, borrowed from the current metadata.
///
/// Returns `None` when this state has no current metadata.
pub fn schema(&self) -> Option<&Schema> {
self.current_metadata.as_ref().map(|m| &m.schema)
}

/// Well known table configuration
///
/// Wraps the `configuration` map of the current metadata in a [`TableConfig`].
/// When this state has no current metadata, an empty map is wrapped instead.
pub fn table_config(&self) -> TableConfig<'_> {
// A static empty map gives the fallback `TableConfig` something to borrow
// when there is no metadata to take the configuration from.
lazy_static! {
static ref DUMMY_CONF: HashMap<String, Option<String>> = HashMap::new();
}
self.current_metadata
.as_ref()
.map(|meta| TableConfig(&meta.configuration))
.unwrap_or_else(|| TableConfig(&DUMMY_CONF))
}

/// Merges new state information into our state
///
/// The DeltaTableState also carries the version information for the given state,
Expand Down Expand Up @@ -322,14 +340,12 @@ impl DeltaTableState {
action::Action::metaData(v) => {
let md = DeltaTableMetaData::try_from(v)
.map_err(|e| ApplyLogError::InvalidJson { source: e })?;
self.tombstone_retention_millis = delta_config::TOMBSTONE_RETENTION
.get_interval_from_metadata(&md)?
.as_millis() as i64;
self.log_retention_millis = delta_config::LOG_RETENTION
.get_interval_from_metadata(&md)?
.as_millis() as i64;
self.enable_expired_log_cleanup =
delta_config::ENABLE_EXPIRED_LOG_CLEANUP.get_boolean_from_metadata(&md)?;
let table_config = TableConfig(&md.configuration);
self.tombstone_retention_millis =
table_config.deleted_file_retention_duration().as_millis() as i64;
self.log_retention_millis =
table_config.log_retention_duration().as_millis() as i64;
self.enable_expired_log_cleanup = table_config.enable_expired_log_cleanup();
self.current_metadata = Some(md);
}
action::Action::txn(v) => {
Expand Down
12 changes: 7 additions & 5 deletions rust/tests/checkpoint_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,7 @@ mod delete_expired_delta_log_in_checkpoint {

use ::object_store::path::Path as ObjectStorePath;
use chrono::Utc;
use deltalake::delta_config::DeltaConfigKey;
use deltalake::*;
use maplit::hashmap;

Expand All @@ -99,8 +100,8 @@ mod delete_expired_delta_log_in_checkpoint {
let mut table = fs_common::create_table(
"./tests/data/checkpoints_with_expired_logs/expired",
Some(hashmap! {
delta_config::LOG_RETENTION.key.clone() => Some("interval 10 minute".to_string()),
delta_config::ENABLE_EXPIRED_LOG_CLEANUP.key.clone() => Some("true".to_string())
DeltaConfigKey::LogRetentionDuration.as_ref().into() => Some("interval 10 minute".to_string()),
DeltaConfigKey::EnableExpiredLogCleanup.as_ref().into() => Some("true".to_string())
}),
)
.await;
Expand Down Expand Up @@ -163,8 +164,8 @@ mod delete_expired_delta_log_in_checkpoint {
let mut table = fs_common::create_table(
"./tests/data/checkpoints_with_expired_logs/not_delete_expired",
Some(hashmap! {
delta_config::LOG_RETENTION.key.clone() => Some("interval 1 second".to_string()),
delta_config::ENABLE_EXPIRED_LOG_CLEANUP.key.clone() => Some("false".to_string())
DeltaConfigKey::LogRetentionDuration.as_ref().into() => Some("interval 1 second".to_string()),
DeltaConfigKey::EnableExpiredLogCleanup.as_ref().into() => Some("false".to_string())
}),
)
.await;
Expand Down Expand Up @@ -212,6 +213,7 @@ mod checkpoints_with_tombstones {
use ::object_store::path::Path as ObjectStorePath;
use chrono::Utc;
use deltalake::action::*;
use deltalake::delta_config::DeltaConfigKey;
use deltalake::*;
use maplit::hashmap;
use parquet::file::reader::{FileReader, SerializedFileReader};
Expand All @@ -237,7 +239,7 @@ mod checkpoints_with_tombstones {
#[tokio::test]
async fn test_expired_tombstones() {
let mut table = fs_common::create_table("./tests/data/checkpoints_tombstones/expired", Some(hashmap! {
delta_config::TOMBSTONE_RETENTION.key.clone() => Some("interval 1 minute".to_string())
DeltaConfigKey::DeletedFileRetentionDuration.as_ref().into() => Some("interval 1 minute".to_string())
})).await;

let a1 = fs_common::add(3 * 60 * 1000); // 3 mins ago,
Expand Down

0 comments on commit 01896f5

Please sign in to comment.