Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: honor appendOnly table config #1747

Merged
merged 2 commits into from
Oct 25, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ of features outlined in the Delta [protocol][protocol] is also [tracked](#protoc

| Writer Version | Requirement | Status |
| -------------- | --------------------------------------------- | :------------------: |
| Version 2 | Append Only Tables | [![open]][roadmap] |
| Version 2 | Append Only Tables | ![done] |
| Version 2 | Column Invariants | ![done] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsJson` | [![open]][writer-rs] |
| Version 3 | Enforce `delta.checkpoint.writeStatsAsStruct` | [![open]][writer-rs] |
Expand Down
20 changes: 18 additions & 2 deletions rust/src/operations/delete.rs
Original file line number Diff line number Diff line change
Expand Up @@ -312,11 +312,14 @@ impl std::future::IntoFuture for DeleteBuilder {

#[cfg(test)]
mod tests {

use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::{get_arrow_schema, get_delta_schema};
use crate::writer::test_utils::{
get_arrow_schema, get_delta_schema, get_record_batch, setup_table_with_configuration,
write_batch,
};
use crate::DeltaConfigKey;
use crate::DeltaTable;
use arrow::array::Int32Array;
use arrow::datatypes::{Field, Schema};
Expand All @@ -339,6 +342,19 @@ mod tests {
table
}

#[tokio::test]
async fn test_delete_when_delta_table_is_append_only() {
    // Build a table configured with `AppendOnly = "true"` and append one batch to it.
    let append_only_table =
        setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await;
    let seeded_table = write_batch(append_only_table, get_record_batch(None, false)).await;

    // Run a delete without a predicate; the operation is expected to be rejected.
    let outcome = DeltaOps(seeded_table).delete().await;
    let _err = outcome
        .expect_err("Remove action is included when Delta table is append-only. Should error");
}

#[tokio::test]
async fn test_delete_default() {
let schema = get_arrow_schema(&None);
Expand Down
43 changes: 31 additions & 12 deletions rust/src/operations/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -532,7 +532,7 @@ impl MergeOperationConfig {
}
}

#[derive(Default, Serialize)]
#[derive(Default, Serialize, Debug)]
/// Metrics for the Merge Operation
pub struct MergeMetrics {
/// Number of rows in the source data
Expand Down Expand Up @@ -1245,12 +1245,13 @@ impl std::future::IntoFuture for MergeBuilder {

#[cfg(test)]
mod tests {

use crate::operations::DeltaOps;
use crate::protocol::*;
use crate::writer::test_utils::datafusion::get_data;
use crate::writer::test_utils::get_arrow_schema;
use crate::writer::test_utils::get_delta_schema;
use crate::writer::test_utils::setup_table_with_configuration;
use crate::DeltaConfigKey;
use crate::DeltaTable;
use arrow::datatypes::Schema as ArrowSchema;
use arrow::record_batch::RecordBatch;
Expand All @@ -1277,6 +1278,21 @@ mod tests {
table
}

#[tokio::test]
async fn test_merge_when_delta_table_is_append_only() {
    let schema = get_arrow_schema(&None);

    // Build a table configured with `AppendOnly = "true"` and append some data to it.
    let append_only_table =
        setup_table_with_configuration(DeltaConfigKey::AppendOnly, Some("true")).await;
    let target_table = write_data(append_only_table, &schema).await;

    // Merge the source frame into the target, joining on `id`; this is expected to fail.
    let source = merge_source(schema);
    let join_predicate = col("target.id").eq(col("source.id"));
    let outcome = DeltaOps(target_table)
        .merge(source, join_predicate)
        .with_source_alias("source")
        .with_target_alias("target")
        .await;
    let _err = outcome
        .expect_err("Remove action is included when Delta table is append-only. Should error");
}

async fn write_data(table: DeltaTable, schema: &Arc<ArrowSchema>) -> DeltaTable {
let batch = RecordBatch::try_new(
Arc::clone(schema),
Expand All @@ -1300,14 +1316,7 @@ mod tests {
.unwrap()
}

async fn setup() -> (DeltaTable, DataFrame) {
let schema = get_arrow_schema(&None);
let table = setup_table(None).await;

let table = write_data(table, &schema).await;
assert_eq!(table.version(), 1);
assert_eq!(table.get_file_uris().count(), 1);

fn merge_source(schema: Arc<ArrowSchema>) -> DataFrame {
let ctx = SessionContext::new();
let batch = RecordBatch::try_new(
Arc::clone(&schema),
Expand All @@ -1322,8 +1331,18 @@ mod tests {
],
)
.unwrap();
let source = ctx.read_batch(batch).unwrap();
(table, source)
ctx.read_batch(batch).unwrap()
}

// Shared fixture: a freshly created table with one write applied, paired with the
// source DataFrame produced by `merge_source` for the same schema.
async fn setup() -> (DeltaTable, DataFrame) {
    let schema = get_arrow_schema(&None);
    let table = write_data(setup_table(None).await, &schema).await;

    // Sanity-check the starting point before any merge runs: version 1, one data file.
    assert_eq!(table.version(), 1);
    assert_eq!(table.get_file_uris().count(), 1);

    let source = merge_source(schema);
    (table, source)
}

async fn assert_merge(table: DeltaTable, metrics: MergeMetrics) {
Expand Down
1 change: 1 addition & 0 deletions rust/src/operations/restore.rs
Original file line number Diff line number Diff line change
Expand Up @@ -229,6 +229,7 @@ async fn execute(
datetime: datetime_to_restore.map(|time| -> i64 { time.timestamp_millis() }),
},
&actions,
&snapshot,
None,
)
.await?;
Expand Down
12 changes: 6 additions & 6 deletions rust/src/operations/transaction/conflict_checker.rs
Original file line number Diff line number Diff line change
Expand Up @@ -691,7 +691,7 @@ mod tests {
actions: Vec<Action>,
read_whole_table: bool,
) -> Result<(), CommitConflictError> {
let setup_actions = setup.unwrap_or_else(init_table_actions);
let setup_actions = setup.unwrap_or_else(|| init_table_actions(None));
let state = DeltaTableState::from_actions(setup_actions, 0).unwrap();
let transaction_info = TransactionInfo::new(&state, reads, &actions, read_whole_table);
let summary = WinningCommitSummary {
Expand All @@ -717,7 +717,7 @@ mod tests {
// the concurrent transaction deletes a file that the current transaction did NOT read
let file_not_read = tu::create_add_action("file_not_read", true, get_stats(1, 10));
let file_read = tu::create_add_action("file_read", true, get_stats(100, 10000));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_not_read);
setup_actions.push(file_read);
let result = execute_test(
Expand All @@ -733,7 +733,7 @@ mod tests {
// concurrently add file, that the current transaction would not have read
let file_added = tu::create_add_action("file_added", true, get_stats(1, 10));
let file_read = tu::create_add_action("file_read", true, get_stats(100, 10000));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_read);
let result = execute_test(
Some(setup_actions),
Expand Down Expand Up @@ -797,7 +797,7 @@ mod tests {
// delete / read
// transaction reads a file that is removed by concurrent transaction
let file_read = tu::create_add_action("file_read", true, get_stats(1, 10));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_read);
let result = execute_test(
Some(setup_actions),
Expand Down Expand Up @@ -842,7 +842,7 @@ mod tests {
let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10));
let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100));
let file_part3 = tu::create_add_action("file_part3", true, get_stats(101, 1000));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_part1);
let result = execute_test(
Some(setup_actions),
Expand All @@ -858,7 +858,7 @@ mod tests {
// `read_whole_table` should disallow any concurrent remove actions
let file_part1 = tu::create_add_action("file_part1", true, get_stats(1, 10));
let file_part2 = tu::create_add_action("file_part2", true, get_stats(11, 100));
let mut setup_actions = init_table_actions();
let mut setup_actions = init_table_actions(None);
setup_actions.push(file_part1);
let result = execute_test(
Some(setup_actions),
Expand Down
58 changes: 52 additions & 6 deletions rust/src/operations/transaction/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,11 @@ pub enum TransactionError {
/// Error returned when maximum number of commit trials is exceeded
#[error("Failed to commit transaction: {0}")]
MaxCommitAttempts(i32),
/// The transaction includes Remove action with data change but Delta table is append-only
#[error(
"The transaction includes Remove action with data change but Delta table is append-only"
)]
DeltaTableAppendOnly,
}

impl From<TransactionError> for DeltaTableError {
Expand All @@ -68,9 +73,18 @@ impl From<TransactionError> for DeltaTableError {
// Convert actions to their json representation
fn log_entry_from_actions<'a>(
actions: impl IntoIterator<Item = &'a Action>,
read_snapshot: &DeltaTableState,
) -> Result<String, TransactionError> {
let append_only = read_snapshot.table_config().append_only();
let mut jsons = Vec::<String>::new();
for action in actions {
if append_only {
if let Action::remove(remove) = action {
if remove.data_change {
return Err(TransactionError::DeltaTableAppendOnly);
}
}
}
let json = serde_json::to_string(action)
.map_err(|e| TransactionError::SerializeLogJson { json_err: e })?;
jsons.push(json);
Expand All @@ -81,6 +95,7 @@ fn log_entry_from_actions<'a>(
pub(crate) fn get_commit_bytes(
operation: &DeltaOperation,
actions: &Vec<Action>,
read_snapshot: &DeltaTableState,
app_metadata: Option<Map<String, Value>>,
) -> Result<bytes::Bytes, TransactionError> {
if !actions.iter().any(|a| matches!(a, Action::commitInfo(..))) {
Expand All @@ -99,9 +114,13 @@ pub(crate) fn get_commit_bytes(
actions
.iter()
.chain(std::iter::once(&Action::commitInfo(commit_info))),
read_snapshot,
)?))
} else {
Ok(bytes::Bytes::from(log_entry_from_actions(actions)?))
Ok(bytes::Bytes::from(log_entry_from_actions(
actions,
read_snapshot,
)?))
}
}

Expand All @@ -112,10 +131,11 @@ pub(crate) async fn prepare_commit<'a>(
storage: &dyn ObjectStore,
operation: &DeltaOperation,
actions: &Vec<Action>,
read_snapshot: &DeltaTableState,
app_metadata: Option<Map<String, Value>>,
) -> Result<Path, TransactionError> {
// Serialize all actions that are part of this log entry.
let log_entry = get_commit_bytes(operation, actions, app_metadata)?;
let log_entry = get_commit_bytes(operation, actions, read_snapshot, app_metadata)?;

// Write delta log entry as temporary file to storage. For the actual commit,
// the temporary file is moved (atomic rename) to the delta log folder within `commit` function.
Expand Down Expand Up @@ -177,7 +197,8 @@ pub async fn commit_with_retries(
app_metadata: Option<Map<String, Value>>,
max_retries: usize,
) -> DeltaResult<i64> {
let tmp_commit = prepare_commit(storage, &operation, actions, app_metadata).await?;
let tmp_commit =
prepare_commit(storage, &operation, actions, read_snapshot, app_metadata).await?;

let mut attempt_number = 1;

Expand Down Expand Up @@ -218,9 +239,11 @@ pub async fn commit_with_retries(

#[cfg(all(test, feature = "parquet"))]
mod tests {
use self::test_utils::init_table_actions;
use self::test_utils::{create_remove_action, init_table_actions};
use super::*;
use crate::DeltaConfigKey;
use object_store::memory::InMemory;
use std::collections::HashMap;

#[test]
fn test_commit_uri_from_version() {
Expand All @@ -232,13 +255,36 @@ mod tests {

#[test]
fn test_log_entry_from_actions() {
let actions = init_table_actions();
let entry = log_entry_from_actions(&actions).unwrap();
let actions = init_table_actions(None);
let state = DeltaTableState::from_actions(actions.clone(), 0).unwrap();
let entry = log_entry_from_actions(&actions, &state).unwrap();
let lines: Vec<_> = entry.lines().collect();
// writes every action to a line
assert_eq!(actions.len(), lines.len())
}

// Builds the initial table actions with `AppendOnly = "true"` in the configuration,
// appends a single Remove action carrying the given `data_change` flag, and returns
// whatever `log_entry_from_actions` yields for that action list.
fn remove_action_exists_when_delta_table_is_append_only(
    data_change: bool,
) -> Result<String, TransactionError> {
    let append_only_config = HashMap::from([(
        DeltaConfigKey::AppendOnly.as_ref().to_string(),
        Some("true".to_string()),
    )]);

    let mut actions = init_table_actions(Some(append_only_config));
    actions.push(create_remove_action("test_append_only", data_change));

    // The snapshot is derived from the same actions so it carries the append-only config.
    let snapshot =
        DeltaTableState::from_actions(actions.clone(), 0).expect("Failed to get table state");
    log_entry_from_actions(&actions, &snapshot)
}

#[test]
fn test_remove_action_exists_when_delta_table_is_append_only() {
    // A Remove action with `data_change = true` must be rejected on an append-only table.
    let err = remove_action_exists_when_delta_table_is_append_only(true)
        .expect_err("Remove action is included when Delta table is append-only. Should error");
    // Pin the variant so an unrelated failure (e.g. a serialization error) cannot
    // make this test pass by accident.
    assert!(
        matches!(err, TransactionError::DeltaTableAppendOnly),
        "expected TransactionError::DeltaTableAppendOnly, got: {:?}",
        err
    );

    // A Remove action with `data_change = false` is still allowed.
    let _actions = remove_action_exists_when_delta_table_is_append_only(false)
        .expect("Data is not changed by the Remove action. Should succeed");
}

#[tokio::test]
async fn test_try_commit_transaction() {
let store = InMemory::new();
Expand Down
4 changes: 2 additions & 2 deletions rust/src/operations/transaction/state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -334,7 +334,7 @@ mod tests {

#[test]
fn test_parse_predicate_expression() {
let snapshot = DeltaTableState::from_actions(init_table_actions(), 0).unwrap();
let snapshot = DeltaTableState::from_actions(init_table_actions(None), 0).unwrap();
let session = SessionContext::new();
let state = session.state();

Expand All @@ -361,7 +361,7 @@ mod tests {

#[test]
fn test_files_matching_predicate() {
let mut actions = init_table_actions();
let mut actions = init_table_actions(None);
actions.push(create_add_action("excluded", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":10},\"nullCount\":{\"value\":0}}".into())));
actions.push(create_add_action("included-1", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":1},\"maxValues\":{\"value\":100},\"nullCount\":{\"value\":0}}".into())));
actions.push(create_add_action("included-2", true, Some("{\"numRecords\":10,\"minValues\":{\"value\":-10},\"maxValues\":{\"value\":3},\"nullCount\":{\"value\":0}}".into())));
Expand Down
10 changes: 5 additions & 5 deletions rust/src/operations/transaction/test_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ pub fn create_metadata_action(
Action::metaData(MetaData::try_from(metadata).unwrap())
}

pub fn init_table_actions() -> Vec<Action> {
pub fn init_table_actions(configuration: Option<HashMap<String, Option<String>>>) -> Vec<Action> {
let raw = r#"
{
"timestamp": 1670892998177,
Expand All @@ -96,7 +96,7 @@ pub fn init_table_actions() -> Vec<Action> {
vec![
Action::commitInfo(commit_info),
create_protocol_action(None, None),
create_metadata_action(None, None),
create_metadata_action(None, configuration),
]
}

Expand Down Expand Up @@ -127,7 +127,7 @@ pub async fn create_initialized_table(
HashMap::new(),
),
]);
let state = DeltaTableState::from_actions(init_table_actions(), 0).unwrap();
let state = DeltaTableState::from_actions(init_table_actions(None), 0).unwrap();
let operation = DeltaOperation::Create {
mode: SaveMode::ErrorIfExists,
location: "location".into(),
Expand All @@ -144,8 +144,8 @@ pub async fn create_initialized_table(
configuration.unwrap_or_default(),
),
};
let actions = init_table_actions();
let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, None)
let actions = init_table_actions(None);
let prepared_commit = prepare_commit(storage.as_ref(), &operation, &actions, &state, None)
.await
.unwrap();
try_commit_transaction(storage.as_ref(), &prepared_commit, 0)
Expand Down
Loading