Skip to content

Commit

Permalink
fix: duplicate files after update (#136)
Browse files Browse the repository at this point in the history
* move internal table state field into a new `DeltaTableState` struct
* clear state on `restore_checkpoint`
  • Loading branch information
dispanser authored Mar 19, 2021
1 parent 7ae4f69 commit 5b6e98f
Show file tree
Hide file tree
Showing 42 changed files with 122 additions and 64 deletions.
1 change: 1 addition & 0 deletions rust/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -58,3 +58,4 @@ s3 = ["rusoto_core", "rusoto_credential", "rusoto_s3", "rusoto_sts"]
[dev-dependencies]
utime = "0.3"
serial_test = "*"
pretty_assertions = "0"
90 changes: 54 additions & 36 deletions rust/src/delta.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,23 +178,29 @@ impl From<StorageError> for LoadCheckpointError {
}
}

pub struct DeltaTable {
pub version: DeltaDataTypeVersion,
#[derive(Default)]
struct DeltaTableState {
// A remove action should remain in the state of the table as a tombstone until it has expired
// vacuum operation is responsible for providing the retention threshold
pub tombstones: Vec<action::Remove>,
pub min_reader_version: i32,
pub min_writer_version: i32,
tombstones: Vec<action::Remove>,
files: Vec<String>,
commit_infos: Vec<Value>,
app_transaction_version: HashMap<String, DeltaDataTypeVersion>,
min_reader_version: i32,
min_writer_version: i32,
current_metadata: Option<DeltaTableMetaData>,
}

pub struct DeltaTable {
pub version: DeltaDataTypeVersion,
pub table_path: String,

state: DeltaTableState,

// metadata
// application_transactions
storage: Box<dyn StorageBackend>,

files: Vec<String>,
app_transaction_version: HashMap<String, DeltaDataTypeVersion>,
commit_infos: Vec<Value>,
current_metadata: Option<DeltaTableMetaData>,
last_check_point: Option<CheckPoint>,
log_path: String,
version_timestamp: HashMap<DeltaDataTypeVersion, i64>,
Expand Down Expand Up @@ -239,21 +245,24 @@ impl DeltaTable {
Ok(serde_json::from_slice(&data)?)
}

fn process_action(&mut self, action: &Action) -> Result<(), serde_json::error::Error> {
fn process_action(
state: &mut DeltaTableState,
action: &Action,
) -> Result<(), serde_json::error::Error> {
match action {
Action::add(v) => {
self.files.push(v.path.clone());
state.files.push(v.path.clone());
}
Action::remove(v) => {
self.files.retain(|e| *e != v.path);
self.tombstones.push(v.clone());
state.files.retain(|e| *e != v.path);
state.tombstones.push(v.clone());
}
Action::protocol(v) => {
self.min_reader_version = v.minReaderVersion;
self.min_writer_version = v.minWriterVersion;
state.min_reader_version = v.minReaderVersion;
state.min_writer_version = v.minWriterVersion;
}
Action::metaData(v) => {
self.current_metadata = Some(DeltaTableMetaData {
state.current_metadata = Some(DeltaTableMetaData {
id: v.id.clone(),
name: v.name.clone(),
description: v.description.clone(),
Expand All @@ -264,12 +273,13 @@ impl DeltaTable {
});
}
Action::txn(v) => {
self.app_transaction_version
state
.app_transaction_version
.entry(v.appId.clone())
.or_insert(v.version);
}
Action::commitInfo(v) => {
self.commit_infos.push(v.clone());
state.commit_infos.push(v.clone());
}
}

Expand Down Expand Up @@ -342,7 +352,7 @@ impl DeltaTable {
) -> Result<(), ApplyLogError> {
for line in reader.lines() {
let action: Action = serde_json::from_str(line?.as_str())?;
self.process_action(&action)?;
DeltaTable::process_action(&mut self.state, &action)?;
}

Ok(())
Expand All @@ -359,6 +369,7 @@ impl DeltaTable {
async fn restore_checkpoint(&mut self, check_point: CheckPoint) -> Result<(), DeltaTableError> {
let checkpoint_data_paths = self.get_checkpoint_data_paths(&check_point);
// process actions from checkpoint
self.state = DeltaTableState::default();
for f in &checkpoint_data_paths {
let obj = self.storage.get_obj(&f).await?;
let preader = SerializedFileReader::new(SliceableCursor::new(obj))?;
Expand All @@ -369,7 +380,10 @@ impl DeltaTable {
)));
}
for record in preader.get_row_iter(None)? {
self.process_action(&Action::from_parquet_record(&schema, &record)?)?;
DeltaTable::process_action(
&mut self.state,
&Action::from_parquet_record(&schema, &record)?,
)?;
}
}

Expand Down Expand Up @@ -548,32 +562,42 @@ impl DeltaTable {
}

pub fn get_files(&self) -> &Vec<String> {
&self.files
&self.state.files
}

pub fn get_file_paths(&self) -> Vec<String> {
self.files
self.state
.files
.iter()
.map(|fname| self.storage.join_path(&self.table_path, fname))
.collect()
}

pub fn get_metadata(&self) -> Result<&DeltaTableMetaData, DeltaTableError> {
self.current_metadata
self.state
.current_metadata
.as_ref()
.ok_or(DeltaTableError::NoMetadata)
}

pub fn get_tombstones(&self) -> &Vec<action::Remove> {
&self.tombstones
&self.state.tombstones
}

pub fn get_app_transaction_version(&self) -> &HashMap<String, DeltaDataTypeVersion> {
&self.app_transaction_version
&self.state.app_transaction_version
}

pub fn get_min_reader_version(&self) -> i32 {
self.state.min_reader_version
}

pub fn get_min_writer_version(&self) -> i32 {
self.state.min_writer_version
}

pub fn schema(&self) -> Option<&Schema> {
self.current_metadata.as_ref().map(|m| &m.schema)
self.state.current_metadata.as_ref().map(|m| &m.schema)
}

pub fn get_schema(&self) -> Result<&Schema, DeltaTableError> {
Expand All @@ -594,15 +618,9 @@ impl DeltaTable {
let log_path_normalized = storage_backend.join_path(table_path, "_delta_log");
Ok(Self {
version: 0,
files: Vec::new(),
state: DeltaTableState::default(),
storage: storage_backend,
tombstones: Vec::new(),
table_path: table_path.to_string(),
min_reader_version: 0,
min_writer_version: 0,
current_metadata: None,
commit_infos: Vec::new(),
app_transaction_version: HashMap::new(),
last_check_point: None,
log_path: log_path_normalized,
version_timestamp: HashMap::new(),
Expand Down Expand Up @@ -650,7 +668,7 @@ impl fmt::Display for DeltaTable {
fn fmt(&self, f: &mut fmt::Formatter) -> fmt::Result {
writeln!(f, "DeltaTable({})", self.table_path)?;
writeln!(f, "\tversion: {}", self.version)?;
match self.current_metadata.as_ref() {
match self.state.current_metadata.as_ref() {
Some(metadata) => {
writeln!(f, "\tmetadata: {}", metadata)?;
}
Expand All @@ -661,9 +679,9 @@ impl fmt::Display for DeltaTable {
writeln!(
f,
"\tmin_version: read={}, write={}",
self.min_reader_version, self.min_writer_version
self.state.min_reader_version, self.state.min_writer_version
)?;
writeln!(f, "\tfiles count: {}", self.files.len())
writeln!(f, "\tfiles count: {}", self.state.files.len())
}
}

Expand Down
4 changes: 2 additions & 2 deletions rust/tests/azure_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,8 +16,8 @@ mod azure {
.await
.unwrap();
assert_eq!(table.version, 4);
assert_eq!(table.min_writer_version, 2);
assert_eq!(table.min_reader_version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
table.get_files(),
&vec![
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
{"commitInfo":{"timestamp":1615751699523,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}}
{"protocol":{"minReaderVersion":1,"minWriterVersion":2}}
{"metaData":{"id":"cf3741a3-5f93-434f-99ac-9a4bebcdf06c","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"version\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":[],"configuration":{},"createdTime":1615751699422}}
{"add":{"path":"part-00000-3810fbe0-9892-431d-bcfd-7de5788dfe8d-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751699515,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1615751700281,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":0,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}}
{"add":{"path":"part-00000-1abe25d3-0da6-46c5-98c1-7a69872fd797-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751700275,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1615751701120,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":1,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}}
{"add":{"path":"part-00000-136c36f5-639d-4e95-bb0f-15cde3fb14eb-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751701112,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1615751701854,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":2,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}}
{"add":{"path":"part-00000-8e7dc8c1-337b-40b8-a411-46d4295da531-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751701848,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1615751702764,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":3,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}}
{"add":{"path":"part-00000-e93060ad-9c8c-4170-a9da-7c6f53f6406b-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751702758,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1615751703539,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":4,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}}
{"add":{"path":"part-00000-e9c6df9a-e585-4c70-bc1f-de9bd8ae025b-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751703532,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1615751704301,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":5,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}}
{"add":{"path":"part-00000-7d239c98-d74b-4b02-b3f6-9f256992c633-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751704295,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1615751705073,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":6,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}}
{"add":{"path":"part-00000-72ecc4d6-2e44-4df4-99e6-23f1ac2b7b7c-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751705065,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1615751705959,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":7,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}}
{"add":{"path":"part-00000-3fa65c69-4e55-4b18-a195-5f1ae583e553-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751705952,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1615751706703,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":8,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}}
{"add":{"path":"part-00000-9afd9224-729f-4420-a05e-8032113a6568-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751706698,"dataChange":true}}
Binary file not shown.
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
{"commitInfo":{"timestamp":1615751716705,"operation":"WRITE","operationParameters":{"mode":"Append","partitionBy":"[]"},"readVersion":9,"isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputBytes":"442","numOutputRows":"1"}}}
{"add":{"path":"part-00000-f0e955c5-a1e3-4eec-834e-dcc098fc9005-c000.snappy.parquet","partitionValues":{},"size":442,"modificationTime":1615751716698,"dataChange":true}}
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
{"version":10,"size":13}
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
Binary file not shown.
34 changes: 24 additions & 10 deletions rust/tests/read_delta_test.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
extern crate deltalake;

use pretty_assertions::assert_eq;
use std::collections::HashMap;

#[tokio::test]
Expand All @@ -8,8 +9,8 @@ async fn read_delta_2_0_table_without_version() {
.await
.unwrap();
assert_eq!(table.version, 3);
assert_eq!(table.min_writer_version, 2);
assert_eq!(table.min_reader_version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
table.get_files(),
&vec![
Expand All @@ -31,14 +32,27 @@ async fn read_delta_2_0_table_without_version() {
);
}

#[tokio::test]
async fn read_delta_table_with_update() {
let path = "./tests/data/simple_table_with_checkpoint/";
let table_newest_version = deltalake::open_table(path).await.unwrap();
let mut table_to_update = deltalake::open_table_with_version(path, 0).await.unwrap();
table_to_update.update().await.unwrap();

assert_eq!(
table_newest_version.get_files(),
table_to_update.get_files()
);
}

#[tokio::test]
async fn read_delta_2_0_table_with_version() {
let mut table = deltalake::open_table_with_version("./tests/data/delta-0.2.0", 0)
.await
.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.min_writer_version, 2);
assert_eq!(table.min_reader_version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
table.get_files(),
&vec![
Expand All @@ -51,8 +65,8 @@ async fn read_delta_2_0_table_with_version() {
.await
.unwrap();
assert_eq!(table.version, 2);
assert_eq!(table.min_writer_version, 2);
assert_eq!(table.min_reader_version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
table.get_files(),
&vec![
Expand All @@ -65,8 +79,8 @@ async fn read_delta_2_0_table_with_version() {
.await
.unwrap();
assert_eq!(table.version, 3);
assert_eq!(table.min_writer_version, 2);
assert_eq!(table.min_reader_version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
table.get_files(),
&vec![
Expand All @@ -83,8 +97,8 @@ async fn read_delta_8_0_table_without_version() {
.await
.unwrap();
assert_eq!(table.version, 1);
assert_eq!(table.min_writer_version, 2);
assert_eq!(table.min_reader_version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
table.get_files(),
&vec![
Expand Down
16 changes: 8 additions & 8 deletions rust/tests/read_simple_table_test.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,8 @@ async fn read_simple_table() {
.await
.unwrap();
assert_eq!(table.version, 4);
assert_eq!(table.min_writer_version, 2);
assert_eq!(table.min_reader_version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
table.get_files(),
&vec![
Expand Down Expand Up @@ -43,8 +43,8 @@ async fn read_simple_table_with_version() {
.await
.unwrap();
assert_eq!(table.version, 0);
assert_eq!(table.min_writer_version, 2);
assert_eq!(table.min_reader_version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
table.get_files(),
&vec![
Expand All @@ -61,8 +61,8 @@ async fn read_simple_table_with_version() {
.await
.unwrap();
assert_eq!(table.version, 2);
assert_eq!(table.min_writer_version, 2);
assert_eq!(table.min_reader_version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
table.get_files(),
&vec![
Expand All @@ -79,8 +79,8 @@ async fn read_simple_table_with_version() {
.await
.unwrap();
assert_eq!(table.version, 3);
assert_eq!(table.min_writer_version, 2);
assert_eq!(table.min_reader_version, 1);
assert_eq!(table.get_min_writer_version(), 2);
assert_eq!(table.get_min_reader_version(), 1);
assert_eq!(
table.get_files(),
&vec![
Expand Down
Loading

0 comments on commit 5b6e98f

Please sign in to comment.