From 16f21a5a0d8b6cff190b9644af1f30d20f880fea Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 23 Feb 2022 20:32:57 -0800 Subject: [PATCH 01/28] Sketch writer implementation --- python/deltalake/table.py | 15 +++++++++++- python/deltalake/writer.py | 35 ++++++++++++++++++++++++++++ python/docs/source/usage.rst | 45 +++++++++++++++++++++++++++++++++++- 3 files changed, 93 insertions(+), 2 deletions(-) create mode 100644 python/deltalake/writer.py diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 1c8cc8b4e7..c8f03e37e7 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1,12 +1,14 @@ import json import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple, Union import pyarrow import pyarrow.fs as pa_fs from pyarrow.dataset import FileSystemDataset, ParquetFileFormat +from python.deltalake.writer import write_deltalake + if TYPE_CHECKING: import pandas @@ -330,3 +332,14 @@ def update_incremental(self) -> None: newer versions. """ self._table.update_incremental() + + def write(self, data, mode: Literal['append', 'overwrite'] = 'append', backend: str = 'pyarrow'): + write_deltalake(self, data, mode, backend) + + def delete_where(self, where_expr, backend: str = 'pyarrow'): + '''Delete rows matching the expression''' + pass + + def update(self, where_expr, set_values: Dict[str, Any], backend: str = 'pyarrow'): + '''Modify values in rows matching the expression''' + pass diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py new file mode 100644 index 0000000000..1029ba49c7 --- /dev/null +++ b/python/deltalake/writer.py @@ -0,0 +1,35 @@ +from typing import Iterator, Union +from deltalake import DeltaTable +import pyarrow as pa + + +class DeltaTableWriter: + def __init__(self, table: DeltaTable): + self.table = table + + def append(self, batches: Iterator[pa.RecordBatch]): + pass + + def overwrite(self, batches: Iterator[pa.RecordBatch]): + pass + + def delete(self, where): + pass + + def update(self, on, batches: Iterator[pa.RecordBatch]): + pass + + # TODO: merge + + +def create_empty_table(uri: str, schema: pa.Schema, backend: str = 'pyarrow'): + pass + + +def write_deltalake( + table: Union[str, DeltaTable], + data, + mode: Union['append', 'overwrite'] = 'append', + backend: str = 'pyarrow' +): + pass diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index e72aa802c2..f42d2eae22 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -328,4 +328,47 @@ Optimizing tables is not currently supported. Writing Delta Tables -------------------- -Writing Delta tables is not currently supported. +For overwrites and appends, use :func:`write_deltalake`. If the table does not +already exist, it will be created. The ``data`` parameter will accept a Pandas +DataFrame, a PyArrow Table, or an iterator of PyArrow Record Batches. + +.. code-block:: python + + >>> from deltalake.writer import write_deltalake + >>> df = pd.DataFrame({'x': [1, 2, 3]}) + >>> write_deltalake('path/to/table', df) + +By default, writes append to the table. To overwrite, pass in ``mode='overwrite'``: + +.. code-block:: python + + >>> write_deltalake('path/to/table', df, mode='overwrite') + +If you have a :class:`DeltaTable` object, you can also call the :meth:`DeltaTable.write` +method: + +.. 
code-block:: python + + >>> DeltaTable('path/to/table').write(df, mode='overwrite') + +To delete rows based on an expression, use :meth:`DeltaTable.delete` + +.. code-block:: python + + >>> from deltalake.writer import delete_deltalake + >>> import pyarrow.dataset as ds + >>> DeltaTable('path/to/table').delete(ds.field('x') == 2) + +To update a subset of rows with new values, use + +.. code-block:: python + + >>> from deltalake.writer import delete_deltalake + >>> import pyarrow.dataset as ds + >>> # Increment y where x = 2 + >>> DeltaTable('path/to/table').update( + where_expr=ds.field('x') == 2, + set_values={ + 'y': ds.field('y') + 1 + } + ) \ No newline at end of file From e6ffab05e7e78c6b2c187f426ac25159ffae46ab Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 27 Feb 2022 09:41:04 -0800 Subject: [PATCH 02/28] Draft writer implementation --- .gitignore | 4 +- python/deltalake/table.py | 8 +- python/deltalake/writer.py | 129 +++++++++++++++++++++++++------ python/src/lib.rs | 149 +++++++++++++++++++++++++++++++++++- python/tests/test_writer.py | 0 rust/Cargo.toml | 5 +- rust/src/delta_arrow.rs | 94 +++++++++++++++++++++++ 7 files changed, 362 insertions(+), 27 deletions(-) create mode 100644 python/tests/test_writer.py diff --git a/.gitignore b/.gitignore index 776885c858..ff22decf25 100644 --- a/.gitignore +++ b/.gitignore @@ -10,4 +10,6 @@ tlaplus/*.toolbox/*/MC.cfg tlaplus/*.toolbox/*/[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*-[0-9]*/ /.idea .vscode -.env \ No newline at end of file +.env +**/.DS_Store +**/.python-version \ No newline at end of file diff --git a/python/deltalake/table.py b/python/deltalake/table.py index c8f03e37e7..1f3a25a0c4 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -7,7 +7,7 @@ import pyarrow.fs as pa_fs from pyarrow.dataset import FileSystemDataset, ParquetFileFormat -from python.deltalake.writer import write_deltalake +from deltalake.writer import write_deltalake if TYPE_CHECKING: import pandas @@ -89,6 +89,12 @@ def __init__( ) self._metadata = Metadata(self._table) + @classmethod + def _from_raw(cls, raw_table: RawDeltaTable) -> "DeltaTable": + self = cls.__new__(cls) + self._table = raw_table + self._metadata = Metadata(self._table) + @classmethod def from_data_catalog( cls, diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 1029ba49c7..50045a202a 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -1,35 +1,120 @@ -from typing import Iterator, Union +from dataclasses import dataclass +from datetime import datetime +import json +from typing import Dict, Iterable, List, Literal, Optional, Union, overload +import uuid from deltalake import DeltaTable +from .deltalake import _create_empty_table import pyarrow as pa +import pyarrow.dataset as ds -class DeltaTableWriter: - def __init__(self, table: DeltaTable): - self.table = table +@dataclass +class AddAction: + path: str + size: int + partition_values: Dict[str, Optional[str]] + modification_time: int + data_change: bool + stats: str - def append(self, batches: Iterator[pa.RecordBatch]): - pass - def overwrite(self, batches: Iterator[pa.RecordBatch]): - pass +def create_empty_table(uri: str, schema: pa.Schema, partition_columns: List[str]) -> DeltaTable: + return DeltaTable._from_raw(_create_empty_table(uri, schema, partition_columns)) - def delete(self, where): - pass - def update(self, on, batches: Iterator[pa.RecordBatch]): - pass +@overload +def write_deltalake( + table_or_uri: Union[str, DeltaTable], + data: Iterable[pa.RecordBatch], 
+ schema: pa.Schema, + partition_by: Optional[Iterable[str]], + mode: Literal['error', 'append', 'overwrite', 'ignore'] = 'error' +): ... - # TODO: merge +@overload +def write_deltalake( + table_or_uri: Union[str, DeltaTable], + data: Union[pa.Table, pa.RecordBatch, pa.RecordBatchReader], + schema: Optional[pa.Schema], + partition_by: Optional[Iterable[str]], + mode: Literal['error', 'append', 'overwrite', 'ignore'] = 'error' +): ... -def create_empty_table(uri: str, schema: pa.Schema, backend: str = 'pyarrow'): - pass +def write_deltalake(table_or_uri, data, schema, partition_by=None, mode='error'): + """Write to a Delta Lake table -def write_deltalake( - table: Union[str, DeltaTable], - data, - mode: Union['append', 'overwrite'] = 'append', - backend: str = 'pyarrow' -): - pass + If the table does not already exist, it will be created. + + :param table: URI of a table or a DeltaTable object. + :param data: Data to write. If passing iterable, the schema must also be given. + :param schema: Optional schema to write. + :param mode: How to handle existing data. Default is to error if table + already exists. If 'append', will add new data. If 'overwrite', will + replace table with new data. If 'ignore', will not write anything if + table already exists. + """ + if isinstance(data, Iterable) and schema is None: + return ValueError("You must provide schema if data is Iterable") + elif not isinstance(data, Iterable): + schema = data.schema + + if isinstance(table_or_uri, str): + try: + table = DeltaTable(table_or_uri) + except KeyError: # TODO update to error we should get if doesn't exist + # Create it + if mode == 'error': + raise AssertionError("DeltaTable already exists.") + + table = create_empty_table( + table_or_uri, schema, partition_by or []) + else: + if mode == 'ignore': + # table already exists + return + else: + table = table_or_uri + + if partition_by: + partitioning = ds.partitioning(field_names=partition_by, flavor="hive") + else: + partitioning = None + + add_actions: List[AddAction] = [] + + def visitor(written_file): + # TODO: Get partition values from path + partition_values = {} + # TODO: Record column statistics + # NOTE: will need to aggregate over row groups. Access with + # written_file.metadata.row_group(i).column(j).statistics + stats = {"numRecords": written_file.metadata.num_rows} + add_actions.append(AddAction( + written_file.path, + written_file.metadata.serialized_size, + partition_values, + int(datetime.now().timestamp()), + True, + json.dumps(stats) + )) + + # TODO: Pass through filesystem? Do we need to transform the URI as well? 
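+    # pyarrow.dataset writes the Parquet files: file names embed the target
+    # table version plus a UUID, hive-flavored partitioning lays out key=value
+    # directories, and the visitor above records an AddAction per written file.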
+ ds.write_dataset( + data, + base_dir=table._table.table_uri(), + basename_template=f"{table.version}-{uuid.uuid4()}-{{i}}.parquet", + format="parquet", + partitioning=partitioning, + schema=schema, + file_visitor=visitor, + existing_data_behavior='overwrite_or_ignore', + ) + + table._table.create_write_transaction( + add_actions, + mode, + partition_by + ) diff --git a/python/src/lib.rs b/python/src/lib.rs index fcab8b11b0..18d07159bf 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -3,19 +3,28 @@ extern crate pyo3; use chrono::{DateTime, FixedOffset, Utc}; -use deltalake::action::Stats; -use deltalake::action::{ColumnCountStat, ColumnValueStat}; +use deltalake::action; +use deltalake::action::Action; +use deltalake::action::{ColumnCountStat, ColumnValueStat, DeltaOperation, SaveMode, Stats}; use deltalake::arrow::datatypes::Schema as ArrowSchema; +use deltalake::get_backend_for_uri; use deltalake::partitions::PartitionFilter; use deltalake::storage; +use deltalake::DeltaDataTypeLong; +use deltalake::DeltaDataTypeTimestamp; +use deltalake::DeltaTableMetaData; +use deltalake::DeltaTransactionOptions; use deltalake::{arrow, StorageBackend}; use pyo3::create_exception; use pyo3::exceptions::PyException; +use pyo3::exceptions::PyValueError; use pyo3::prelude::*; use pyo3::types::{PyBytes, PyTuple, PyType}; use std::collections::HashMap; use std::collections::HashSet; use std::convert::TryFrom; +use std::time::SystemTime; +use std::time::UNIX_EPOCH; create_exception!(deltalake, PyDeltaTableError, PyException); @@ -43,6 +52,10 @@ impl PyDeltaTableError { fn from_chrono(err: chrono::ParseError) -> pyo3::PyErr { PyDeltaTableError::new_err(format!("Parse date and time string failed: {}", err)) } + + fn from_serde(err: serde_json::Error) -> pyo3::PyErr { + PyDeltaTableError::new_err(err.to_string()) + } } #[inline] @@ -122,6 +135,50 @@ impl RawDeltaTable { Ok(table_uri) } + #[classmethod] + fn create_empty( + _cls: &PyType, + table_uri: String, + schema: ArrowSchema, + partition_columns: Vec, + commit: Option, + ) -> PyResult { + let mut table = deltalake::DeltaTable::new( + &table_uri, + get_backend_for_uri(&table_uri).map_err(PyDeltaTableError::from_storage)?, + deltalake::DeltaTableConfig::default(), + ) + .map_err(PyDeltaTableError::from_raw)?; + + let protocol = action::Protocol { + min_reader_version: 1, + min_writer_version: 1, // TODO: Make sure we comply with protocol + }; + + let metadata = DeltaTableMetaData::new( + None, + None, + None, + (&schema).try_into()?, + partition_columns, + HashMap::new(), + ); + + let mut transaction = table.create_transaction(Some(DeltaTransactionOptions::new(3))); + transaction.add_action(Action::metaData( + metadata.try_into().map_err(PyDeltaTableError::from_serde)?, + )); + transaction.add_action(Action::protocol(protocol)); + + if let Some(true) = commit { + rt()? 
+ .block_on(transaction.commit(None)) + .map_err(PyDeltaTableError::from_raw)?; + } + + Ok(RawDeltaTable { _table: table }) + } + pub fn table_uri(&self) -> PyResult<&str> { Ok(&self._table.table_uri) } @@ -272,6 +329,50 @@ impl RawDeltaTable { }) .collect() } + + pub fn create_write_transaction( + &mut self, + add_actions: Vec, + mode: &str, + partition_by: Option>, + ) -> PyResult<()> { + let mode = save_mode_from_str(mode)?; + + let mut actions: Vec = add_actions + .iter() + .map(|add| Action::add(add.clone().into())) + .collect(); + + if let SaveMode::Overwrite = mode { + // Remove all current files + for old_add in self._table.get_state().files().iter() { + let remove_action = Action::remove(action::Remove { + path: old_add.path.clone(), + deletion_timestamp: Some(current_timestamp()), + data_change: true, + extended_file_metadata: Some(old_add.tags.is_some()), + partition_values: Some(old_add.partition_values.clone()), + size: Some(old_add.size), + tags: old_add.tags.clone(), + }); + actions.push(remove_action); + } + } + + let mut transaction = self + ._table + .create_transaction(Some(DeltaTransactionOptions::new(3))); + transaction.add_actions(actions); + rt()? + .block_on(transaction.commit(Some(DeltaOperation::Write { + mode, + partitionBy: partition_by, + predicate: None, + }))) + .map_err(PyDeltaTableError::from_raw)?; + + Ok(()) + } } fn json_value_to_py(value: &serde_json::Value, py: Python) -> PyObject { @@ -409,6 +510,50 @@ fn rust_core_version() -> &'static str { deltalake::crate_version() } +fn save_mode_from_str(value: &str) -> PyResult { + match value { + "append" => Ok(SaveMode::Append), + "overwrite" => Ok(SaveMode::Overwrite), + "error" => Ok(SaveMode::ErrorIfExists), + "ignore" => Ok(SaveMode::Ignore), + _ => Err(PyValueError::new_err("Invalid save mode")), + } +} + +fn current_timestamp() -> DeltaDataTypeTimestamp { + let start = SystemTime::now(); + let since_the_epoch = start + .duration_since(UNIX_EPOCH) + .expect("Time went backwards"); + since_the_epoch.as_millis().try_into().unwrap() +} + +#[derive(FromPyObject)] +pub struct PyAddAction { + path: String, + size: DeltaDataTypeLong, + partition_values: HashMap>, + modification_time: DeltaDataTypeTimestamp, + data_change: bool, + stats: Option, +} + +impl From<&PyAddAction> for action::Add { + fn from(action: &PyAddAction) -> Self { + action::Add { + path: action.path.clone(), + size: action.size, + partition_values: action.partition_values.clone(), + partition_values_parsed: None, + modification_time: action.modification_time, + data_change: action.data_change, + stats: action.stats.clone(), + stats_parsed: None, + tags: None, + } + } +} + #[pymodule] // module name need to match project name fn deltalake(py: Python, m: &PyModule) -> PyResult<()> { diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/rust/Cargo.toml b/rust/Cargo.toml index 2382506037..3e71a90585 100644 --- a/rust/Cargo.toml +++ b/rust/Cargo.toml @@ -26,7 +26,10 @@ lazy_static = "1" percent-encoding = "2" # HTTP Client -reqwest = { version = "0.11", default-features = false, features = ["rustls-tls", "stream"], optional = true} +reqwest = { version = "0.11", default-features = false, features = [ + "rustls-tls", + "stream", +], optional = true } # Azure azure_core = { version = "0.1", optional = true } diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs index d2f1bd9ce9..5e8a8563e5 100644 --- a/rust/src/delta_arrow.rs +++ b/rust/src/delta_arrow.rs @@ 
-7,6 +7,7 @@ use arrow::datatypes::{ use arrow::error::ArrowError; use lazy_static::lazy_static; use regex::Regex; +use std::collections::HashMap; use std::convert::TryFrom; impl TryFrom<&schema::Schema> for ArrowSchema { @@ -162,6 +163,99 @@ impl TryFrom<&schema::SchemaDataType> for ArrowDataType { } } +impl TryFrom<&ArrowSchema> for schema::Schema { + type Error = ArrowError; + fn try_from(arrow_schema: &ArrowSchema) -> Result { + let new_fields: Result, _> = arrow_schema + .fields() + .iter() + .map(|field| field.try_into()) + .collect(); + Ok(schema::Schema::new(new_fields?)) + } +} + +impl TryFrom<&ArrowField> for schema::SchemaField { + type Error = ArrowError; + fn try_from(arrow_field: &ArrowField) -> Result { + Ok(schema::SchemaField::new( + arrow_field.name().clone(), + arrow_field.data_type().try_into()?, + arrow_field.is_nullable(), + arrow_field.metadata().as_ref().map_or_else( + || HashMap::new(), + |m| { + m.iter() + .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone()))) + .collect() + }, + ), + )) + } +} + +impl TryFrom<&ArrowDataType> for schema::SchemaDataType { + type Error = ArrowError; + fn try_from(arrow_datatype: &ArrowDataType) -> Result { + match arrow_datatype { + ArrowDataType::Utf8 => Ok(schema::SchemaDataType::primitive("string".to_string())), + ArrowDataType::Int64 => Ok(schema::SchemaDataType::primitive("long".to_string())), // undocumented type + ArrowDataType::Int32 => Ok(schema::SchemaDataType::primitive("integer".to_string())), + ArrowDataType::Int16 => Ok(schema::SchemaDataType::primitive("short".to_string())), + ArrowDataType::Int8 => Ok(schema::SchemaDataType::primitive("byte".to_string())), + ArrowDataType::Float32 => Ok(schema::SchemaDataType::primitive("float".to_string())), + ArrowDataType::Float64 => Ok(schema::SchemaDataType::primitive("double".to_string())), + ArrowDataType::Boolean => Ok(schema::SchemaDataType::primitive("boolean".to_string())), + ArrowDataType::Binary => Ok(schema::SchemaDataType::primitive("binary".to_string())), + ArrowDataType::Decimal(p, s) => Ok(schema::SchemaDataType::primitive(format!( + "decimal({},{})", + p, s + ))), + ArrowDataType::Date32 => Ok(schema::SchemaDataType::primitive("date".to_string())), + ArrowDataType::Timestamp(TimeUnit::Microsecond, None) => { + Ok(schema::SchemaDataType::primitive("timestamp".to_string())) + } + ArrowDataType::Struct(fields) => { + let converted_fields: Result, _> = + fields.iter().map(|field| field.try_into()).collect(); + Ok(schema::SchemaDataType::r#struct( + schema::SchemaTypeStruct::new(converted_fields?), + )) + } + ArrowDataType::List(field) => { + Ok(schema::SchemaDataType::array(schema::SchemaTypeArray::new( + Box::new((*field).data_type().try_into()?), + (*field).is_nullable(), + ))) + } + ArrowDataType::FixedSizeList(field, _) => { + Ok(schema::SchemaDataType::array(schema::SchemaTypeArray::new( + Box::new((*field).data_type().try_into()?), + (*field).is_nullable(), + ))) + } + ArrowDataType::Map(field, _) => { + if let ArrowDataType::Struct(struct_fields) = field.data_type() { + let key_type = struct_fields[0].data_type().try_into()?; + let value_type = struct_fields[1].data_type().try_into()?; + let value_type_nullable = struct_fields[1].is_nullable(); + Ok(schema::SchemaDataType::map(schema::SchemaTypeMap::new( + Box::new(key_type), + Box::new(value_type), + value_type_nullable, + ))) + } else { + panic!("DataType::Map should contain a struct field child"); + } + } + s => Err(ArrowError::SchemaError(format!( + "Invalid data type for Delta Lake: {}", + s + 
))), + } + } +} + /// Returns an arrow schema representing the delta log for use in checkpoints /// /// # Arguments From ca052014f9120d77241fa73e813e26190096c30b Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 27 Feb 2022 20:42:52 -0800 Subject: [PATCH 03/28] start testing --- python/deltalake/__init__.py | 1 + python/deltalake/table.py | 13 --- python/deltalake/writer.py | 42 +++++---- python/src/lib.rs | 175 +++++++++++++++++------------------ python/tests/test_writer.py | 64 +++++++++++++ 5 files changed, 176 insertions(+), 119 deletions(-) diff --git a/python/deltalake/__init__.py b/python/deltalake/__init__.py index aeb999c97a..eaa3c39c9c 100644 --- a/python/deltalake/__init__.py +++ b/python/deltalake/__init__.py @@ -2,3 +2,4 @@ from .deltalake import PyDeltaTableError, RawDeltaTable, rust_core_version from .schema import DataType, Field, Schema from .table import DeltaTable, Metadata +from .writer import write_deltalake diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 1f3a25a0c4..98f5c1daf7 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -7,8 +7,6 @@ import pyarrow.fs as pa_fs from pyarrow.dataset import FileSystemDataset, ParquetFileFormat -from deltalake.writer import write_deltalake - if TYPE_CHECKING: import pandas @@ -338,14 +336,3 @@ def update_incremental(self) -> None: newer versions. """ self._table.update_incremental() - - def write(self, data, mode: Literal['append', 'overwrite'] = 'append', backend: str = 'pyarrow'): - write_deltalake(self, data, mode, backend) - - def delete_where(self, where_expr, backend: str = 'pyarrow'): - '''Delete rows matching the expression''' - pass - - def update(self, where_expr, set_values: Dict[str, Any], backend: str = 'pyarrow'): - '''Modify values in rows matching the expression''' - pass diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 50045a202a..ab958d60b5 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -3,8 +3,8 @@ import json from typing import Dict, Iterable, List, Literal, Optional, Union, overload import uuid -from deltalake import DeltaTable -from .deltalake import _create_empty_table +from deltalake import DeltaTable, PyDeltaTableError +from .deltalake import RawDeltaTable, create_write_transaction as _create_write_transaction import pyarrow as pa import pyarrow.dataset as ds @@ -20,7 +20,7 @@ class AddAction: def create_empty_table(uri: str, schema: pa.Schema, partition_columns: List[str]) -> DeltaTable: - return DeltaTable._from_raw(_create_empty_table(uri, schema, partition_columns)) + return DeltaTable._from_raw(RawDeltaTable.create_empty(uri, schema, partition_columns)) @overload @@ -36,14 +36,14 @@ def write_deltalake( @overload def write_deltalake( table_or_uri: Union[str, DeltaTable], - data: Union[pa.Table, pa.RecordBatch, pa.RecordBatchReader], + data: Union[pa.Table, pa.RecordBatch], # TODO: there a type for a RecordBatchReader? schema: Optional[pa.Schema], partition_by: Optional[Iterable[str]], mode: Literal['error', 'append', 'overwrite', 'ignore'] = 'error' ): ... -def write_deltalake(table_or_uri, data, schema, partition_by=None, mode='error'): +def write_deltalake(table_or_uri, data, schema=None, partition_by=None, mode='error'): """Write to a Delta Lake table If the table does not already exist, it will be created. 
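A minimal usage sketch of the existence check and ``mode`` handling that the
next hunk implements, assuming the signature drafted here and an illustrative
local path:

.. code-block:: python

    >>> import pyarrow as pa
    >>> from deltalake import DeltaTable, write_deltalake
    >>> data = pa.table({"x": pa.array([1, 2, 3], pa.int64())})
    >>> write_deltalake("/tmp/example_table", data)  # creates the table
    >>> write_deltalake("/tmp/example_table", data, mode="append")
    >>> write_deltalake("/tmp/example_table", data, mode="overwrite")
    >>> write_deltalake("/tmp/example_table", data, mode="ignore")  # no-op, table exists
    >>> DeltaTable("/tmp/example_table").to_pyarrow_table().num_rows
    3

The default ``mode='error'`` refuses to touch a table that already exists, so
repeat writers must opt in to ``append`` or ``overwrite`` explicitly.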
@@ -62,21 +62,27 @@ def write_deltalake(table_or_uri, data, schema, partition_by=None, mode='error') schema = data.schema if isinstance(table_or_uri, str): + table_uri = table_or_uri try: table = DeltaTable(table_or_uri) - except KeyError: # TODO update to error we should get if doesn't exist - # Create it + current_version = table.version + except PyDeltaTableError as err: + # branch to handle if not a delta table + if "Not a Delta table" not in str(err): + raise + table = None + current_version = -1 + else: if mode == 'error': raise AssertionError("DeltaTable already exists.") - - table = create_empty_table( - table_or_uri, schema, partition_by or []) - else: - if mode == 'ignore': + elif mode == 'ignore': # table already exists return + current_version = table.version() else: table = table_or_uri + table_uri = table._table.table_uri() + current_version = table.version if partition_by: partitioning = ds.partitioning(field_names=partition_by, flavor="hive") @@ -104,8 +110,8 @@ def visitor(written_file): # TODO: Pass through filesystem? Do we need to transform the URI as well? ds.write_dataset( data, - base_dir=table._table.table_uri(), - basename_template=f"{table.version}-{uuid.uuid4()}-{{i}}.parquet", + base_dir=table_uri, + basename_template=f"{current_version + 1}-{uuid.uuid4()}-{{i}}.parquet", format="parquet", partitioning=partitioning, schema=schema, @@ -113,8 +119,12 @@ def visitor(written_file): existing_data_behavior='overwrite_or_ignore', ) - table._table.create_write_transaction( + _create_write_transaction( + #table._table if table is not None else None, + table is None, + table_uri, + schema, add_actions, mode, - partition_by + partition_by or [], ) diff --git a/python/src/lib.rs b/python/src/lib.rs index 18d07159bf..62ac9516c7 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -135,50 +135,6 @@ impl RawDeltaTable { Ok(table_uri) } - #[classmethod] - fn create_empty( - _cls: &PyType, - table_uri: String, - schema: ArrowSchema, - partition_columns: Vec, - commit: Option, - ) -> PyResult { - let mut table = deltalake::DeltaTable::new( - &table_uri, - get_backend_for_uri(&table_uri).map_err(PyDeltaTableError::from_storage)?, - deltalake::DeltaTableConfig::default(), - ) - .map_err(PyDeltaTableError::from_raw)?; - - let protocol = action::Protocol { - min_reader_version: 1, - min_writer_version: 1, // TODO: Make sure we comply with protocol - }; - - let metadata = DeltaTableMetaData::new( - None, - None, - None, - (&schema).try_into()?, - partition_columns, - HashMap::new(), - ); - - let mut transaction = table.create_transaction(Some(DeltaTransactionOptions::new(3))); - transaction.add_action(Action::metaData( - metadata.try_into().map_err(PyDeltaTableError::from_serde)?, - )); - transaction.add_action(Action::protocol(protocol)); - - if let Some(true) = commit { - rt()? 
- .block_on(transaction.commit(None)) - .map_err(PyDeltaTableError::from_raw)?; - } - - Ok(RawDeltaTable { _table: table }) - } - pub fn table_uri(&self) -> PyResult<&str> { Ok(&self._table.table_uri) } @@ -329,50 +285,6 @@ impl RawDeltaTable { }) .collect() } - - pub fn create_write_transaction( - &mut self, - add_actions: Vec, - mode: &str, - partition_by: Option>, - ) -> PyResult<()> { - let mode = save_mode_from_str(mode)?; - - let mut actions: Vec = add_actions - .iter() - .map(|add| Action::add(add.clone().into())) - .collect(); - - if let SaveMode::Overwrite = mode { - // Remove all current files - for old_add in self._table.get_state().files().iter() { - let remove_action = Action::remove(action::Remove { - path: old_add.path.clone(), - deletion_timestamp: Some(current_timestamp()), - data_change: true, - extended_file_metadata: Some(old_add.tags.is_some()), - partition_values: Some(old_add.partition_values.clone()), - size: Some(old_add.size), - tags: old_add.tags.clone(), - }); - actions.push(remove_action); - } - } - - let mut transaction = self - ._table - .create_transaction(Some(DeltaTransactionOptions::new(3))); - transaction.add_actions(actions); - rt()? - .block_on(transaction.commit(Some(DeltaOperation::Write { - mode, - partitionBy: partition_by, - predicate: None, - }))) - .map_err(PyDeltaTableError::from_raw)?; - - Ok(()) - } } fn json_value_to_py(value: &serde_json::Value, py: Python) -> PyObject { @@ -538,8 +450,8 @@ pub struct PyAddAction { stats: Option, } -impl From<&PyAddAction> for action::Add { - fn from(action: &PyAddAction) -> Self { +impl From for action::Add { + fn from(action: PyAddAction) -> Self { action::Add { path: action.path.clone(), size: action.size, @@ -554,12 +466,95 @@ impl From<&PyAddAction> for action::Add { } } +#[pyfunction] +fn create_write_transaction( + create_new: bool, + table_uri: String, + schema: ArrowSchema, + add_actions: Vec, + mode: &str, + partition_by: Vec, +) -> PyResult<()> { + let mut actions: Vec = Vec::new(); + let mut table: deltalake::DeltaTable = match create_new { + false => { + // TODO: Can we avoid this reparsing of the log? + rt()?.block_on( + deltalake::DeltaTableBuilder::from_uri(&table_uri) + .map_err(PyDeltaTableError::from_raw)? + .load(), + ).map_err(PyDeltaTableError::from_raw)? 
+ } + true => { + let table = deltalake::DeltaTable::new( + &table_uri, + get_backend_for_uri(&table_uri).map_err(PyDeltaTableError::from_storage)?, + deltalake::DeltaTableConfig::default(), + ) + .map_err(PyDeltaTableError::from_raw)?; + actions.push(action::Action::protocol(action::Protocol { + min_reader_version: 1, + min_writer_version: 1, // TODO: Make sure we comply with protocol + })); + let metadata = DeltaTableMetaData::new( + None, + None, + None, + (&schema).try_into()?, + partition_by.clone(), + HashMap::new(), + ); + + actions.push(action::Action::metaData( + metadata.try_into().map_err(PyDeltaTableError::from_serde)?, + )); + + table + } + }; + + let mode = save_mode_from_str(mode)?; + + for add_action in add_actions { + actions.push(Action::add(add_action.into())); + } + + if let SaveMode::Overwrite = mode { + // Remove all current files + for old_add in table.get_state().files().iter() { + let remove_action = Action::remove(action::Remove { + path: old_add.path.clone(), + deletion_timestamp: Some(current_timestamp()), + data_change: true, + extended_file_metadata: Some(old_add.tags.is_some()), + partition_values: Some(old_add.partition_values.clone()), + size: Some(old_add.size), + tags: old_add.tags.clone(), + }); + actions.push(remove_action); + } + } + + let mut transaction = table.create_transaction(Some(DeltaTransactionOptions::new(3))); + transaction.add_actions(actions); + rt()? + .block_on(transaction.commit(Some(DeltaOperation::Write { + mode, + partitionBy: Some(partition_by), + predicate: None, + }))) + .map_err(PyDeltaTableError::from_raw)?; + + Ok(()) +} + #[pymodule] // module name need to match project name fn deltalake(py: Python, m: &PyModule) -> PyResult<()> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init(); m.add_function(pyo3::wrap_pyfunction!(rust_core_version, m)?)?; + m.add_function(pyo3::wrap_pyfunction!(create_write_transaction, m)?)?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index e69de29bb2..cf00af7860 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -0,0 +1,64 @@ +from asyncore import write +from datetime import date, datetime, timedelta +import os +import pathlib +import pytest +import pyarrow as pa +from decimal import Decimal + +from deltalake import DeltaTable, write_deltalake + + +@pytest.fixture() +def sample_data(): + nrows = 5 + return pa.table({ + 'utf8': pa.array([str(x) for x in range(nrows)]), + 'int64': pa.array(list(range(nrows)), pa.int64()), + 'int32': pa.array(list(range(nrows)), pa.int32()), + 'int16': pa.array(list(range(nrows)), pa.int16()), + 'int8': pa.array(list(range(nrows)), pa.int8()), + 'float32': pa.array([float(x) for x in range(nrows)], pa.float32()), + 'float64': pa.array([float(x) for x in range(nrows)], pa.float64()), + 'bool': pa.array([x % 2 == 0 for x in range(nrows)]), + 'binary': pa.array([str(x).encode() for x in range(nrows)]), + 'decimal': pa.array([Decimal("10.000") + x for x in range(nrows)]), + 'date32': pa.array([date(2022, 1, 1) + timedelta(days=x) for x in range(nrows)]), + 'timestamp': pa.array([datetime(2022, 1, 1) + timedelta(hours=x) for x in range(nrows)]), + 'struct': pa.array([{'x': x, 'y': str(x)} for x in range(nrows)]), + 'list': pa.array([list(range(x)) for x in range(nrows)]), + # NOTE: https://github.com/apache/arrow-rs/issues/477 + #'map': pa.array([[(str(y), y) for y in range(x)] for x in range(nrows)], pa.map_(pa.string(), 
pa.int64())), + }) + + +def test_handle_existing(tmp_path: pathlib.Path, sample_data: pa.Table): + # if uri points to a non-empty directory that isn't a delta table, error + tmp_path + p = tmp_path / "hello.txt" + p.write_text("hello") + + with pytest.raises(IOError) as exception: + write_deltalake(str(tmp_path), sample_data, mode='overwrite') + + assert "directory is not empty" in exception.message + + +# round trip, no partitioning + +def test_roundtrip_basic(tmp_path: pathlib.Path, sample_data): + write_deltalake(str(tmp_path), sample_data) + + assert ("0" * 20 + ".json") in os.listdir(tmp_path / "_delta_log") + + table = DeltaTable(str(tmp_path)).to_pyarrow_table() + + assert table == sample_data + + +# round trip, one partitioning, parameterized part column + +# round trip, nested partitioning + + +# test behaviors From 599b23bcb9f8bb5c171578cffddc02b8fb2d626c Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 27 Feb 2022 21:26:52 -0800 Subject: [PATCH 04/28] Refactor to create version 0 --- python/deltalake/writer.py | 25 +++--- python/src/lib.rs | 150 +++++++++++++++++------------------- python/tests/test_writer.py | 4 +- rust/src/delta.rs | 8 +- 4 files changed, 95 insertions(+), 92 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index ab958d60b5..53ede28967 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -4,7 +4,7 @@ from typing import Dict, Iterable, List, Literal, Optional, Union, overload import uuid from deltalake import DeltaTable, PyDeltaTableError -from .deltalake import RawDeltaTable, create_write_transaction as _create_write_transaction +from .deltalake import RawDeltaTable, write_new_deltalake as _write_new_deltalake import pyarrow as pa import pyarrow.dataset as ds @@ -119,12 +119,17 @@ def visitor(written_file): existing_data_behavior='overwrite_or_ignore', ) - _create_write_transaction( - #table._table if table is not None else None, - table is None, - table_uri, - schema, - add_actions, - mode, - partition_by or [], - ) + if table is None: + _write_new_deltalake( + table_uri, + schema, + add_actions, + mode, + partition_by or [] + ) + else: + table._table.write( + add_actions, + mode, + partition_by or [], + ) diff --git a/python/src/lib.rs b/python/src/lib.rs index 62ac9516c7..264eef8077 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -52,10 +52,6 @@ impl PyDeltaTableError { fn from_chrono(err: chrono::ParseError) -> pyo3::PyErr { PyDeltaTableError::new_err(format!("Parse date and time string failed: {}", err)) } - - fn from_serde(err: serde_json::Error) -> pyo3::PyErr { - PyDeltaTableError::new_err(err.to_string()) - } } #[inline] @@ -285,6 +281,45 @@ impl RawDeltaTable { }) .collect() } + + fn create_write_transaction( + &mut self, + add_actions: Vec, + mode: &str, + partition_by: Vec, + ) -> PyResult<()> { + let mode = save_mode_from_str(mode)?; + + let mut actions: Vec = add_actions.iter().map(|add| Action::add(add.into())).collect(); + + if let SaveMode::Overwrite = mode { + // Remove all current files + for old_add in self._table.get_state().files().iter() { + let remove_action = Action::remove(action::Remove { + path: old_add.path.clone(), + deletion_timestamp: Some(current_timestamp()), + data_change: true, + extended_file_metadata: Some(old_add.tags.is_some()), + partition_values: Some(old_add.partition_values.clone()), + size: Some(old_add.size), + tags: old_add.tags.clone(), + }); + actions.push(remove_action); + } + } + + let mut transaction = 
self._table.create_transaction(Some(DeltaTransactionOptions::new(3))); + transaction.add_actions(actions); + rt()? + .block_on(transaction.commit(Some(DeltaOperation::Write { + mode, + partitionBy: Some(partition_by), + predicate: None, + }))) + .map_err(PyDeltaTableError::from_raw)?; + + Ok(()) + } } fn json_value_to_py(value: &serde_json::Value, py: Python) -> PyObject { @@ -450,8 +485,8 @@ pub struct PyAddAction { stats: Option, } -impl From for action::Add { - fn from(action: PyAddAction) -> Self { +impl From<&PyAddAction> for action::Add { + fn from(action: &PyAddAction) -> Self { action::Add { path: action.path.clone(), size: action.size, @@ -467,83 +502,40 @@ impl From for action::Add { } #[pyfunction] -fn create_write_transaction( - create_new: bool, +fn write_new_deltalake( table_uri: String, schema: ArrowSchema, add_actions: Vec, - mode: &str, + _mode: &str, partition_by: Vec, ) -> PyResult<()> { - let mut actions: Vec = Vec::new(); - let mut table: deltalake::DeltaTable = match create_new { - false => { - // TODO: Can we avoid this reparsing of the log? - rt()?.block_on( - deltalake::DeltaTableBuilder::from_uri(&table_uri) - .map_err(PyDeltaTableError::from_raw)? - .load(), - ).map_err(PyDeltaTableError::from_raw)? - } - true => { - let table = deltalake::DeltaTable::new( - &table_uri, - get_backend_for_uri(&table_uri).map_err(PyDeltaTableError::from_storage)?, - deltalake::DeltaTableConfig::default(), - ) - .map_err(PyDeltaTableError::from_raw)?; - actions.push(action::Action::protocol(action::Protocol { - min_reader_version: 1, - min_writer_version: 1, // TODO: Make sure we comply with protocol - })); - let metadata = DeltaTableMetaData::new( - None, - None, - None, - (&schema).try_into()?, - partition_by.clone(), - HashMap::new(), - ); - - actions.push(action::Action::metaData( - metadata.try_into().map_err(PyDeltaTableError::from_serde)?, - )); - - table - } - }; - - let mode = save_mode_from_str(mode)?; - - for add_action in add_actions { - actions.push(Action::add(add_action.into())); - } - - if let SaveMode::Overwrite = mode { - // Remove all current files - for old_add in table.get_state().files().iter() { - let remove_action = Action::remove(action::Remove { - path: old_add.path.clone(), - deletion_timestamp: Some(current_timestamp()), - data_change: true, - extended_file_metadata: Some(old_add.tags.is_some()), - partition_values: Some(old_add.partition_values.clone()), - size: Some(old_add.size), - tags: old_add.tags.clone(), - }); - actions.push(remove_action); - } - } - - let mut transaction = table.create_transaction(Some(DeltaTransactionOptions::new(3))); - transaction.add_actions(actions); - rt()? 
- .block_on(transaction.commit(Some(DeltaOperation::Write { - mode, - partitionBy: Some(partition_by), - predicate: None, - }))) - .map_err(PyDeltaTableError::from_raw)?; + let mut table = deltalake::DeltaTable::new( + &table_uri, + get_backend_for_uri(&table_uri).map_err(PyDeltaTableError::from_storage)?, + deltalake::DeltaTableConfig::default(), + ) + .map_err(PyDeltaTableError::from_raw)?; + + let metadata = DeltaTableMetaData::new( + None, + None, + None, + (&schema).try_into()?, + partition_by.clone(), + HashMap::new(), + ); + + let fut = table.create( + metadata, + action::Protocol { + min_reader_version: 1, + min_writer_version: 1, // TODO: Make sure we comply with protocol + }, + None, // TODO + Some(add_actions.iter().map(|add| add.into()).collect()), + ); + + rt()?.block_on(fut).map_err(PyDeltaTableError::from_raw)?; Ok(()) } @@ -554,7 +546,7 @@ fn deltalake(py: Python, m: &PyModule) -> PyResult<()> { env_logger::Builder::from_env(env_logger::Env::default().default_filter_or("warn")).init(); m.add_function(pyo3::wrap_pyfunction!(rust_core_version, m)?)?; - m.add_function(pyo3::wrap_pyfunction!(create_write_transaction, m)?)?; + m.add_function(pyo3::wrap_pyfunction!(write_new_deltalake, m)?)?; m.add_class::()?; m.add_class::()?; m.add_class::()?; diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index cf00af7860..1080253209 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -38,10 +38,10 @@ def test_handle_existing(tmp_path: pathlib.Path, sample_data: pa.Table): p = tmp_path / "hello.txt" p.write_text("hello") - with pytest.raises(IOError) as exception: + with pytest.raises(OSError) as exception: write_deltalake(str(tmp_path), sample_data, mode='overwrite') - assert "directory is not empty" in exception.message + assert "directory is not empty" in str(exception) # round trip, no partitioning diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 085f076a33..8d2ef9a8d1 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -1293,6 +1293,7 @@ impl DeltaTable { metadata: DeltaTableMetaData, protocol: action::Protocol, commit_info: Option>, + add_actions: Option>, ) -> Result<(), DeltaTableError> { let meta = action::MetaData::try_from(metadata)?; @@ -1307,11 +1308,16 @@ impl DeltaTable { Value::Number(serde_json::Number::from(Utc::now().timestamp_millis())), ); - let actions = vec![ + let mut actions = vec![ Action::commitInfo(enriched_commit_info), Action::protocol(protocol), Action::metaData(meta), ]; + if let Some(add_actions) = add_actions { + for add_action in add_actions { + actions.push(Action::add(add_action)); + } + }; let mut transaction = self.create_transaction(None); transaction.add_actions(actions.clone()); From f196ce9629eb3568caa2609d4558a289bf55c251 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Mon, 28 Feb 2022 21:08:13 -0800 Subject: [PATCH 05/28] Implement stats writer; get first new test to pass --- python/deltalake/schema.py | 4 +-- python/deltalake/writer.py | 51 ++++++++++++++++++++++++++++++++++--- python/tests/test_schema.py | 4 +-- python/tests/test_writer.py | 4 ++- 4 files changed, 55 insertions(+), 8 deletions(-) diff --git a/python/deltalake/schema.py b/python/deltalake/schema.py index 9ac5123098..9363b6d097 100644 --- a/python/deltalake/schema.py +++ b/python/deltalake/schema.py @@ -205,7 +205,7 @@ def pyarrow_datatype_from_dict(json_dict: Dict[str, Any]) -> pyarrow.DataType: key, pyarrow.list_( pyarrow.field( - "element", pyarrow.struct([pyarrow_field_from_dict(value_type)]) + "entries", 
pyarrow.struct([pyarrow_field_from_dict(value_type)]) ) ), ) @@ -218,7 +218,7 @@ def pyarrow_datatype_from_dict(json_dict: Dict[str, Any]) -> pyarrow.DataType: elif type_class == "list": field = json_dict["children"][0] element_type = pyarrow_datatype_from_dict(field) - return pyarrow.list_(pyarrow.field("element", element_type)) + return pyarrow.list_(pyarrow.field("item", element_type)) elif type_class == "struct": fields = [pyarrow_field_from_dict(field) for field in json_dict["children"]] return pyarrow.struct(fields) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 53ede28967..146db1884d 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -1,5 +1,5 @@ from dataclasses import dataclass -from datetime import datetime +from datetime import date, datetime import json from typing import Dict, Iterable, List, Literal, Optional, Union, overload import uuid @@ -97,14 +97,47 @@ def visitor(written_file): # TODO: Record column statistics # NOTE: will need to aggregate over row groups. Access with # written_file.metadata.row_group(i).column(j).statistics - stats = {"numRecords": written_file.metadata.num_rows} + stats = { + "numRecords": written_file.metadata.num_rows, + "minValues": {}, + "maxValues": {}, + "nullCount": {}, + } + + def iter_groups(metadata): + for i in range(metadata.num_row_groups): + yield metadata.row_group(i) + + # TODO: What do nested columns look like? + for column_idx in range(written_file.metadata.num_columns): + name = written_file.metadata.schema.names[column_idx] + # If stats missing, then we can't know aggregate stats + if all(group.column(column_idx).is_stats_set + for group in iter_groups(written_file.metadata)): + stats["nullCount"][name] = sum( + group.column(column_idx).statistics.null_count + for group in iter_groups(written_file.metadata) + ) + + # I assume for now this is based on data type, and thus is + # consistent between groups + if written_file.metadata.row_group(0).column(column_idx).statistics.has_min_max: + stats["minValues"][name] = min( + group.column(column_idx).statistics.min + for group in iter_groups(written_file.metadata) + ) + stats["maxValues"][name] = max( + group.column(column_idx).statistics.max + for group in iter_groups(written_file.metadata) + ) + add_actions.append(AddAction( written_file.path, written_file.metadata.serialized_size, partition_values, int(datetime.now().timestamp()), True, - json.dumps(stats) + json.dumps(stats, cls=DeltaJSONEncoder) )) # TODO: Pass through filesystem? Do we need to transform the URI as well? 
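The statistics gathered above come straight from Parquet row-group metadata as
exposed by pyarrow. A standalone sketch of where those numbers live (the file
path is illustrative):

.. code-block:: python

    >>> import pyarrow as pa
    >>> import pyarrow.parquet as pq
    >>> pq.write_table(pa.table({"x": pa.array([1, None, 5], pa.int64())}), "/tmp/stats.parquet")
    >>> meta = pq.ParquetFile("/tmp/stats.parquet").metadata
    >>> col = meta.row_group(0).column(0)
    >>> col.is_stats_set and col.statistics.has_min_max
    True
    >>> (col.statistics.min, col.statistics.max, col.statistics.null_count)
    (1, 5, 1)

Per-file values are produced by summing null counts and taking the min/max
across every row group, since a single Parquet file may contain several.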
@@ -133,3 +166,15 @@ def visitor(written_file): mode, partition_by or [], ) + + +class DeltaJSONEncoder(json.JSONEncoder): + def default(self, obj): + if isinstance(obj, bytes): + return obj.decode("unicode_escape") + elif isinstance(obj, date): + return obj.isoformat() + elif isinstance(obj, datetime): + return obj.isoformat() + # Let the base class default method raise the TypeError + return json.JSONEncoder.default(self, obj) \ No newline at end of file diff --git a/python/tests/test_schema.py b/python/tests/test_schema.py index 78eed79385..33455cb47e 100644 --- a/python/tests/test_schema.py +++ b/python/tests/test_schema.py @@ -233,7 +233,7 @@ def test_schema_pyarrow_types(): ) assert pyarrow_field.name == field_name assert pyarrow_field.type == pyarrow.list_( - pyarrow.field("element", pyarrow.int32()) + pyarrow.field("item", pyarrow.int32()) ) assert pyarrow_field.metadata == metadata assert pyarrow_field.nullable is False @@ -276,7 +276,7 @@ def test_schema_pyarrow_types(): pyarrow.int32(), pyarrow.list_( pyarrow.field( - "element", + "entries", pyarrow.struct( [pyarrow.field("val", pyarrow.int32(), False, metadata)] ), diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 1080253209..f4579aa68c 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -51,8 +51,10 @@ def test_roundtrip_basic(tmp_path: pathlib.Path, sample_data): assert ("0" * 20 + ".json") in os.listdir(tmp_path / "_delta_log") - table = DeltaTable(str(tmp_path)).to_pyarrow_table() + delta_table = DeltaTable(str(tmp_path)) + assert delta_table.pyarrow_schema() == sample_data.schema + table = delta_table.to_pyarrow_table() assert table == sample_data From 69a7b5a37732ca5790dcdf9b41cbbc4635475801 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 1 Mar 2022 19:55:34 -0800 Subject: [PATCH 06/28] Refactor and add skip for fs blocker --- python/deltalake/writer.py | 219 +++++++++++++++++++----------------- python/src/lib.rs | 9 +- python/tests/test_schema.py | 4 +- python/tests/test_writer.py | 60 +++++----- 4 files changed, 158 insertions(+), 134 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 146db1884d..77ae1ad3b1 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -1,12 +1,18 @@ -from dataclasses import dataclass -from datetime import date, datetime import json -from typing import Dict, Iterable, List, Literal, Optional, Union, overload import uuid -from deltalake import DeltaTable, PyDeltaTableError -from .deltalake import RawDeltaTable, write_new_deltalake as _write_new_deltalake +from dataclasses import dataclass +from datetime import date, datetime +from typing import Dict, Iterable, List, Literal, Optional, Union + import pyarrow as pa import pyarrow.dataset as ds +import pyarrow.fs as pa_fs + +from deltalake import DeltaTable, PyDeltaTableError +from deltalake.fs import DeltaStorageHandler + +from .deltalake import RawDeltaTable +from .deltalake import write_new_deltalake as _write_new_deltalake @dataclass @@ -19,39 +25,36 @@ class AddAction: stats: str -def create_empty_table(uri: str, schema: pa.Schema, partition_columns: List[str]) -> DeltaTable: - return DeltaTable._from_raw(RawDeltaTable.create_empty(uri, schema, partition_columns)) - - -@overload -def write_deltalake( - table_or_uri: Union[str, DeltaTable], - data: Iterable[pa.RecordBatch], - schema: pa.Schema, - partition_by: Optional[Iterable[str]], - mode: Literal['error', 'append', 'overwrite', 'ignore'] = 'error' -): ... 
+def create_empty_table( + uri: str, schema: pa.Schema, partition_columns: List[str] +) -> DeltaTable: + return DeltaTable._from_raw( + RawDeltaTable.create_empty(uri, schema, partition_columns) + ) -@overload def write_deltalake( table_or_uri: Union[str, DeltaTable], - data: Union[pa.Table, pa.RecordBatch], # TODO: there a type for a RecordBatchReader? - schema: Optional[pa.Schema], - partition_by: Optional[Iterable[str]], - mode: Literal['error', 'append', 'overwrite', 'ignore'] = 'error' -): ... - - -def write_deltalake(table_or_uri, data, schema=None, partition_by=None, mode='error'): + data: Union[ + pa.Table, pa.RecordBatch, Iterable[pa.RecordBatch] + ], # TODO: there a type for a RecordBatchReader? + schema: Optional[pa.Schema] = None, + partition_by: Optional[Iterable[str]] = None, + filesystem: Optional[pa_fs.FileSystem] = None, + mode: Literal["error", "append", "overwrite", "ignore"] = "error", +): """Write to a Delta Lake table If the table does not already exist, it will be created. :param table: URI of a table or a DeltaTable object. :param data: Data to write. If passing iterable, the schema must also be given. - :param schema: Optional schema to write. - :param mode: How to handle existing data. Default is to error if table + :param schema: Optional schema to write. + :param partition_by: List of columns to partition the table by. Only required + when creating a new table. + :param filesystem: Optional filesystem to pass to PyArrow. If not provided will + be inferred from uri. + :param mode: How to handle existing data. Default is to error if table already exists. If 'append', will add new data. If 'overwrite', will replace table with new data. If 'ignore', will not write anything if table already exists. @@ -62,27 +65,32 @@ def write_deltalake(table_or_uri, data, schema=None, partition_by=None, mode='er schema = data.schema if isinstance(table_or_uri, str): + table = try_get_deltatable(table_or_uri) table_uri = table_or_uri - try: - table = DeltaTable(table_or_uri) - current_version = table.version - except PyDeltaTableError as err: - # branch to handle if not a delta table - if "Not a Delta table" not in str(err): - raise - table = None - current_version = -1 - else: - if mode == 'error': - raise AssertionError("DeltaTable already exists.") - elif mode == 'ignore': - # table already exists - return - current_version = table.version() else: table = table_or_uri - table_uri = table._table.table_uri() - current_version = table.version + table_uri = table_uri = table._table.table_uri() + + # TODO: Pass through filesystem once it is complete + # if filesystem is None: + # filesystem = pa_fs.PyFileSystem(DeltaStorageHandler(table_uri)) + + if table: # already exists + if mode == "error": + raise AssertionError("DeltaTable already exists.") + elif mode == "ignore": + return + + current_version = table.version() + + if partition_by: + assert partition_by == table.metadata.partition_columns + else: # creating a new table + current_version = -1 + + # TODO: Don't allow writing to non-empty directory + # Blocked on: Finish filesystem implementation in fs.py + # assert len(filesystem.get_file_info(pa_fs.FileSelector(table_uri, allow_not_found=True))) == 0 if partition_by: partitioning = ds.partitioning(field_names=partition_by, flavor="hive") @@ -94,53 +102,19 @@ def write_deltalake(table_or_uri, data, schema=None, partition_by=None, mode='er def visitor(written_file): # TODO: Get partition values from path partition_values = {} - # TODO: Record column statistics - # NOTE: will need 
to aggregate over row groups. Access with - # written_file.metadata.row_group(i).column(j).statistics - stats = { - "numRecords": written_file.metadata.num_rows, - "minValues": {}, - "maxValues": {}, - "nullCount": {}, - } - - def iter_groups(metadata): - for i in range(metadata.num_row_groups): - yield metadata.row_group(i) - - # TODO: What do nested columns look like? - for column_idx in range(written_file.metadata.num_columns): - name = written_file.metadata.schema.names[column_idx] - # If stats missing, then we can't know aggregate stats - if all(group.column(column_idx).is_stats_set - for group in iter_groups(written_file.metadata)): - stats["nullCount"][name] = sum( - group.column(column_idx).statistics.null_count - for group in iter_groups(written_file.metadata) - ) - - # I assume for now this is based on data type, and thus is - # consistent between groups - if written_file.metadata.row_group(0).column(column_idx).statistics.has_min_max: - stats["minValues"][name] = min( - group.column(column_idx).statistics.min - for group in iter_groups(written_file.metadata) - ) - stats["maxValues"][name] = max( - group.column(column_idx).statistics.max - for group in iter_groups(written_file.metadata) - ) - - add_actions.append(AddAction( - written_file.path, - written_file.metadata.serialized_size, - partition_values, - int(datetime.now().timestamp()), - True, - json.dumps(stats, cls=DeltaJSONEncoder) - )) - - # TODO: Pass through filesystem? Do we need to transform the URI as well? + stats = get_file_stats_from_metadata(written_file.metadata) + + add_actions.append( + AddAction( + written_file.path, + written_file.metadata.serialized_size, + partition_values, + int(datetime.now().timestamp()), + True, + json.dumps(stats, cls=DeltaJSONEncoder), + ) + ) + ds.write_dataset( data, base_dir=table_uri, @@ -149,17 +123,11 @@ def iter_groups(metadata): partitioning=partitioning, schema=schema, file_visitor=visitor, - existing_data_behavior='overwrite_or_ignore', + existing_data_behavior="overwrite_or_ignore", ) if table is None: - _write_new_deltalake( - table_uri, - schema, - add_actions, - mode, - partition_by or [] - ) + _write_new_deltalake(table_uri, schema, add_actions, mode, partition_by or []) else: table._table.write( add_actions, @@ -177,4 +145,51 @@ def default(self, obj): elif isinstance(obj, datetime): return obj.isoformat() # Let the base class default method raise the TypeError - return json.JSONEncoder.default(self, obj) \ No newline at end of file + return json.JSONEncoder.default(self, obj) + + +def try_get_deltatable(table_uri: str) -> Optional[DeltaTable]: + try: + return DeltaTable(table_uri) + except PyDeltaTableError as err: + if "Not a Delta table" not in str(err): + raise + return None + + +def get_file_stats_from_metadata(metadata): + stats = { + "numRecords": metadata.num_rows, + "minValues": {}, + "maxValues": {}, + "nullCount": {}, + } + + def iter_groups(metadata): + for i in range(metadata.num_row_groups): + yield metadata.row_group(i) + + # TODO: What do nested columns look like? 
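+    # Aggregate the Parquet row-group statistics into one per-file summary:
+    # null counts are summed and min/max are taken across all row groups. A
+    # column is skipped entirely if any row group is missing statistics for it.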
+ for column_idx in range(metadata.num_columns): + name = metadata.schema.names[column_idx] + # If stats missing, then we can't know aggregate stats + if all( + group.column(column_idx).is_stats_set for group in iter_groups(metadata) + ): + stats["nullCount"][name] = sum( + group.column(column_idx).statistics.null_count + for group in iter_groups(metadata) + ) + + # I assume for now this is based on data type, and thus is + # consistent between groups + if metadata.row_group(0).column(column_idx).statistics.has_min_max: + stats["minValues"][name] = min( + group.column(column_idx).statistics.min + for group in iter_groups(metadata) + ) + stats["maxValues"][name] = max( + group.column(column_idx).statistics.max + for group in iter_groups(metadata) + ) + return stats diff --git a/python/src/lib.rs b/python/src/lib.rs index 264eef8077..78c5f92345 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -290,7 +290,10 @@ impl RawDeltaTable { ) -> PyResult<()> { let mode = save_mode_from_str(mode)?; - let mut actions: Vec = add_actions.iter().map(|add| Action::add(add.into())).collect(); + let mut actions: Vec = add_actions + .iter() + .map(|add| Action::add(add.into())) + .collect(); if let SaveMode::Overwrite = mode { // Remove all current files @@ -308,7 +311,9 @@ impl RawDeltaTable { } } - let mut transaction = self._table.create_transaction(Some(DeltaTransactionOptions::new(3))); + let mut transaction = self + ._table + .create_transaction(Some(DeltaTransactionOptions::new(3))); transaction.add_actions(actions); rt()? .block_on(transaction.commit(Some(DeltaOperation::Write { diff --git a/python/tests/test_schema.py b/python/tests/test_schema.py index 33455cb47e..296a5d8c5f 100644 --- a/python/tests/test_schema.py +++ b/python/tests/test_schema.py @@ -232,9 +232,7 @@ def test_schema_pyarrow_types(): } ) assert pyarrow_field.name == field_name - assert pyarrow_field.type == pyarrow.list_( - pyarrow.field("item", pyarrow.int32()) - ) + assert pyarrow_field.type == pyarrow.list_(pyarrow.field("item", pyarrow.int32())) assert pyarrow_field.metadata == metadata assert pyarrow_field.nullable is False diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index f4579aa68c..12e8d4597a 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1,37 +1,45 @@ -from asyncore import write -from datetime import date, datetime, timedelta import os import pathlib -import pytest -import pyarrow as pa +from asyncore import write +from datetime import date, datetime, timedelta from decimal import Decimal +import pyarrow as pa +import pytest + from deltalake import DeltaTable, write_deltalake @pytest.fixture() def sample_data(): nrows = 5 - return pa.table({ - 'utf8': pa.array([str(x) for x in range(nrows)]), - 'int64': pa.array(list(range(nrows)), pa.int64()), - 'int32': pa.array(list(range(nrows)), pa.int32()), - 'int16': pa.array(list(range(nrows)), pa.int16()), - 'int8': pa.array(list(range(nrows)), pa.int8()), - 'float32': pa.array([float(x) for x in range(nrows)], pa.float32()), - 'float64': pa.array([float(x) for x in range(nrows)], pa.float64()), - 'bool': pa.array([x % 2 == 0 for x in range(nrows)]), - 'binary': pa.array([str(x).encode() for x in range(nrows)]), - 'decimal': pa.array([Decimal("10.000") + x for x in range(nrows)]), - 'date32': pa.array([date(2022, 1, 1) + timedelta(days=x) for x in range(nrows)]), - 'timestamp': pa.array([datetime(2022, 1, 1) + timedelta(hours=x) for x in range(nrows)]), - 'struct': pa.array([{'x': x, 'y': str(x)} for x in 
range(nrows)]), - 'list': pa.array([list(range(x)) for x in range(nrows)]), - # NOTE: https://github.com/apache/arrow-rs/issues/477 - #'map': pa.array([[(str(y), y) for y in range(x)] for x in range(nrows)], pa.map_(pa.string(), pa.int64())), - }) - - + return pa.table( + { + "utf8": pa.array([str(x) for x in range(nrows)]), + "int64": pa.array(list(range(nrows)), pa.int64()), + "int32": pa.array(list(range(nrows)), pa.int32()), + "int16": pa.array(list(range(nrows)), pa.int16()), + "int8": pa.array(list(range(nrows)), pa.int8()), + "float32": pa.array([float(x) for x in range(nrows)], pa.float32()), + "float64": pa.array([float(x) for x in range(nrows)], pa.float64()), + "bool": pa.array([x % 2 == 0 for x in range(nrows)]), + "binary": pa.array([str(x).encode() for x in range(nrows)]), + "decimal": pa.array([Decimal("10.000") + x for x in range(nrows)]), + "date32": pa.array( + [date(2022, 1, 1) + timedelta(days=x) for x in range(nrows)] + ), + "timestamp": pa.array( + [datetime(2022, 1, 1) + timedelta(hours=x) for x in range(nrows)] + ), + "struct": pa.array([{"x": x, "y": str(x)} for x in range(nrows)]), + "list": pa.array([list(range(x)) for x in range(nrows)]), + # NOTE: https://github.com/apache/arrow-rs/issues/477 + #'map': pa.array([[(str(y), y) for y in range(x)] for x in range(nrows)], pa.map_(pa.string(), pa.int64())), + } + ) + + +@pytest.mark.skip(reason="Waiting on #570") def test_handle_existing(tmp_path: pathlib.Path, sample_data: pa.Table): # if uri points to a non-empty directory that isn't a delta table, error tmp_path @@ -39,13 +47,11 @@ def test_handle_existing(tmp_path: pathlib.Path, sample_data: pa.Table): p.write_text("hello") with pytest.raises(OSError) as exception: - write_deltalake(str(tmp_path), sample_data, mode='overwrite') + write_deltalake(str(tmp_path), sample_data, mode="overwrite") assert "directory is not empty" in str(exception) -# round trip, no partitioning - def test_roundtrip_basic(tmp_path: pathlib.Path, sample_data): write_deltalake(str(tmp_path), sample_data) From 823206e50f7203601f9c6de34c41fdf708368ebe Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 1 Mar 2022 20:36:33 -0800 Subject: [PATCH 07/28] Add basic partitioning support --- python/deltalake/writer.py | 20 +++++++++++++---- python/tests/test_writer.py | 44 +++++++++++++++++++++++++++++++++---- 2 files changed, 56 insertions(+), 8 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 77ae1ad3b1..aa3fcdd161 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -9,7 +9,6 @@ import pyarrow.fs as pa_fs from deltalake import DeltaTable, PyDeltaTableError -from deltalake.fs import DeltaStorageHandler from .deltalake import RawDeltaTable from .deltalake import write_new_deltalake as _write_new_deltalake @@ -93,15 +92,15 @@ def write_deltalake( # assert len(filesystem.get_file_info(pa_fs.FileSelector(table_uri, allow_not_found=True))) == 0 if partition_by: - partitioning = ds.partitioning(field_names=partition_by, flavor="hive") + partition_schema = pa.schema([schema.field(name) for name in partition_by]) + partitioning = ds.partitioning(partition_schema, flavor="hive") else: partitioning = None add_actions: List[AddAction] = [] def visitor(written_file): - # TODO: Get partition values from path - partition_values = {} + partition_values = get_partitions_from_path(table_uri, written_file.path) stats = get_file_stats_from_metadata(written_file.metadata) add_actions.append( @@ -157,6 +156,19 @@ def try_get_deltatable(table_uri: str) -> 
Optional[DeltaTable]: return None +def get_partitions_from_path(base_path: str, path: str) -> Dict[str, Optional[str]]: + path = path.split(base_path, maxsplit=1)[1] + parts = path.split("/") + parts.pop() # remove filename + out = {} + for part in parts: + if part == "": + continue + key, value = part.split("=", maxsplit=1) + out[key] = value + return out + + def get_file_stats_from_metadata(metadata): stats = { "numRecords": metadata.num_rows, diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 12e8d4597a..0a7a1e8198 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1,10 +1,10 @@ import os import pathlib -from asyncore import write from datetime import date, datetime, timedelta from decimal import Decimal import pyarrow as pa +import pyarrow.compute as pc import pytest from deltalake import DeltaTable, write_deltalake @@ -52,7 +52,7 @@ def test_handle_existing(tmp_path: pathlib.Path, sample_data: pa.Table): assert "directory is not empty" in str(exception) -def test_roundtrip_basic(tmp_path: pathlib.Path, sample_data): +def test_roundtrip_basic(tmp_path: pathlib.Path, sample_data: pa.Table): write_deltalake(str(tmp_path), sample_data) assert ("0" * 20 + ".json") in os.listdir(tmp_path / "_delta_log") @@ -64,9 +64,45 @@ def test_roundtrip_basic(tmp_path: pathlib.Path, sample_data): assert table == sample_data -# round trip, one partitioning, parameterized part column +@pytest.mark.parametrize( + "column", + [ + "utf8", + "int64", + "int32", + "int16", + "int8", + "float32", + "float64", + "bool", + "binary", + # TODO: Add decimal and date32 once #565 is merged + # "decimal", + # "date32", + ], +) +def test_roundtrip_partitioned( + tmp_path: pathlib.Path, sample_data: pa.Table, column: str +): + write_deltalake(str(tmp_path), sample_data, partition_by=[column]) -# round trip, nested partitioning + delta_table = DeltaTable(str(tmp_path)) + assert delta_table.pyarrow_schema() == sample_data.schema + + table = delta_table.to_pyarrow_table() + table = table.take(pc.sort_indices(table["int64"])) + assert table == sample_data + + +def test_roundtrip_multi_partitioned(tmp_path: pathlib.Path, sample_data: pa.Table): + write_deltalake(str(tmp_path), sample_data, partition_by=["int32", "bool"]) + + delta_table = DeltaTable(str(tmp_path)) + assert delta_table.pyarrow_schema() == sample_data.schema + + table = delta_table.to_pyarrow_table() + table = table.take(pc.sort_indices(table["int64"])) + assert table == sample_data # test behaviors From 73423864b45b8efe28caacd880311431c1b2adb1 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 2 Mar 2022 19:37:16 -0800 Subject: [PATCH 08/28] Test different modes --- python/deltalake/writer.py | 2 +- python/tests/test_writer.py | 19 ++++++++++++++++++- 2 files changed, 19 insertions(+), 2 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index aa3fcdd161..8fcd5b79f2 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -128,7 +128,7 @@ def visitor(written_file): if table is None: _write_new_deltalake(table_uri, schema, add_actions, mode, partition_by or []) else: - table._table.write( + table._table.create_write_transaction( add_actions, mode, partition_by or [], diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 0a7a1e8198..ca1a56765c 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -105,4 +105,21 @@ def test_roundtrip_multi_partitioned(tmp_path: pathlib.Path, sample_data: pa.Tab assert table == 
sample_data -# test behaviors +def test_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table): + path = str(tmp_path) + + write_deltalake(path, sample_data) + assert DeltaTable(path).to_pyarrow_table() == sample_data + + with pytest.raises(AssertionError): + write_deltalake(path, sample_data, mode="error") + + write_deltalake(path, sample_data, mode="ignore") + assert ("0" * 19 + "1.json") not in os.listdir(tmp_path / "_delta_log") + + write_deltalake(path, sample_data, mode="append") + expected = pa.concat_tables([sample_data, sample_data]) + assert DeltaTable(path).to_pyarrow_table() == expected + + write_deltalake(path, sample_data, mode="overwrite") + assert DeltaTable(path).to_pyarrow_table() == sample_data From 87d5238b14ca6a206ca57dd87bb3367f8a163388 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 2 Mar 2022 19:58:44 -0800 Subject: [PATCH 09/28] Improve code coverage --- python/deltalake/writer.py | 10 +--------- python/tests/__init__.py | 0 python/tests/test_writer.py | 24 ++++++++++++++++++++++++ 3 files changed, 25 insertions(+), 9 deletions(-) create mode 100644 python/tests/__init__.py diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 8fcd5b79f2..8267ff2880 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -24,14 +24,6 @@ class AddAction: stats: str -def create_empty_table( - uri: str, schema: pa.Schema, partition_columns: List[str] -) -> DeltaTable: - return DeltaTable._from_raw( - RawDeltaTable.create_empty(uri, schema, partition_columns) - ) - - def write_deltalake( table_or_uri: Union[str, DeltaTable], data: Union[ @@ -83,7 +75,7 @@ def write_deltalake( current_version = table.version() if partition_by: - assert partition_by == table.metadata.partition_columns + assert partition_by == table.metadata().partition_columns else: # creating a new table current_version = -1 diff --git a/python/tests/__init__.py b/python/tests/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index ca1a56765c..7ba44301f3 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -39,6 +39,13 @@ def sample_data(): ) +@pytest.fixture() +def existing_table(tmp_path: pathlib.Path, sample_data: pa.Table): + path = str(tmp_path) + write_deltalake(path, sample_data) + return DeltaTable(path) + + @pytest.mark.skip(reason="Waiting on #570") def test_handle_existing(tmp_path: pathlib.Path, sample_data: pa.Table): # if uri points to a non-empty directory that isn't a delta table, error @@ -123,3 +130,20 @@ def test_write_modes(tmp_path: pathlib.Path, sample_data: pa.Table): write_deltalake(path, sample_data, mode="overwrite") assert DeltaTable(path).to_pyarrow_table() == sample_data + + +def test_writer_with_table(existing_table: DeltaTable, sample_data: pa.Table): + write_deltalake(existing_table, sample_data, mode="overwrite") + existing_table.update_incremental() + assert existing_table.to_pyarrow_table() == sample_data + + +def test_fails_wrong_partitioning(existing_table: DeltaTable, sample_data: pa.Table): + with pytest.raises(AssertionError): + write_deltalake(existing_table, sample_data, mode="append", partition_by="int32") + + +def test_write_iterator(tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table): + batches = existing_table.to_pyarrow_dataset().to_batches() + write_deltalake(str(tmp_path), batches) + assert DeltaTable(str(tmp_path)).to_pyarrow_table() == sample_data From 28dd5adcf3338c7d23168e61de67d507f97eb005 
Mon Sep 17 00:00:00 2001 From: Will Jones Date: Wed, 2 Mar 2022 20:37:14 -0800 Subject: [PATCH 10/28] Get RBR working --- python/deltalake/table.py | 6 ------ python/deltalake/writer.py | 36 ++++++++++++++++++++---------------- python/tests/test_writer.py | 24 +++++++++++++++++++++--- 3 files changed, 41 insertions(+), 25 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 98f5c1daf7..930c392314 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -87,12 +87,6 @@ def __init__( ) self._metadata = Metadata(self._table) - @classmethod - def _from_raw(cls, raw_table: RawDeltaTable) -> "DeltaTable": - self = cls.__new__(cls) - self._table = raw_table - self._metadata = Metadata(self._table) - @classmethod def from_data_catalog( cls, diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 8267ff2880..4ee89fdbcd 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -2,15 +2,15 @@ import uuid from dataclasses import dataclass from datetime import date, datetime -from typing import Dict, Iterable, List, Literal, Optional, Union +from typing import Any, Dict, Iterable, Iterator, List, Literal, Optional, Union import pyarrow as pa import pyarrow.dataset as ds import pyarrow.fs as pa_fs +from pyarrow.lib import RecordBatchReader from deltalake import DeltaTable, PyDeltaTableError -from .deltalake import RawDeltaTable from .deltalake import write_new_deltalake as _write_new_deltalake @@ -26,14 +26,12 @@ class AddAction: def write_deltalake( table_or_uri: Union[str, DeltaTable], - data: Union[ - pa.Table, pa.RecordBatch, Iterable[pa.RecordBatch] - ], # TODO: there a type for a RecordBatchReader? + data: Union[pa.Table, pa.RecordBatch, Iterable[pa.RecordBatch], RecordBatchReader], schema: Optional[pa.Schema] = None, partition_by: Optional[Iterable[str]] = None, filesystem: Optional[pa_fs.FileSystem] = None, mode: Literal["error", "append", "overwrite", "ignore"] = "error", -): +) -> None: """Write to a Delta Lake table If the table does not already exist, it will be created. @@ -50,10 +48,13 @@ def write_deltalake( replace table with new data. If 'ignore', will not write anything if table already exists. 
""" - if isinstance(data, Iterable) and schema is None: - return ValueError("You must provide schema if data is Iterable") - elif not isinstance(data, Iterable): - schema = data.schema + if schema is None: + if isinstance(data, RecordBatchReader): + schema = data.schema + elif isinstance(data, Iterable): + raise ValueError("You must provide schema if data is Iterable") + else: + schema = data.schema if isinstance(table_or_uri, str): table = try_get_deltatable(table_or_uri) @@ -91,7 +92,7 @@ def write_deltalake( add_actions: List[AddAction] = [] - def visitor(written_file): + def visitor(written_file: Any) -> None: partition_values = get_partitions_from_path(table_uri, written_file.path) stats = get_file_stats_from_metadata(written_file.metadata) @@ -112,7 +113,8 @@ def visitor(written_file): basename_template=f"{current_version + 1}-{uuid.uuid4()}-{{i}}.parquet", format="parquet", partitioning=partitioning, - schema=schema, + # It will not accept a schema if using a RBR + schema=schema if not isinstance(data, RecordBatchReader) else None, file_visitor=visitor, existing_data_behavior="overwrite_or_ignore", ) @@ -128,7 +130,7 @@ def visitor(written_file): class DeltaJSONEncoder(json.JSONEncoder): - def default(self, obj): + def default(self, obj: Any) -> Any: if isinstance(obj, bytes): return obj.decode("unicode_escape") elif isinstance(obj, date): @@ -148,7 +150,7 @@ def try_get_deltatable(table_uri: str) -> Optional[DeltaTable]: return None -def get_partitions_from_path(base_path: str, path: str) -> Dict[str, Optional[str]]: +def get_partitions_from_path(base_path: str, path: str) -> Dict[str, str]: path = path.split(base_path, maxsplit=1)[1] parts = path.split("/") parts.pop() # remove filename @@ -161,7 +163,9 @@ def get_partitions_from_path(base_path: str, path: str) -> Dict[str, Optional[st return out -def get_file_stats_from_metadata(metadata): +def get_file_stats_from_metadata( + metadata: Any, +) -> Dict[str, Union[int, Dict[str, Any]]]: stats = { "numRecords": metadata.num_rows, "minValues": {}, @@ -169,7 +173,7 @@ def get_file_stats_from_metadata(metadata): "nullCount": {}, } - def iter_groups(metadata): + def iter_groups(metadata: Any) -> Iterator[Any]: for i in range(metadata.num_row_groups): yield metadata.row_group(i) diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 7ba44301f3..53206de227 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -6,6 +6,7 @@ import pyarrow as pa import pyarrow.compute as pc import pytest +from pyarrow.lib import RecordBatchReader from deltalake import DeltaTable, write_deltalake @@ -140,10 +141,27 @@ def test_writer_with_table(existing_table: DeltaTable, sample_data: pa.Table): def test_fails_wrong_partitioning(existing_table: DeltaTable, sample_data: pa.Table): with pytest.raises(AssertionError): - write_deltalake(existing_table, sample_data, mode="append", partition_by="int32") + write_deltalake( + existing_table, sample_data, mode="append", partition_by="int32" + ) -def test_write_iterator(tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table): +def test_write_iterator( + tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table +): + batches = existing_table.to_pyarrow_dataset().to_batches() + with pytest.raises(ValueError): + write_deltalake(str(tmp_path), batches, mode="overwrite") + + write_deltalake(str(tmp_path), batches, schema=sample_data.schema, mode="overwrite") + assert DeltaTable(str(tmp_path)).to_pyarrow_table() == sample_data + + +def 
test_write_recordbatchreader( + tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table +): batches = existing_table.to_pyarrow_dataset().to_batches() - write_deltalake(str(tmp_path), batches) + reader = RecordBatchReader.from_batches(sample_data.schema, batches) + + write_deltalake(str(tmp_path), reader, mode="overwrite") assert DeltaTable(str(tmp_path)).to_pyarrow_table() == sample_data From 86883cdde3fa3fa2a4620d2ba80db47703e84c26 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 6 Mar 2022 13:42:38 -0800 Subject: [PATCH 11/28] Reference pyarrow writer umbrella issue in readme --- README.adoc | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.adoc b/README.adoc index db8c98e744..aa902a8303 100644 --- a/README.adoc +++ b/README.adoc @@ -73,7 +73,7 @@ link:https://github.com/rajasekarv/vega[vega], etc. It also provides bindings to | High-level file writer | -| +| link:https://github.com/delta-io/delta-rs/issues/542[#542] | | Optimize From 7b2a8e911c049aeb67863e891b31a685977b0495 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 6 Mar 2022 13:43:02 -0800 Subject: [PATCH 12/28] Update docs and link to other projects --- python/docs/source/api_reference.rst | 2 ++ python/docs/source/conf.py | 13 +++++++- python/docs/source/usage.rst | 47 ++++++++++------------------ 3 files changed, 30 insertions(+), 32 deletions(-) diff --git a/python/docs/source/api_reference.rst b/python/docs/source/api_reference.rst index 0fcb6579c3..6ea31b4b44 100644 --- a/python/docs/source/api_reference.rst +++ b/python/docs/source/api_reference.rst @@ -7,6 +7,8 @@ DeltaTable .. automodule:: deltalake.table :members: +.. autofunction:: deltalake.write_deltalake + DeltaSchema ----------- diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py index 5fbd59eb17..90cb75c000 100644 --- a/python/docs/source/conf.py +++ b/python/docs/source/conf.py @@ -42,7 +42,12 @@ def get_release_version() -> str: # Add any Sphinx extension module names here, as strings. They can be # extensions coming with Sphinx (named 'sphinx.ext.*') or your custom # ones. -extensions = ["sphinx_rtd_theme", "sphinx.ext.autodoc", "edit_on_github"] +extensions = [ + "sphinx_rtd_theme", + "sphinx.ext.autodoc", + "sphinx.ext.intersphinx", + "edit_on_github", +] autodoc_typehints = "description" nitpicky = True nitpick_ignore = [ @@ -84,3 +89,9 @@ def get_release_version() -> str: edit_on_github_project = "delta-io/delta-rs" edit_on_github_branch = "main" page_source_prefix = "python/docs/source" + + +intersphinx_mapping = { + "pyarrow": ("https://arrow.apache.org/docs/", None), + "pyspark": ("https://spark.apache.org/docs/latest/api/python/", None), +} diff --git a/python/docs/source/usage.rst b/python/docs/source/usage.rst index f42d2eae22..249eee01c2 100644 --- a/python/docs/source/usage.rst +++ b/python/docs/source/usage.rst @@ -328,7 +328,13 @@ Optimizing tables is not currently supported. Writing Delta Tables -------------------- -For overwrites and appends, use :func:`write_deltalake`. If the table does not +.. py:currentmodule:: deltalake + +.. warning:: + The writer is currently *experimental*. Please use on test data first, not + on production data. Report any issues at https://github.com/delta-io/delta-rs/issues. + +For overwrites and appends, use :py:func:`write_deltalake`. If the table does not already exist, it will be created. The ``data`` parameter will accept a Pandas DataFrame, a PyArrow Table, or an iterator of PyArrow Record Batches. 
@@ -338,37 +344,16 @@ DataFrame, a PyArrow Table, or an iterator of PyArrow Record Batches. >>> df = pd.DataFrame({'x': [1, 2, 3]}) >>> write_deltalake('path/to/table', df) -By default, writes append to the table. To overwrite, pass in ``mode='overwrite'``: - -.. code-block:: python - - >>> write_deltalake('path/to/table', df, mode='overwrite') - -If you have a :class:`DeltaTable` object, you can also call the :meth:`DeltaTable.write` -method: - -.. code-block:: python - - >>> DeltaTable('path/to/table').write(df, mode='overwrite') - -To delete rows based on an expression, use :meth:`DeltaTable.delete` - -.. code-block:: python - - >>> from deltalake.writer import delete_deltalake - >>> import pyarrow.dataset as ds - >>> DeltaTable('path/to/table').delete(ds.field('x') == 2) +.. note:: + :py:func:`write_deltalake` accepts a Pandas DataFrame, but will convert it to + a Arrow table before writing. See caveats in :doc:`pyarrow:python/pandas`. -To update a subset of rows with new values, use +By default, writes create a new table and error if it already exists. This is +controlled by the ``mode`` parameter, which mirrors the behavior of Spark's +:py:meth:`pyspark.sql.DataFrameWriter.saveAsTable` DataFrame method. To overwrite pass in ``mode='overwrite'`` and +to append pass in ``mode='append'``: .. code-block:: python - >>> from deltalake.writer import delete_deltalake - >>> import pyarrow.dataset as ds - >>> # Increment y where x = 2 - >>> DeltaTable('path/to/table').update( - where_expr=ds.field('x') == 2, - set_values={ - 'y': ds.field('y') + 1 - } - ) \ No newline at end of file + >>> write_deltalake('path/to/table', df, mode='overwrite') + >>> write_deltalake('path/to/table', df, mode='append') From 1c68d571a49f2f338321bb44ffbf670866a5b7ec Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sun, 6 Mar 2022 13:43:15 -0800 Subject: [PATCH 13/28] Add Pandas support --- python/deltalake/writer.py | 12 +++++++++++- python/tests/test_writer.py | 12 ++++++++++++ 2 files changed, 23 insertions(+), 1 deletion(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 4ee89fdbcd..575c7e77b5 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -4,6 +4,7 @@ from datetime import date, datetime from typing import Any, Dict, Iterable, Iterator, List, Literal, Optional, Union +import pandas as pd import pyarrow as pa import pyarrow.dataset as ds import pyarrow.fs as pa_fs @@ -26,7 +27,13 @@ class AddAction: def write_deltalake( table_or_uri: Union[str, DeltaTable], - data: Union[pa.Table, pa.RecordBatch, Iterable[pa.RecordBatch], RecordBatchReader], + data: Union[ + pd.DataFrame, + pa.Table, + pa.RecordBatch, + Iterable[pa.RecordBatch], + RecordBatchReader, + ], schema: Optional[pa.Schema] = None, partition_by: Optional[Iterable[str]] = None, filesystem: Optional[pa_fs.FileSystem] = None, @@ -48,6 +55,9 @@ def write_deltalake( replace table with new data. If 'ignore', will not write anything if table already exists. 
""" + if isinstance(data, pd.DataFrame): + data = pa.Table.from_pandas(data) + if schema is None: if isinstance(data, RecordBatchReader): schema = data.schema diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 53206de227..2cbc3142ee 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -6,6 +6,7 @@ import pyarrow as pa import pyarrow.compute as pc import pytest +from pandas.testing import assert_frame_equal from pyarrow.lib import RecordBatchReader from deltalake import DeltaTable, write_deltalake @@ -146,6 +147,17 @@ def test_fails_wrong_partitioning(existing_table: DeltaTable, sample_data: pa.Ta ) +def test_write_pandas(tmp_path: pathlib.Path, sample_data: pa.Table): + # When timestamp is converted to Pandas, it gets casted to ns resolution, + # but Delta Lake schemas only support us resolution. + sample_pandas = sample_data.to_pandas().drop(["timestamp"], axis=1) + write_deltalake(str(tmp_path), sample_pandas) + + delta_table = DeltaTable(str(tmp_path)) + df = delta_table.to_pandas() + assert_frame_equal(df, sample_pandas) + + def test_write_iterator( tmp_path: pathlib.Path, existing_table: DeltaTable, sample_data: pa.Table ): From 1bba642ad416303af7b341426e462c173405363c Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 8 Mar 2022 12:20:50 -0800 Subject: [PATCH 14/28] Test writer stats and partitioning --- python/deltalake/table.py | 2 +- python/deltalake/writer.py | 5 ++- python/tests/test_writer.py | 65 +++++++++++++++++++++++++++++++++++++ 3 files changed, 68 insertions(+), 4 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 930c392314..1c8cc8b4e7 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1,7 +1,7 @@ import json import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Literal, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union import pyarrow import pyarrow.fs as pa_fs diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 575c7e77b5..a1d4ee6095 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -35,7 +35,7 @@ def write_deltalake( RecordBatchReader, ], schema: Optional[pa.Schema] = None, - partition_by: Optional[Iterable[str]] = None, + partition_by: Optional[List[str]] = None, filesystem: Optional[pa_fs.FileSystem] = None, mode: Literal["error", "append", "overwrite", "ignore"] = "error", ) -> None: @@ -187,9 +187,8 @@ def iter_groups(metadata: Any) -> Iterator[Any]: for i in range(metadata.num_row_groups): yield metadata.row_group(i) - # TODO: What do nested columns look like? 
for column_idx in range(metadata.num_columns): - name = metadata.schema.names[column_idx] + name = metadata.row_group(0).column(column_idx).path_in_schema # If stats missing, then we can't know aggregate stats if all( group.column(column_idx).is_stats_set for group in iter_groups(metadata) diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 2cbc3142ee..ed46760235 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1,7 +1,10 @@ +from ast import Assert +import json import os import pathlib from datetime import date, datetime, timedelta from decimal import Decimal +from attr import validate import pyarrow as pa import pyarrow.compute as pc @@ -177,3 +180,65 @@ def test_write_recordbatchreader( write_deltalake(str(tmp_path), reader, mode="overwrite") assert DeltaTable(str(tmp_path)).to_pyarrow_table() == sample_data + + +def test_writer_partitioning(tmp_path: pathlib.Path): + test_strings = [ + "a=b", + "hello world", + "hello%20world" + ] + data = pa.table({ + "p": pa.array(test_strings), + "x": pa.array(range(len(test_strings))) + }) + + write_deltalake(str(tmp_path), data) + + assert DeltaTable(str(tmp_path)).to_pyarrow_table() == data + + +# TODO: We should have sample data with nulls +def validate_writer_stats(table: DeltaTable, data: pa.Table): + log_path = table._table.table_uri() + "/_delta_log/" + ("0" * 20 + ".json") + + # Should only have single add entry + for line in open(log_path, 'r').readlines(): + data = json.loads(line) + + if 'add' in data: + stats = json.loads(data['add']['stats']) + break + else: + raise AssertionError("No add action found!") + + def get_original_array(col_name): + if col_name.startswith("struct."): + field = col_name.split('.', maxsplit=1)[1] + assert field in [f.name for f in data['struct'].type] + field_idx = data['struct'].type.get_field_index(field) + return data['struct'].chunk(0).field(field_idx) + elif col_name.startswith("list."): + list_name = col_name.split(".", maxsplit=1)[0] + assert list_name in data.column_names + return data[list_name].chunk(0).flatten() + else: + assert col_name in data.column_names + return data[col_name] + + for col, null_count in stats['nullCount'].items(): + assert null_count == get_original_array(col).null_count + + for col, col_min in stats['minValues'].items(): + assert col_min == min(get_original_array(col)) + + for col, col_max in stats['maxValues'].items(): + assert col_max == max(get_original_array(col)) + + +def test_writer_stats(table: DeltaTable, data: pa.Table): + validate_writer_stats(table, data) + + +def test_writer_null_stats(tmp_path: pathlib.Path): + pass \ No newline at end of file From 2a77c34b2bbc379dbacf0720e91b534bb573c3b6 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 11 Mar 2022 19:54:51 -0800 Subject: [PATCH 15/28] Test statistics --- python/deltalake/writer.py | 15 +++++ python/tests/test_writer.py | 116 ++++++++++++++++++++++-------------- 2 files changed, 86 insertions(+), 45 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index a1d4ee6095..4809ff2557 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -2,6 +2,7 @@ import uuid from dataclasses import dataclass from datetime import date, datetime +from decimal import Decimal from typing import Any, Dict, Iterable, Iterator, List, Literal, Optional, Union import pandas as pd @@ -147,6 +148,8 @@ def default(self, obj: Any) -> Any: return obj.isoformat() elif isinstance(obj, datetime): return obj.isoformat() + elif isinstance(obj, 
Decimal): + return str(obj) # Let the base class default method raise the TypeError return json.JSONEncoder.default(self, obj) @@ -201,6 +204,18 @@ def iter_groups(metadata: Any) -> Iterator[Any]: # I assume for now this is based on data type, and thus is # consistent between groups if metadata.row_group(0).column(column_idx).statistics.has_min_max: + # Min and Max are recorded in physical type, not logical type + # https://stackoverflow.com/questions/66753485/decoding-parquet-min-max-statistics-for-decimal-type + # TODO: Add logic to decode physical type for DATE, DECIMAL + logical_type = ( + metadata.row_group(0) + .column(column_idx) + .statistics.logical_type.type + ) + # + if logical_type not in ["STRING", "INT", "TIMESTAMP", "NONE"]: + continue + # import pdb; pdb.set_trace() stats["minValues"][name] = min( group.column(column_idx).statistics.min for group in iter_groups(metadata) diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index ed46760235..cef0072321 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1,10 +1,9 @@ -from ast import Assert import json import os import pathlib +from ast import Assert from datetime import date, datetime, timedelta from decimal import Decimal -from attr import validate import pyarrow as pa import pyarrow.compute as pc @@ -37,7 +36,7 @@ def sample_data(): [datetime(2022, 1, 1) + timedelta(hours=x) for x in range(nrows)] ), "struct": pa.array([{"x": x, "y": str(x)} for x in range(nrows)]), - "list": pa.array([list(range(x)) for x in range(nrows)]), + "list": pa.array([list(range(x + 1)) for x in range(nrows)]), # NOTE: https://github.com/apache/arrow-rs/issues/477 #'map': pa.array([[(str(y), y) for y in range(x)] for x in range(nrows)], pa.map_(pa.string(), pa.int64())), } @@ -183,62 +182,89 @@ def test_write_recordbatchreader( def test_writer_partitioning(tmp_path: pathlib.Path): - test_strings = [ - "a=b", - "hello world", - "hello%20world" - ] - data = pa.table({ - "p": pa.array(test_strings), - "x": pa.array(range(len(test_strings))) - }) - + test_strings = ["a=b", "hello world", "hello%20world"] + data = pa.table( + {"p": pa.array(test_strings), "x": pa.array(range(len(test_strings)))} + ) + write_deltalake(str(tmp_path), data) assert DeltaTable(str(tmp_path)).to_pyarrow_table() == data -# TODO: We should have sample data with nulls -def validate_writer_stats(table: DeltaTable, data: pa.Table): +def get_stats(table: DeltaTable): log_path = table._table.table_uri() + "/_delta_log/" + ("0" * 20 + ".json") # Should only have single add entry - for line in open(log_path, 'r').readlines(): - data = json.loads(line) + for line in open(log_path, "r").readlines(): + log_entry = json.loads(line) - if 'add' in data: - stats = json.loads(data['add']['stats']) - break + if "add" in log_entry: + return json.loads(log_entry["add"]["stats"]) else: raise AssertionError("No add action found!") - - def get_original_array(col_name): - if col_name.startswith("struct."): - field = col_name.split('.', maxsplit=1)[1] - assert field in [f.name for f in data['struct'].type] - field_idx = data['struct'].type.get_field_index(field) - return data['struct'].chunk(0).field(field_idx) - elif col_name.startswith("list."): - list_name = col_name.split(".", maxsplit=1)[0] - assert list_name in data.column_names - return data[list_name].chunk(0).flatten() - else: - assert col_name in data.column_names - return data[col_name] - - for col, null_count in stats['nullCount'].items(): - assert null_count == 
get_original_array(col).null_count - for col, col_min in stats['minValues'].items(): - assert col_min == min(get_original_array(col)) - for col, col_max in stats['maxValues'].items(): - assert col_max == max(get_original_array(col)) +def test_writer_stats(existing_table: DeltaTable, sample_data: pa.Table): + stats = get_stats(existing_table) + + assert stats["numRecords"] == sample_data.num_rows + + assert all(null_count == 0 for null_count in stats["nullCount"].values()) + + expected_mins = { + "utf8": "0", + "int64": 0, + "int32": 0, + "int16": 0, + "int8": 0, + "float32": 0.0, + "float64": 0.0, + "bool": False, + "binary": "0", + # TODO: Writer needs special decoding for decimal and date32. + #'decimal': '10.000', + # "date32": '2022-01-01', + "timestamp": "2022-01-01T00:00:00", + "struct.x": 0, + "struct.y": "0", + "list.list.item": 0, + } + assert stats["minValues"] == expected_mins + + expected_maxs = { + "utf8": "4", + "int64": 4, + "int32": 4, + "int16": 4, + "int8": 4, + "float32": 4.0, + "float64": 4.0, + "bool": True, + "binary": "4", + #'decimal': '40.000', + # "date32": '2022-01-04', + "timestamp": "2022-01-01T04:00:00", + "struct.x": 4, + "struct.y": "4", + "list.list.item": 4, + } + assert stats["maxValues"] == expected_maxs -def test_writer_stats(table: DeltaTable, data: pa.Table): - validate_writer_stats(table, data) +def test_writer_null_stats(tmp_path: pathlib.Path): + data = pa.table( + { + "int32": pa.array([1, None, 2, None], pa.int32()), + "float64": pa.array([1.0, None, None, None], pa.float64()), + "str": pa.array([None] * 4, pa.string()), + } + ) + path = str(tmp_path) + write_deltalake(path, data) + table = DeltaTable(path) + stats = get_stats(table) -def test_writer_null_stats(tmp_path: pathlib.Path): - pass \ No newline at end of file + expected_nulls = {"int32": 2, "float64": 3, "str": 4} + assert stats["nullCount"] == expected_nulls From 237dff39022d43282cf3b57b2909d6b449746b4c Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 11 Mar 2022 20:22:12 -0800 Subject: [PATCH 16/28] Simplify tests since we can now roundtrip --- .../_delta_log/.00000000000000000000.json.crc | Bin 16 -> 0 bytes .../_delta_log/00000000000000000000.json | 4 ---- ...4007-b5ec-511b932751ea.c000.snappy.parquet.crc | Bin 12 -> 0 bytes ...903-4007-b5ec-511b932751ea.c000.snappy.parquet | Bin 500 -> 0 bytes .../_delta_log/.00000000000000000000.json.crc | Bin 16 -> 0 bytes .../_delta_log/00000000000000000000.json | 4 ---- ...44ea-bbfa-0479c4c1e704.c000.snappy.parquet.crc | Bin 12 -> 0 bytes ...9ce-44ea-bbfa-0479c4c1e704.c000.snappy.parquet | Bin 500 -> 0 bytes python/tests/test_table_read.py | 14 -------------- python/tests/test_writer.py | 4 +--- 10 files changed, 1 insertion(+), 25 deletions(-) delete mode 100644 python/tests/data/date_partitioned_df/_delta_log/.00000000000000000000.json.crc delete mode 100644 python/tests/data/date_partitioned_df/_delta_log/00000000000000000000.json delete mode 100644 python/tests/data/date_partitioned_df/date=2021-01-01/.part-00000-6ae76612-3903-4007-b5ec-511b932751ea.c000.snappy.parquet.crc delete mode 100644 python/tests/data/date_partitioned_df/date=2021-01-01/part-00000-6ae76612-3903-4007-b5ec-511b932751ea.c000.snappy.parquet delete mode 100644 python/tests/data/timestamp_partitioned_df/_delta_log/.00000000000000000000.json.crc delete mode 100644 python/tests/data/timestamp_partitioned_df/_delta_log/00000000000000000000.json delete mode 100644 python/tests/data/timestamp_partitioned_df/date=2021-01-01 
00%3A00%3A00/.part-00000-6177a755-69ce-44ea-bbfa-0479c4c1e704.c000.snappy.parquet.crc delete mode 100644 python/tests/data/timestamp_partitioned_df/date=2021-01-01 00%3A00%3A00/part-00000-6177a755-69ce-44ea-bbfa-0479c4c1e704.c000.snappy.parquet diff --git a/python/tests/data/date_partitioned_df/_delta_log/.00000000000000000000.json.crc b/python/tests/data/date_partitioned_df/_delta_log/.00000000000000000000.json.crc deleted file mode 100644 index f141a1d1b77c482f0540175c8e170192b2f4e1a5..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 16 XcmYc;N@ieSU}DHPkYgZgvXTV=Aj$*% diff --git a/python/tests/data/date_partitioned_df/_delta_log/00000000000000000000.json b/python/tests/data/date_partitioned_df/_delta_log/00000000000000000000.json deleted file mode 100644 index 9c01cf24d8..0000000000 --- a/python/tests/data/date_partitioned_df/_delta_log/00000000000000000000.json +++ /dev/null @@ -1,4 +0,0 @@ -{"protocol":{"minReaderVersion":1,"minWriterVersion":2}} -{"metaData":{"id":"588135b2-b298-4d9f-aab6-6dd9bf90d575","format":{"provider":"parquet","options":{}},"schemaString":"{\"type\":\"struct\",\"fields\":[{\"name\":\"id\",\"type\":\"long\",\"nullable\":true,\"metadata\":{}},{\"name\":\"date\",\"type\":\"date\",\"nullable\":true,\"metadata\":{}}]}","partitionColumns":["date"],"configuration":{},"createdTime":1645893400586}} -{"add":{"path":"date=2021-01-01/part-00000-6ae76612-3903-4007-b5ec-511b932751ea.c000.snappy.parquet","partitionValues":{"date":"2021-01-01"},"size":500,"modificationTime":1645893404567,"dataChange":true}} -{"commitInfo":{"timestamp":1645893404671,"operation":"WRITE","operationParameters":{"mode":"ErrorIfExists","partitionBy":"[\"date\"]"},"isolationLevel":"Serializable","isBlindAppend":true,"operationMetrics":{"numFiles":"1","numOutputRows":"5","numOutputBytes":"500"},"engineInfo":"Apache-Spark/3.2.1 Delta-Lake/1.1.0"}} diff --git a/python/tests/data/date_partitioned_df/date=2021-01-01/.part-00000-6ae76612-3903-4007-b5ec-511b932751ea.c000.snappy.parquet.crc b/python/tests/data/date_partitioned_df/date=2021-01-01/.part-00000-6ae76612-3903-4007-b5ec-511b932751ea.c000.snappy.parquet.crc deleted file mode 100644 index 3271be8603d6a86d6b0c806f614ee9ae2315b70b..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 12 TcmYc;N@ieSU}9*k3_1e<5^n=n diff --git a/python/tests/data/date_partitioned_df/date=2021-01-01/part-00000-6ae76612-3903-4007-b5ec-511b932751ea.c000.snappy.parquet b/python/tests/data/date_partitioned_df/date=2021-01-01/part-00000-6ae76612-3903-4007-b5ec-511b932751ea.c000.snappy.parquet deleted file mode 100644 index c59e5b21d81dbadd3f198c879be88a813350a5eb..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 500 zcmZWm%SyvQ6un7n2}KuzGt5B1GGI}tgUP6UPy~0ymAG)DBGY85!L+GKT9Hy$uKXnN zAN&&W7reEtU3fQh&$*9thI@W`;SnI$q(i>H-apq|Qbt%J4uE7f02H=i!vbr=v8ZLW z3&_l(IDdd|+MHqUJTP4rh|MVgcd0|hm;mBhXFVJ0^x-xO?oD448%S}-W_A<;EpwN< zRmUYoe&j%j&sbv9GJUh?xazc5i&ttCcK_s7ENbEhP!E89SY(U7kQ11#DOVjj-a=0` z#**|->Y+v-^4F3an>34(0b5hjmmaXae;wIlEYbbr$mB9jo@C$TI@GylmlKgc4~=Mv zEKf&4E^49|nK;Zu>uMqfx<4yLA<~PsI2qg_8jRvtcVn#Ln5l{7LeZK`r#DkzXA8Og zep*@ht9a7$rC-_Yj-oiM)ayna1dW5HQjKF19QlC?dipp}huymADdl_0_k-r)9(P^5 L!K(#uj`#in3TH-apq|Qbt%J4uE7f02H=i!vbr=v8ZLW z3&_l(IDdd|+MHqUJTP4rh|MVgcd0|hm;mBhXFVJ0^x-xO?oD448%S}-W_A<;EpwN< zRmUYoe&j%j&sbv9GJUh?xazc5i&ttCcK_s7ENbEhP!E89SY(U7kQ11#DOVjj-a=0` z#**|->Y+v-^4F3an>34(0b5hjmmaXae;wIlEYbbr$mB9jo@C$TI@GylmlKgc4~=Mv 
zEKf&4E^49|nK;Zu>uMqfx<4yLA<~PsI2qg_8jRvtcVn#Ln5l{7LeZK`r#DkzXA8Og zep*@ht9a7$rC-_Yj-oiM)ayna1dW5HQjKF19QlC?dipp}huymADdl_0_k-r)9(P^5 L!K(#uj`#in3T Date: Fri, 11 Mar 2022 21:02:43 -0800 Subject: [PATCH 17/28] Enforce protocol version --- python/deltalake/table.py | 10 +++++++++- python/deltalake/writer.py | 18 +++++++++++++++++- python/docs/source/conf.py | 1 + python/src/lib.rs | 7 +++++++ python/tests/test_table_read.py | 8 ++++++++ python/tests/test_writer.py | 10 +++++++++- 6 files changed, 51 insertions(+), 3 deletions(-) diff --git a/python/deltalake/table.py b/python/deltalake/table.py index 1c8cc8b4e7..59096c38db 100644 --- a/python/deltalake/table.py +++ b/python/deltalake/table.py @@ -1,7 +1,7 @@ import json import warnings from dataclasses import dataclass -from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple, Union +from typing import TYPE_CHECKING, Any, Dict, List, NamedTuple, Optional, Tuple, Union import pyarrow import pyarrow.fs as pa_fs @@ -63,6 +63,11 @@ def __str__(self) -> str: ) +class ProtocolVersions(NamedTuple): + min_reader_version: int + min_writer_version: int + + @dataclass(init=False) class DeltaTable: """Create a DeltaTable instance.""" @@ -219,6 +224,9 @@ def metadata(self) -> Metadata: """ return self._metadata + def protocol(self) -> ProtocolVersions: + return ProtocolVersions(*self._table.protocol_versions()) + def history(self, limit: Optional[int] = None) -> List[Dict[str, Any]]: """ Run the history command on the DeltaTable. diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 4809ff2557..460747e00a 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -13,9 +13,14 @@ from deltalake import DeltaTable, PyDeltaTableError +from .deltalake import PyDeltaTableError from .deltalake import write_new_deltalake as _write_new_deltalake +class DeltaTableProtocolError(PyDeltaTableError): + pass + + @dataclass class AddAction: path: str @@ -44,7 +49,11 @@ def write_deltalake( If the table does not already exist, it will be created. - :param table: URI of a table or a DeltaTable object. + This function only supports protocol version 2 currently. If an attempting + to write to an existing table with a higher min_writer_version, this + function will throw an error. + + :param table_or_uri: URI of a table or a DeltaTable object. :param data: Data to write. If passing iterable, the schema must also be given. :param schema: Optional schema to write. :param partition_by: List of columns to partition the table by. Only required @@ -88,6 +97,13 @@ def write_deltalake( if partition_by: assert partition_by == table.metadata().partition_columns + + if table.protocol().min_writer_version > 1: + raise DeltaTableProtocolError( + "This table's min_writer_version is " + f"{table.protocol().min_writer_version}, " + "but this method only supports version 1." 
+ ) else: # creating a new table current_version = -1 diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py index 90cb75c000..8311715deb 100644 --- a/python/docs/source/conf.py +++ b/python/docs/source/conf.py @@ -94,4 +94,5 @@ def get_release_version() -> str: intersphinx_mapping = { "pyarrow": ("https://arrow.apache.org/docs/", None), "pyspark": ("https://spark.apache.org/docs/latest/api/python/", None), + "pandas": ("https://pandas.pydata.org/docs/", None), } diff --git a/python/src/lib.rs b/python/src/lib.rs index 78c5f92345..553bef8725 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -154,6 +154,13 @@ impl RawDeltaTable { }) } + pub fn protocol_versions(&self) -> PyResult<(i32, i32)> { + Ok(( + self._table.get_min_reader_version(), + self._table.get_min_writer_version(), + )) + } + pub fn load_version(&mut self, version: deltalake::DeltaDataTypeVersion) -> PyResult<()> { rt()? .block_on(self._table.load_version(version)) diff --git a/python/tests/test_table_read.py b/python/tests/test_table_read.py index e3d77c89b7..a5ab208a48 100644 --- a/python/tests/test_table_read.py +++ b/python/tests/test_table_read.py @@ -213,6 +213,14 @@ def test_read_partitioned_table_metadata(): assert metadata.configuration == {} +def test_read_partitioned_table_protocol(): + table_path = "../rust/tests/data/delta-0.8.0-partitioned" + dt = DeltaTable(table_path) + protocol = dt.protocol() + assert protocol.min_reader_version == 1 + assert protocol.min_writer_version == 2 + + def test_history_partitioned_table_metadata(): table_path = "../rust/tests/data/delta-0.8.0-partitioned" dt = DeltaTable(table_path) diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index a1f1170e0e..550d33f1e9 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1,9 +1,9 @@ import json import os import pathlib -from ast import Assert from datetime import date, datetime, timedelta from decimal import Decimal +from unittest.mock import Mock import pyarrow as pa import pyarrow.compute as pc @@ -12,6 +12,8 @@ from pyarrow.lib import RecordBatchReader from deltalake import DeltaTable, write_deltalake +from deltalake.table import ProtocolVersions +from deltalake.writer import DeltaTableProtocolError @pytest.fixture() @@ -266,3 +268,9 @@ def test_writer_null_stats(tmp_path: pathlib.Path): expected_nulls = {"int32": 2, "float64": 3, "str": 4} assert stats["nullCount"] == expected_nulls + + +def test_writer_fails_on_protocol(existing_table: DeltaTable, sample_data: pa.Table): + existing_table.protocol = Mock(return_value=ProtocolVersions(1, 2)) + with pytest.raises(DeltaTableProtocolError): + write_deltalake(existing_table, sample_data, mode="overwrite") From 3bd74ed6890f1ddf21ae56c5842cbbecab7beb5f Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 11 Mar 2022 21:15:51 -0800 Subject: [PATCH 18/28] Fix docs --- python/deltalake/writer.py | 4 ++-- python/docs/source/api_reference.rst | 3 +++ 2 files changed, 5 insertions(+), 2 deletions(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 460747e00a..b0204f3166 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -49,9 +49,9 @@ def write_deltalake( If the table does not already exist, it will be created. - This function only supports protocol version 2 currently. If an attempting + This function only supports protocol version 1 currently. If an attempting to write to an existing table with a higher min_writer_version, this - function will throw an error. 
+ function will throw DeltaTableProtocolError. :param table_or_uri: URI of a table or a DeltaTable object. :param data: Data to write. If passing iterable, the schema must also be given. diff --git a/python/docs/source/api_reference.rst b/python/docs/source/api_reference.rst index 6ea31b4b44..09659ebc10 100644 --- a/python/docs/source/api_reference.rst +++ b/python/docs/source/api_reference.rst @@ -7,6 +7,9 @@ DeltaTable .. automodule:: deltalake.table :members: +Writing DeltaTables +------------------- + .. autofunction:: deltalake.write_deltalake DeltaSchema From 9db9d743e02c271ab081cb098a3c612036a9f2b0 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Fri, 11 Mar 2022 21:18:36 -0800 Subject: [PATCH 19/28] Add experimental to docstring --- python/deltalake/writer.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index b0204f3166..31cebcb339 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -45,7 +45,7 @@ def write_deltalake( filesystem: Optional[pa_fs.FileSystem] = None, mode: Literal["error", "append", "overwrite", "ignore"] = "error", ) -> None: - """Write to a Delta Lake table + """Write to a Delta Lake table (Experimental) If the table does not already exist, it will be created. From 922a87ea16a9a7c22a9c8b3c3415e3af7c0dc1f8 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sat, 12 Mar 2022 19:39:43 -0800 Subject: [PATCH 20/28] Fix rust issues --- rust/src/delta.rs | 2 +- rust/src/delta_arrow.rs | 2 +- rust/tests/adls_gen2_table_test.rs | 2 +- rust/tests/concurrent_writes_test.rs | 2 +- rust/tests/fs_common/mod.rs | 2 +- 5 files changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/src/delta.rs b/rust/src/delta.rs index 8d2ef9a8d1..d67637c65f 100644 --- a/rust/src/delta.rs +++ b/rust/src/delta.rs @@ -1818,7 +1818,7 @@ mod tests { serde_json::Value::String("test user".to_string()), ); // Action - dt.create(delta_md.clone(), protocol.clone(), Some(commit_info)) + dt.create(delta_md.clone(), protocol.clone(), Some(commit_info), None) .await .unwrap(); diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs index 5e8a8563e5..2ecc3e755b 100644 --- a/rust/src/delta_arrow.rs +++ b/rust/src/delta_arrow.rs @@ -183,7 +183,7 @@ impl TryFrom<&ArrowField> for schema::SchemaField { arrow_field.data_type().try_into()?, arrow_field.is_nullable(), arrow_field.metadata().as_ref().map_or_else( - || HashMap::new(), + HashMap::new, |m| { m.iter() .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone()))) diff --git a/rust/tests/adls_gen2_table_test.rs b/rust/tests/adls_gen2_table_test.rs index 2f3b95efef..b1ad2f3d6b 100644 --- a/rust/tests/adls_gen2_table_test.rs +++ b/rust/tests/adls_gen2_table_test.rs @@ -88,7 +88,7 @@ mod adls_gen2_table { let (metadata, protocol) = table_info(); // Act 1 - dt.create(metadata.clone(), protocol.clone(), None) + dt.create(metadata.clone(), protocol.clone(), None, None) .await .unwrap(); diff --git a/rust/tests/concurrent_writes_test.rs b/rust/tests/concurrent_writes_test.rs index 14d378f929..be7851197c 100644 --- a/rust/tests/concurrent_writes_test.rs +++ b/rust/tests/concurrent_writes_test.rs @@ -83,7 +83,7 @@ async fn concurrent_writes_azure() { min_writer_version: 2, }; - dt.create(metadata.clone(), protocol.clone(), None) + dt.create(metadata.clone(), protocol.clone(), None, None) .await .unwrap(); diff --git a/rust/tests/fs_common/mod.rs b/rust/tests/fs_common/mod.rs index 63c1b2b017..80984eab04 100644 --- a/rust/tests/fs_common/mod.rs +++ 
b/rust/tests/fs_common/mod.rs @@ -53,7 +53,7 @@ pub async fn create_test_table( min_reader_version: 1, min_writer_version: 2, }; - table.create(md, protocol, None).await.unwrap(); + table.create(md, protocol, None, None).await.unwrap(); table } From f7be374eddbc34595e4447ab0ca0410b959acce2 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sat, 12 Mar 2022 20:23:07 -0800 Subject: [PATCH 21/28] Address mypy checks --- python/deltalake/writer.py | 8 ++++---- python/stubs/deltalake/deltalake.pyi | 11 +++++++++-- python/stubs/pyarrow/__init__.pyi | 1 + python/stubs/pyarrow/dataset.pyi | 1 + python/stubs/pyarrow/lib.pyi | 3 +++ 5 files changed, 18 insertions(+), 6 deletions(-) create mode 100644 python/stubs/pyarrow/lib.pyi diff --git a/python/deltalake/writer.py b/python/deltalake/writer.py index 31cebcb339..7c38887301 100644 --- a/python/deltalake/writer.py +++ b/python/deltalake/writer.py @@ -3,18 +3,18 @@ from dataclasses import dataclass from datetime import date, datetime from decimal import Decimal -from typing import Any, Dict, Iterable, Iterator, List, Literal, Optional, Union +from typing import Any, Dict, Iterable, Iterator, List, Mapping, Optional, Union import pandas as pd import pyarrow as pa import pyarrow.dataset as ds import pyarrow.fs as pa_fs from pyarrow.lib import RecordBatchReader - -from deltalake import DeltaTable, PyDeltaTableError +from typing_extensions import Literal from .deltalake import PyDeltaTableError from .deltalake import write_new_deltalake as _write_new_deltalake +from .table import DeltaTable class DeltaTableProtocolError(PyDeltaTableError): @@ -25,7 +25,7 @@ class DeltaTableProtocolError(PyDeltaTableError): class AddAction: path: str size: int - partition_values: Dict[str, Optional[str]] + partition_values: Mapping[str, Optional[str]] modification_time: int data_change: bool stats: str diff --git a/python/stubs/deltalake/deltalake.pyi b/python/stubs/deltalake/deltalake.pyi index f420569e5d..b872e61bd3 100644 --- a/python/stubs/deltalake/deltalake.pyi +++ b/python/stubs/deltalake/deltalake.pyi @@ -1,6 +1,13 @@ -from typing import Any, Callable +from typing import Any, Callable, List + +import pyarrow as pa + +from deltalake.writer import AddAction RawDeltaTable: Any -PyDeltaTableError: Any rust_core_version: Callable[[], str] DeltaStorageFsBackend: Any + +write_new_deltalake: Callable[[str, pa.Schema, List[AddAction], str, List[str]], None] + +class PyDeltaTableError(BaseException): ... 
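+# Note: the write_new_deltalake signature above corresponds to
+# (table_uri, schema, add_actions, mode, partition_by), mirroring the call
+# made from deltalake/writer.py.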
diff --git a/python/stubs/pyarrow/__init__.pyi b/python/stubs/pyarrow/__init__.pyi index 3b28a2a714..c7cef34ba9 100644 --- a/python/stubs/pyarrow/__init__.pyi +++ b/python/stubs/pyarrow/__init__.pyi @@ -2,6 +2,7 @@ from typing import Any, Callable Schema: Any Table: Any +RecordBatch: Any Field: Any DataType: Any schema: Any diff --git a/python/stubs/pyarrow/dataset.pyi b/python/stubs/pyarrow/dataset.pyi index 5d9683dee1..d06f843246 100644 --- a/python/stubs/pyarrow/dataset.pyi +++ b/python/stubs/pyarrow/dataset.pyi @@ -5,3 +5,4 @@ dataset: Any partitioning: Any FileSystemDataset: Any ParquetFileFormat: Any +write_dataset: Any diff --git a/python/stubs/pyarrow/lib.pyi b/python/stubs/pyarrow/lib.pyi new file mode 100644 index 0000000000..fc97dea727 --- /dev/null +++ b/python/stubs/pyarrow/lib.pyi @@ -0,0 +1,3 @@ +from typing import Any + +RecordBatchReader: Any From e0a0cc34a9a8f2cc4f0ff4f365fe5a0c1835ffea Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sat, 12 Mar 2022 20:31:17 -0800 Subject: [PATCH 22/28] Fix formatting --- rust/src/delta_arrow.rs | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/rust/src/delta_arrow.rs b/rust/src/delta_arrow.rs index 2ecc3e755b..4df46fc48e 100644 --- a/rust/src/delta_arrow.rs +++ b/rust/src/delta_arrow.rs @@ -182,14 +182,14 @@ impl TryFrom<&ArrowField> for schema::SchemaField { arrow_field.name().clone(), arrow_field.data_type().try_into()?, arrow_field.is_nullable(), - arrow_field.metadata().as_ref().map_or_else( - HashMap::new, - |m| { + arrow_field + .metadata() + .as_ref() + .map_or_else(HashMap::new, |m| { m.iter() .map(|(k, v)| (k.clone(), serde_json::Value::String(v.clone()))) .collect() - }, - ), + }), )) } } From 4cf415ab0f5b4b4f8d5cb86217220f37f53c2c50 Mon Sep 17 00:00:00 2001 From: Will Jones Date: Sat, 12 Mar 2022 20:49:19 -0800 Subject: [PATCH 23/28] Remove unnecessary clone --- python/src/lib.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/python/src/lib.rs b/python/src/lib.rs index 553bef8725..64762a6beb 100644 --- a/python/src/lib.rs +++ b/python/src/lib.rs @@ -533,7 +533,7 @@ fn write_new_deltalake( None, None, (&schema).try_into()?, - partition_by.clone(), + partition_by, HashMap::new(), ); From 59028efff893b8f6dc82f41141a0ef9d29e65a4e Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 15 Mar 2022 16:30:22 -0700 Subject: [PATCH 24/28] Skip tests on unsupported platforms --- python/tests/test_writer.py | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/python/tests/test_writer.py b/python/tests/test_writer.py index 550d33f1e9..b87806aafc 100644 --- a/python/tests/test_writer.py +++ b/python/tests/test_writer.py @@ -1,6 +1,7 @@ import json import os import pathlib +import sys from datetime import date, datetime, timedelta from decimal import Decimal from unittest.mock import Mock @@ -16,6 +17,23 @@ from deltalake.writer import DeltaTableProtocolError +def _is_old_glibc_version(): + if "CS_GNU_LIBC_VERSION" in os.confstr_names: + version = os.confstr("CS_GNU_LIBC_VERSION").split(" ")[1] + return version < "2.28" + else: + return False + + +if sys.platform == "win32": + pytest.skip("Writer isn't yet supported on Windows", allow_module_level=True) + +if _is_old_glibc_version(): + pytest.skip( + "Writer isn't yet supported on Linux with glibc < 2.28", allow_module_level=True + ) + + @pytest.fixture() def sample_data(): nrows = 5 From 6bfefa440e45d727cf11c0407859713ec6a6a7de Mon Sep 17 00:00:00 2001 From: Will Jones Date: Tue, 15 Mar 2022 16:52:35 -0700 Subject: [PATCH 25/28] 
Make sphinx happy

---
 python/docs/source/conf.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py
index 8311715deb..bb6c11237b 100644
--- a/python/docs/source/conf.py
+++ b/python/docs/source/conf.py
@@ -57,6 +57,7 @@ def get_release_version() -> str:
     ("py:class", "pyarrow.lib.DataType"),
     ("py:class", "pyarrow.lib.Field"),
     ("py:class", "pyarrow.lib.NativeFile"),
+    ("py:class", "pyarrow.lib.RecordBatchReader"),
     ("py:class", "pyarrow._fs.FileSystem"),
     ("py:class", "pyarrow._fs.FileInfo"),
     ("py:class", "pyarrow._fs.FileSelector"),

From 39044c57302131513154b8035ae80466db908059 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Tue, 15 Mar 2022 17:06:32 -0700
Subject: [PATCH 26/28] Need typing extensions for checking now

---
 python/pyproject.toml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)

diff --git a/python/pyproject.toml b/python/pyproject.toml
index bd02df5790..7b096de6f5 100644
--- a/python/pyproject.toml
+++ b/python/pyproject.toml
@@ -36,7 +36,8 @@ devel = [
     "sphinx",
     "sphinx-rtd-theme",
     "toml",
-    "pandas"
+    "pandas",
+    "typing-extensions"
 ]
 
 [project.urls]

From ea6b954572d08488f85b19e2d38b99ddb1228c53 Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Tue, 15 Mar 2022 17:19:09 -0700
Subject: [PATCH 27/28] Add nitpick ignore for typing_extensions

---
 python/docs/source/conf.py | 1 +
 1 file changed, 1 insertion(+)

diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py
index bb6c11237b..c92f655c70 100644
--- a/python/docs/source/conf.py
+++ b/python/docs/source/conf.py
@@ -64,6 +64,7 @@ def get_release_version() -> str:
     ("py:class", "pyarrow._fs.FileSystemHandler"),
     ("py:class", "RawDeltaTable"),
     ("py:class", "pandas.DataFrame"),
+    ("py:class", "typing_extensions.Literal"),
 ]
 
 # Add any paths that contain templates here, relative to this directory.

From a4ebf7f5eb4f3713ed413a9769b23d8e97be333c Mon Sep 17 00:00:00 2001
From: Will Jones
Date: Sun, 20 Mar 2022 11:18:35 -0700
Subject: [PATCH 28/28] cleanup

---
 python/docs/source/conf.py | 1 -
 1 file changed, 1 deletion(-)

diff --git a/python/docs/source/conf.py b/python/docs/source/conf.py
index c92f655c70..bb6c11237b 100644
--- a/python/docs/source/conf.py
+++ b/python/docs/source/conf.py
@@ -64,7 +64,6 @@ def get_release_version() -> str:
     ("py:class", "pyarrow._fs.FileSystemHandler"),
     ("py:class", "RawDeltaTable"),
     ("py:class", "pandas.DataFrame"),
-    ("py:class", "typing_extensions.Literal"),
 ]
 
 # Add any paths that contain templates here, relative to this directory.
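As a rough end-to-end sketch of the Python API these patches add (the table
path and sample data below are illustrative, not taken from the patches):

    import pandas as pd
    import pyarrow.compute as pc

    from deltalake import DeltaTable, write_deltalake

    df = pd.DataFrame({"part": ["a", "a", "b"], "value": [1, 2, 3]})

    # Create a new table partitioned by "part"; mode defaults to "error"
    # if the table already exists.
    write_deltalake("/tmp/example_table", df, partition_by=["part"])

    # Append the same rows again, then read the table back with PyArrow.
    write_deltalake("/tmp/example_table", df, mode="append")
    table = DeltaTable("/tmp/example_table").to_pyarrow_table()
    print(table.take(pc.sort_indices(table["value"])))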