diff --git a/.github/CODEOWNERS b/.github/CODEOWNERS
index 480bc5c521..274e1e161b 100644
--- a/.github/CODEOWNERS
+++ b/.github/CODEOWNERS
@@ -1,4 +1,6 @@
-* @houqp @xianwill @wjones127 @fvaleye @roeap @rtyler @mosyp
+rust/ @wjones127 @roeap @rtyler
proofs/ @houqp
-python/ @wjones127 @fvaleye @rtyler @roeap @houqp
+python/ @wjones127 @fvaleye @roeap
tlaplus/ @houqp
+.github/ @wjones127 @rtyler
+docs/ @MrPowers
diff --git a/.github/workflows/build.yml b/.github/workflows/build.yml
index ba2915cdc8..80dec2eaef 100644
--- a/.github/workflows/build.yml
+++ b/.github/workflows/build.yml
@@ -102,7 +102,7 @@ jobs:
AWS_ACCESS_KEY_ID: deltalake
AWS_SECRET_ACCESS_KEY: weloverust
AWS_ENDPOINT_URL: http://localhost:4566
- AWS_STORAGE_ALLOW_HTTP: "1"
+ AWS_ALLOW_HTTP: "1"
AZURE_USE_EMULATOR: "1"
AZURE_STORAGE_ALLOW_HTTP: "1"
AZURITE_BLOB_STORAGE_URL: "http://localhost:10000"
@@ -164,5 +164,5 @@ jobs:
- uses: Swatinem/rust-cache@v2
- name: Run tests
- working-directory: rust
+ working-directory: crates/deltalake-core
run: cargo test --no-default-features --features=parquet2
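For reference, `AWS_ALLOW_HTTP` is the option name that `object_store`'s S3 builder recognizes, which is what the rename above aligns with. A minimal sketch of the equivalent programmatic setup against the localstack endpoint — the bucket name and region are hypothetical, the credentials mirror the workflow's dummy values:

```rust
use object_store::aws::AmazonS3Builder;

fn localstack_store() -> Result<impl object_store::ObjectStore, object_store::Error> {
    AmazonS3Builder::new()
        .with_bucket_name("deltars") // hypothetical test bucket
        .with_region("us-east-1")
        .with_access_key_id("deltalake")
        .with_secret_access_key("weloverust")
        .with_endpoint("http://localhost:4566")
        .with_allow_http(true) // plain-HTTP endpoints are rejected without this
        .build()
}
```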
diff --git a/.github/workflows/dev_pr/labeler.yml b/.github/workflows/dev_pr/labeler.yml
index 5857c6f1c5..71c240950f 100644
--- a/.github/workflows/dev_pr/labeler.yml
+++ b/.github/workflows/dev_pr/labeler.yml
@@ -30,4 +30,3 @@ proofs:
tlaplus:
- tlaplus/**/*
-
diff --git a/.github/workflows/python_release.yml b/.github/workflows/python_release.yml
index 85af4ea95a..6793d129a0 100644
--- a/.github/workflows/python_release.yml
+++ b/.github/workflows/python_release.yml
@@ -114,8 +114,6 @@ jobs:
target: aarch64-unknown-linux-gnu
command: publish
args: --skip-existing -m python/Cargo.toml --no-sdist ${{ env.FEATURES_FLAG }}
- # for openssl build
- before-script-linux: yum install -y perl-IPC-Cmd
release-docs:
needs:
diff --git a/.gitignore b/.gitignore
index 5fe8f6cf0a..5ecd5a627d 100644
--- a/.gitignore
+++ b/.gitignore
@@ -27,3 +27,5 @@ Cargo.lock
!/delta-inspect/Cargo.lock
!/proofs/Cargo.lock
+justfile
+site
\ No newline at end of file
diff --git a/CHANGELOG.md b/CHANGELOG.md
index 6f62ca2dcd..922a49f47e 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -1,5 +1,86 @@
# Changelog
+## [rust-v0.16.0](https://github.com/delta-io/delta-rs/tree/rust-v0.16.0) (2023-09-27)
+
+[Full Changelog](https://github.com/delta-io/delta-rs/compare/rust-v0.15.0...rust-v0.16.0)
+
+**Implemented enhancements:**
+
+- Expose Optimize option min\_commit\_interval in Python [\#1640](https://github.com/delta-io/delta-rs/issues/1640)
+- Expose create\_checkpoint\_for [\#1513](https://github.com/delta-io/delta-rs/issues/1513)
+- integration tests regularly fail for HDFS [\#1428](https://github.com/delta-io/delta-rs/issues/1428)
+- Add Support for Microsoft OneLake [\#1418](https://github.com/delta-io/delta-rs/issues/1418)
+- add support for atomic rename in R2 [\#1356](https://github.com/delta-io/delta-rs/issues/1356)
+
+**Fixed bugs:**
+
+- Writing with large arrow types \(e.g. large\_utf8\), writes wrong partition encoding [\#1669](https://github.com/delta-io/delta-rs/issues/1669)
+- \[python\] Different stringification of partition values in reader and writer [\#1653](https://github.com/delta-io/delta-rs/issues/1653)
+- Unable to interface with data written from Spark Databricks [\#1651](https://github.com/delta-io/delta-rs/issues/1651)
+- `get_last_checkpoint` does some unnecessary listing [\#1643](https://github.com/delta-io/delta-rs/issues/1643)
+- `PartitionWriter`'s `buffer_len` doesn't include incomplete row groups [\#1637](https://github.com/delta-io/delta-rs/issues/1637)
+- Slack community invite link has expired [\#1636](https://github.com/delta-io/delta-rs/issues/1636)
+- delta-rs does not appear to support tables with liquid clustering [\#1626](https://github.com/delta-io/delta-rs/issues/1626)
+- Internal Parquet panic when using a Map type. [\#1619](https://github.com/delta-io/delta-rs/issues/1619)
+- partition\_by with "$" on local filesystem [\#1591](https://github.com/delta-io/delta-rs/issues/1591)
+- ProtocolChanged error when performing append write [\#1585](https://github.com/delta-io/delta-rs/issues/1585)
+- Unable to `cargo update` using git tag or rev on Rust 1.70 [\#1580](https://github.com/delta-io/delta-rs/issues/1580)
+- NoMetadata error when reading delta table [\#1562](https://github.com/delta-io/delta-rs/issues/1562)
+- Cannot read delta table: `Delta protocol violation` [\#1557](https://github.com/delta-io/delta-rs/issues/1557)
+- Update the CODEOWNERS to capture the current reviewers and contributors [\#1553](https://github.com/delta-io/delta-rs/issues/1553)
+- \[Python\] Incorrect file URIs when partition values contain escape character [\#1533](https://github.com/delta-io/delta-rs/issues/1533)
+- add documentation how to Query Delta natively from datafusion [\#1485](https://github.com/delta-io/delta-rs/issues/1485)
+- Python: write\_deltalake to ADLS Gen2 issue [\#1456](https://github.com/delta-io/delta-rs/issues/1456)
+- Partition values that have been url encoded cannot be read when using deltalake [\#1446](https://github.com/delta-io/delta-rs/issues/1446)
+- Error optimizing large table [\#1419](https://github.com/delta-io/delta-rs/issues/1419)
+- Cannot read partitions with special characters \(including space\) with pyarrow \>= 11 [\#1393](https://github.com/delta-io/delta-rs/issues/1393)
+- ImportError: deltalake/\_internal.abi3.so: cannot allocate memory in static TLS block [\#1380](https://github.com/delta-io/delta-rs/issues/1380)
+- Invalid JSON in log record missing field `schemaString` for DLT tables [\#1302](https://github.com/delta-io/delta-rs/issues/1302)
+- Special characters in partition path not handled locally [\#1299](https://github.com/delta-io/delta-rs/issues/1299)
+
+**Merged pull requests:**
+
+- chore: bump rust crate version [\#1675](https://github.com/delta-io/delta-rs/pull/1675) ([rtyler](https://github.com/rtyler))
+- fix: change partitioning schema from large to normal string for pyarrow\<12 [\#1671](https://github.com/delta-io/delta-rs/pull/1671) ([ion-elgreco](https://github.com/ion-elgreco))
+- feat: allow to set large dtypes for the schema check in `write_deltalake` [\#1668](https://github.com/delta-io/delta-rs/pull/1668) ([ion-elgreco](https://github.com/ion-elgreco))
+- docs: small consistency update in guide and readme [\#1666](https://github.com/delta-io/delta-rs/pull/1666) ([ion-elgreco](https://github.com/ion-elgreco))
+- fix: exception string in writer.py [\#1665](https://github.com/delta-io/delta-rs/pull/1665) ([sebdiem](https://github.com/sebdiem))
+- chore: increment python library version [\#1664](https://github.com/delta-io/delta-rs/pull/1664) ([wjones127](https://github.com/wjones127))
+- docs: fix some typos [\#1662](https://github.com/delta-io/delta-rs/pull/1662) ([ion-elgreco](https://github.com/ion-elgreco))
+- fix: more consistent handling of partition values and file paths [\#1661](https://github.com/delta-io/delta-rs/pull/1661) ([roeap](https://github.com/roeap))
+- docs: add docstring to protocol method [\#1660](https://github.com/delta-io/delta-rs/pull/1660) ([MrPowers](https://github.com/MrPowers))
+- docs: make docs.rs build docs with all features enabled [\#1658](https://github.com/delta-io/delta-rs/pull/1658) ([simonvandel](https://github.com/simonvandel))
+- fix: enable offset listing for s3 [\#1654](https://github.com/delta-io/delta-rs/pull/1654) ([eeroel](https://github.com/eeroel))
+- chore: fix the incorrect Slack link in our readme [\#1649](https://github.com/delta-io/delta-rs/pull/1649) ([rtyler](https://github.com/rtyler))
+- fix: compensate for invalid log files created by Delta Live Tables [\#1647](https://github.com/delta-io/delta-rs/pull/1647) ([rtyler](https://github.com/rtyler))
+- chore: proposed updated CODEOWNERS to allow better review notifications [\#1646](https://github.com/delta-io/delta-rs/pull/1646) ([rtyler](https://github.com/rtyler))
+- feat: expose min\_commit\_interval to `optimize.compact` and `optimize.z_order` [\#1645](https://github.com/delta-io/delta-rs/pull/1645) ([ion-elgreco](https://github.com/ion-elgreco))
+- fix: avoid excess listing of log files [\#1644](https://github.com/delta-io/delta-rs/pull/1644) ([eeroel](https://github.com/eeroel))
+- fix: introduce support for Microsoft OneLake [\#1642](https://github.com/delta-io/delta-rs/pull/1642) ([rtyler](https://github.com/rtyler))
+- fix: explicitly require chrono 0.4.31 or greater [\#1641](https://github.com/delta-io/delta-rs/pull/1641) ([rtyler](https://github.com/rtyler))
+- fix: include in-progress row group when calculating in-memory buffer length [\#1638](https://github.com/delta-io/delta-rs/pull/1638) ([BnMcG](https://github.com/BnMcG))
+- chore: relax chrono pin to 0.4 [\#1635](https://github.com/delta-io/delta-rs/pull/1635) ([houqp](https://github.com/houqp))
+- chore: update datafusion to 31, arrow to 46 and object\_store to 0.7 [\#1634](https://github.com/delta-io/delta-rs/pull/1634) ([houqp](https://github.com/houqp))
+- docs: update Readme [\#1633](https://github.com/delta-io/delta-rs/pull/1633) ([dennyglee](https://github.com/dennyglee))
+- chore: pin the chrono dependency [\#1631](https://github.com/delta-io/delta-rs/pull/1631) ([rtyler](https://github.com/rtyler))
+- feat: pass known file sizes to filesystem in Python [\#1630](https://github.com/delta-io/delta-rs/pull/1630) ([eeroel](https://github.com/eeroel))
+- feat: implement parsing for the new `domainMetadata` actions in the commit log [\#1629](https://github.com/delta-io/delta-rs/pull/1629) ([rtyler](https://github.com/rtyler))
+- ci: fix python release [\#1624](https://github.com/delta-io/delta-rs/pull/1624) ([wjones127](https://github.com/wjones127))
+- ci: extend azure timeout [\#1622](https://github.com/delta-io/delta-rs/pull/1622) ([wjones127](https://github.com/wjones127))
+- feat: allow multiple incremental commits in optimize [\#1621](https://github.com/delta-io/delta-rs/pull/1621) ([kvap](https://github.com/kvap))
+- fix: change map nullable value to false [\#1620](https://github.com/delta-io/delta-rs/pull/1620) ([cmackenzie1](https://github.com/cmackenzie1))
+- Introduce the changelog for the last couple releases [\#1617](https://github.com/delta-io/delta-rs/pull/1617) ([rtyler](https://github.com/rtyler))
+- chore: bump python version to 0.10.2 [\#1616](https://github.com/delta-io/delta-rs/pull/1616) ([wjones127](https://github.com/wjones127))
+- perf: avoid holding GIL in DeltaFileSystemHandler [\#1615](https://github.com/delta-io/delta-rs/pull/1615) ([wjones127](https://github.com/wjones127))
+- fix: don't re-encode paths [\#1613](https://github.com/delta-io/delta-rs/pull/1613) ([wjones127](https://github.com/wjones127))
+- feat: use url parsing from object store [\#1592](https://github.com/delta-io/delta-rs/pull/1592) ([roeap](https://github.com/roeap))
+- feat: buffered reading of transaction logs [\#1549](https://github.com/delta-io/delta-rs/pull/1549) ([eeroel](https://github.com/eeroel))
+- feat: merge operation [\#1522](https://github.com/delta-io/delta-rs/pull/1522) ([Blajda](https://github.com/Blajda))
+- feat: expose create\_checkpoint\_for to the public [\#1514](https://github.com/delta-io/delta-rs/pull/1514) ([haruband](https://github.com/haruband))
+- docs: update Readme [\#1440](https://github.com/delta-io/delta-rs/pull/1440) ([roeap](https://github.com/roeap))
+- refactor: re-organize top level modules [\#1434](https://github.com/delta-io/delta-rs/pull/1434) ([roeap](https://github.com/roeap))
+- feat: integrate unity catalog with datafusion [\#1338](https://github.com/delta-io/delta-rs/pull/1338) ([roeap](https://github.com/roeap))
+
## [rust-v0.15.0](https://github.com/delta-io/delta-rs/tree/rust-v0.15.0) (2023-09-06)
[Full Changelog](https://github.com/delta-io/delta-rs/compare/rust-v0.14.0...rust-v0.15.0)
diff --git a/Cargo.toml b/Cargo.toml
index f4910cf526..0b3862bd1f 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -1,6 +1,10 @@
[workspace]
-members = ["rust", "python"]
-exclude = ["proofs", "delta-inspect"]
+members = [
+ "crates/*",
+ "delta-inspect",
+ "python",
+]
+exclude = ["proofs"]
resolver = "2"
[profile.release-with-debug]
@@ -15,23 +19,23 @@ debug = "line-tables-only"
[workspace.dependencies]
# arrow
-arrow = { version = "45" }
-arrow-array = { version = "45" }
-arrow-buffer = { version = "45" }
-arrow-cast = { version = "45" }
-arrow-ord = { version = "45" }
-arrow-row = { version = "45" }
-arrow-schema = { version = "45" }
-arrow-select = { version = "45" }
-parquet = { version = "45" }
+arrow = { version = "47" }
+arrow-array = { version = "47" }
+arrow-buffer = { version = "47" }
+arrow-cast = { version = "47" }
+arrow-ord = { version = "47" }
+arrow-row = { version = "47" }
+arrow-schema = { version = "47" }
+arrow-select = { version = "47" }
+parquet = { version = "47" }
# datafusion
-datafusion = { version = "30" }
-datafusion-expr = { version = "30" }
-datafusion-common = { version = "30" }
-datafusion-proto = { version = "30" }
-datafusion-sql = { version = "30" }
-datafusion-physical-expr = { version = "30" }
+datafusion = { version = "32" }
+datafusion-expr = { version = "32" }
+datafusion-common = { version = "32" }
+datafusion-proto = { version = "32" }
+datafusion-sql = { version = "32" }
+datafusion-physical-expr = { version = "32" }
# serde
serde = { version = "1", features = ["derive"] }
@@ -39,7 +43,7 @@ serde_json = "1"
# "stdlib"
bytes = { version = "1" }
-chrono = { version = "0.4", default-features = false, features = ["clock"] }
+chrono = { version = "0.4.31", default-features = false, features = ["clock"] }
regex = { version = "1" }
thiserror = { version = "1" }
url = { version = "2" }
diff --git a/README.adoc b/README.adoc
deleted file mode 100644
index 7fa57e8f61..0000000000
--- a/README.adoc
+++ /dev/null
@@ -1,104 +0,0 @@
-:toc: macro
-
-= delta-rs
-
-image:https://github.com/delta-io/delta-rs/workflows/build/badge.svg[Build Status,link=https://github.com/delta-io/delta-rs/actions]
-image:https://img.shields.io/crates/v/deltalake.svg?style=flat-square[Crate,link=https://crates.io/crates/deltalake]
-image:https://img.shields.io/badge/docs-rust-blue.svg?style=flat-square[Docs,link=https://docs.rs/deltalake]
-image:https://img.shields.io/pypi/v/deltalake.svg?style=flat-square[Python binding,link=https://pypi.org/project/deltalake]
-image:https://img.shields.io/pypi/dm/deltalake?style=flat-square[PyPI - Downloads,link=https://pypi.org/project/deltalake]
-image:https://img.shields.io/badge/docs-python-blue.svg?style=flat-square[Docs,link=https://delta-io.github.io/delta-rs/python]
-
-image::logo.png[Delta-rs logo]
-A native interface to
-link:https://delta.io[Delta Lake].
-
-toc::[]
-
-== About
-
-This library provides low level access to Delta tables in Rust, which can be
-used with data processing frameworks like
-link:https://github.com/apache/arrow-datafusion[datafusion],
-link:https://github.com/apache/arrow-datafusion/tree/master/ballista[ballista],
-link:https://github.com/pola-rs/polars[polars],
-link:https://github.com/rajasekarv/vega[vega], etc. It also provides bindings to other higher level language link:https://delta-io.github.io/delta-rs/python/[Python].
-
-=== Features
-
-**Supported backends:**
-
-* Local file system
-* AWS S3
-* Azure Blob Storage / Azure Datalake Storage Gen2
-* Google Cloud Storage
-* HDFS
-
-.Support features
-|===
-| Operation/Feature | Rust | Python
-
-| Read table
-| :heavy_check_mark:
-| :heavy_check_mark:
-
-| Stream table update
-| :heavy_check_mark:
-| :heavy_check_mark:
-
-| Filter files with partitions
-| :heavy_check_mark:
-| :heavy_check_mark:
-
-| Vacuum (delete stale files)
-| :heavy_check_mark:
-| :heavy_check_mark:
-
-| History
-| :heavy_check_mark:
-| :heavy_check_mark:
-
-| Write transactions
-| :heavy_check_mark:
-|
-
-| Checkpoint creation
-| :heavy_check_mark:
-| :heavy_check_mark:
-
-| High-level file writer
-|
-| :heavy_check_mark:
-
-| Optimize
-| :heavy_check_mark:
-| :heavy_check_mark:
-
-|===
-
-
-== Get Involved
-
-Join link:https://go.delta.io/slack[#delta-rs in the Delta Lake Slack workspace]
-
-=== Development Meeting
-
-We have a standing development sync meeting for those that are interested. The meeting is held every two weeks at **9am PST** on Tuesday mornings. The direct meeting URL is shared in the Slack channel above :point_up: before the meeting.
-
-These meetings are also link:https://go.delta.io/youtube[streamed live via YouTube] if you just want to listen in.
-
-=== Development
-
-delta-rs requires the Rust compiler, which can be installed with the
-link:https://rustup.rs/[rustup]
-command.
-
-Running tests can be done with `cargo test` in the root directory, or one of the directories below:
-
-=== Rust
-
-The `rust/` directory contains core Rust APIs for accessing Delta Lake from Rust, or for higher-level language bindings.
-
-=== Python
-
-The `python/` directory contains the `deltalake` Python package built on top of delta-rs
diff --git a/README.md b/README.md
new file mode 100644
index 0000000000..b3dd824b77
--- /dev/null
+++ b/README.md
@@ -0,0 +1,189 @@
+<!-- centered logo and badge images omitted -->
+
+A native Rust library for Delta Lake, with bindings to Python
+
+Python docs · Rust docs · Report a bug · Request a feature · Roadmap
+
+The Delta Lake project aims to unlock the power of Delta Lake for as many users and projects as possible
+by providing native low-level APIs aimed at developers and integrators, as well as a high-level operations
+API that lets you query, inspect, and operate your Delta Lake with ease.
+
+| Source | Downloads | Installation Command | Docs |
+| --------------------- | --------------------------------- | ----------------------- | --------------- |
+| **[PyPI][pypi]**        | [![Downloads][pypi-dl]][pypi]     | `pip install deltalake` | [Docs][py-docs] |
+| **[Crates.io][crates]** | [![Downloads][crates-dl]][crates] | `cargo add deltalake`   | [Docs][rs-docs] |
+
+[pypi]: https://pypi.org/project/deltalake/
+[pypi-dl]: https://img.shields.io/pypi/dm/deltalake?style=flat-square&color=00ADD4
+[py-docs]: https://delta-io.github.io/delta-rs/python/
+[rs-docs]: https://docs.rs/deltalake/latest/deltalake/
+[crates]: https://crates.io/crates/deltalake
+[crates-dl]: https://img.shields.io/crates/d/deltalake?color=F75101
+
+## Table of contents
+
+- [Quick Start](#quick-start)
+- [Get Involved](#get-involved)
+- [Integrations](#integrations)
+- [Features](#features)
+
+## Quick Start
+
+The `deltalake` library aims to adopt patterns from other libraries in data processing,
+so getting started should look familiar.
+
+```py3
+from deltalake import DeltaTable, write_deltalake
+import pandas as pd
+
+# write some data into a delta table
+df = pd.DataFrame({"id": [1, 2], "value": ["foo", "boo"]})
+write_deltalake("./data/delta", df)
+
+# Load data from the delta table
+dt = DeltaTable("./data/delta")
+df2 = dt.to_pandas()
+
+assert df.equals(df2)
+```
+
+The same table can also be loaded using the core Rust crate:
+
+```rs
+use deltalake::{open_table, DeltaTableError};
+
+#[tokio::main]
+async fn main() -> Result<(), DeltaTableError> {
+ // open the table written in python
+ let table = open_table("./data/delta").await?;
+
+ // show all active files in the table
+ let files = table.get_files();
+ println!("{files:?}");
+
+ Ok(())
+}
+```
+
+You can also try Delta Lake docker at [DockerHub](https://go.delta.io/dockerhub) | [Docker Repo](https://go.delta.io/docker)
+
+## Get Involved
+
+We encourage you to reach out, and are [committed](https://github.com/delta-io/delta-rs/blob/main/CODE_OF_CONDUCT.md)
+to providing a welcoming community.
+
+- [Join us in our Slack workspace](https://join.slack.com/t/delta-users/shared_invite/zt-23h0xwez7-wDTm43ZVEW2ZcbKn6Bc8Fg)
+- [Report an issue](https://github.com/delta-io/delta-rs/issues/new?template=bug_report.md)
+- Looking to contribute? See our [good first issues](https://github.com/delta-io/delta-rs/contribute).
+
+## Integrations
+
+Libraries and frameworks that interoperate with delta-rs - in alphabetical order.
+
+- [AWS SDK for Pandas](https://github.com/aws/aws-sdk-pandas)
+- [ballista][ballista]
+- [Dask](https://github.com/dask-contrib/dask-deltatable)
+- [datafusion][datafusion]
+- [datahub](https://datahubproject.io/)
+- [DuckDB](https://duckdb.org/)
+- [polars](https://www.pola.rs/)
+- [Ray](https://github.com/delta-incubator/deltaray)
+
+## Features
+
+The following section outlines some core features like supported [storage backends](#cloud-integrations)
+and [operations](#supported-operations) that can be performed against tables. The state of implementation
+of features outlined in the Delta [protocol][protocol] is also [tracked](#protocol-support-level).
+
+### Cloud Integrations
+
+| Storage | Rust | Python | Comment |
+| -------------------- | :-------------------: | :-------------------: | ----------------------------------- |
+| Local | ![done] | ![done] | |
+| S3 - AWS | ![done] | ![done] | requires lock for concurrent writes |
+| S3 - MinIO | ![done] | ![done] | requires lock for concurrent writes |
+| S3 - R2 | ![done] | ![done] | requires lock for concurrent writes |
+| Azure Blob | ![done] | ![done] | |
+| Azure ADLS Gen2 | ![done] | ![done] | |
+| Microsoft OneLake | ![done] | ![done] | |
+| Google Cloud Storage | ![done] | ![done] | |
+
+### Supported Operations
+
+| Operation | Rust | Python | Description |
+| --------------------- | :----------------------: | :-----------------: | ------------------------------------------- |
+| Create | ![done] | ![done] | Create a new table |
+| Read | ![done] | ![done] | Read data from a table |
+| Vacuum | ![done] | ![done] | Remove unused files and log entries |
+| Delete - partitions | | ![done] | Delete a table partition |
+| Delete - predicates | ![done] | ![done] | Delete data based on a predicate |
+| Optimize - compaction | ![done]                  | ![done]             | Harmonize the size of data files            |
+| Optimize - Z-order | ![done] | ![done] | Place similar data into the same file |
+| Merge | [![semi-done]][merge-rs] | [![open]][merge-py] | Merge two tables (limited to full re-write) |
+| FS check | ![done] | | Remove corrupted files from table |
+
+### Protocol Support Level
+
+| Writer Version | Requirement | Status |
+| -------------- | --------------------------------------------- | :------------------: |
+| Version 2      | Append Only Tables                            | ![done]              |
+| Version 2 | Column Invariants | ![done] |
+| Version 3 | Enforce `delta.checkpoint.writeStatsAsJson` | [![open]][writer-rs] |
+| Version 3 | Enforce `delta.checkpoint.writeStatsAsStruct` | [![open]][writer-rs] |
+| Version 3 | CHECK constraints | [![open]][writer-rs] |
+| Version 4 | Change Data Feed | |
+| Version 4 | Generated Columns | |
+| Version 5 | Column Mapping | |
+| Version 6 | Identity Columns | |
+| Version 7 | Table Features | |
+
+| Reader Version | Requirement | Status |
+| -------------- | ----------------------------------- | ------ |
+| Version 2 | Column Mapping | |
+| Version 3 | Table Features (requires reader V7) | |
+
+[datafusion]: https://github.com/apache/arrow-datafusion
+[ballista]: https://github.com/apache/arrow-ballista
+[polars]: https://github.com/pola-rs/polars
+[open]: https://cdn.jsdelivr.net/gh/Readme-Workflows/Readme-Icons@main/icons/octicons/IssueNeutral.svg
+[semi-done]: https://cdn.jsdelivr.net/gh/Readme-Workflows/Readme-Icons@main/icons/octicons/ApprovedChangesGrey.svg
+[done]: https://cdn.jsdelivr.net/gh/Readme-Workflows/Readme-Icons@main/icons/octicons/ApprovedChanges.svg
+[roadmap]: https://github.com/delta-io/delta-rs/issues/1128
+[merge-py]: https://github.com/delta-io/delta-rs/issues/1357
+[merge-rs]: https://github.com/delta-io/delta-rs/issues/850
+[writer-rs]: https://github.com/delta-io/delta-rs/issues/851
+[onelake-rs]: https://github.com/delta-io/delta-rs/issues/1418
+[protocol]: https://github.com/delta-io/delta/blob/master/PROTOCOL.md
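The writer/reader requirement tables above correspond to the table's protocol versions. A minimal sketch of inspecting them from Rust, assuming the 0.16-era `DeltaTable` accessors `get_min_reader_version`/`get_min_writer_version`:

```rust
use deltalake::open_table;

#[tokio::main]
async fn main() -> Result<(), deltalake::DeltaTableError> {
    let table = open_table("./data/delta").await?;
    // A client can only read/write the table if it supports at least
    // these protocol versions.
    println!("min reader version: {}", table.get_min_reader_version());
    println!("min writer version: {}", table.get_min_writer_version());
    Ok(())
}
```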
diff --git a/crates/README.md b/crates/README.md
new file mode 100644
index 0000000000..d5b17317ca
--- /dev/null
+++ b/crates/README.md
@@ -0,0 +1,3 @@
+# Delta Lake Rust crates
+
+This directory contains all of the crates published by the [delta-rs](https://github.com/delta-io/delta-rs) project. These crates were originally split based on the proposal in [#1713](https://github.com/delta-io/delta-rs/discussions/1713).
diff --git a/rust/.gitignore b/crates/deltalake-core/.gitignore
similarity index 100%
rename from rust/.gitignore
rename to crates/deltalake-core/.gitignore
diff --git a/rust/.ignore b/crates/deltalake-core/.ignore
similarity index 100%
rename from rust/.ignore
rename to crates/deltalake-core/.ignore
diff --git a/rust/Cargo.toml b/crates/deltalake-core/Cargo.toml
similarity index 83%
rename from rust/Cargo.toml
rename to crates/deltalake-core/Cargo.toml
index 4c8ebea213..e645b6bfd0 100644
--- a/rust/Cargo.toml
+++ b/crates/deltalake-core/Cargo.toml
@@ -1,6 +1,6 @@
[package]
-name = "deltalake"
-version = "0.15.0"
+name = "deltalake-core"
+version = "0.17.0"
rust-version = "1.64"
authors = ["Qingping Hou "]
homepage = "https://github.com/delta-io/delta.rs"
@@ -12,6 +12,11 @@ repository = "https://github.com/delta-io/delta.rs"
readme = "README.md"
edition = "2021"
+[package.metadata.docs.rs]
+# We cannot use all_features because TLS features are mutually exclusive.
+# We cannot use hdfs feature because it requires Java to be installed.
+features = ["azure", "datafusion", "gcs", "glue", "hdfs", "json", "python", "s3", "unity-experimental"]
+
[dependencies]
# arrow
arrow = { workspace = true, optional = true }
@@ -20,7 +25,7 @@ arrow-buffer = { workspace = true, optional = true }
arrow-cast = { workspace = true, optional = true }
arrow-ord = { workspace = true, optional = true }
arrow-row = { workspace = true, optional = true }
-arrow-schema = { workspace = true, optional = true }
+arrow-schema = { workspace = true, optional = true, features = ["serde"] }
arrow-select = { workspace = true, optional = true }
parquet = { workspace = true, features = [
"async",
@@ -62,22 +67,27 @@ tokio = { workspace = true, features = [
# other deps (these should be organized and pulled into workspace.dependencies as necessary)
cfg-if = "1"
-datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
- "hdfs3",
- "try_spawn_blocking",
-], optional = true }
errno = "0.3"
+hyper = { version = "0.14", optional = true }
itertools = "0.11"
lazy_static = "1"
log = "0"
libc = ">=0.2.90, <1"
num-bigint = "0.4"
num-traits = "0.2.15"
-object_store = "0.6.1"
+object_store = "0.7"
once_cell = "1.16.0"
parking_lot = "0.12"
parquet2 = { version = "0.17", optional = true }
percent-encoding = "2"
+tracing = { version = "0.1", optional = true }
+rand = "0.8"
+
+# hdfs
+datafusion-objectstore-hdfs = { version = "0.1.3", default-features = false, features = [
+ "hdfs3",
+ "try_spawn_blocking",
+], optional = true }
# S3 lock client
rusoto_core = { version = "0.47", default-features = false, optional = true }
@@ -93,16 +103,14 @@ reqwest = { version = "0.11.18", default-features = false, features = [
"rustls-tls",
"json",
], optional = true }
-reqwest-middleware = { version = "0.2.1", optional = true }
-reqwest-retry = { version = "0.2.2", optional = true }
# Datafusion
dashmap = { version = "5", optional = true }
-sqlparser = { version = "0.36", optional = true }
+sqlparser = { version = "0.38", optional = true }
# NOTE dependencies only for integration tests
-fs_extra = { version = "1.2.0", optional = true }
+fs_extra = { version = "1.3.0", optional = true }
tempdir = { version = "0", optional = true }
dynamodb_lock = { version = "0", default-features = false, optional = true }
@@ -117,6 +125,7 @@ tempdir = "0"
tempfile = "3"
tokio = { version = "1", features = ["macros", "rt-multi-thread"] }
utime = "0.3"
+hyper = { version = "0.14", features = ["server"] }
[features]
azure = ["object_store/azure"]
@@ -145,8 +154,8 @@ datafusion = [
]
datafusion-ext = ["datafusion"]
gcs = ["object_store/gcp"]
-glue = ["s3", "rusoto_glue/rustls"]
-glue-native-tls = ["s3-native-tls", "rusoto_glue"]
+glue = ["s3", "rusoto_glue/rustls", "tracing", "hyper"]
+glue-native-tls = ["s3-native-tls", "rusoto_glue", "tracing", "hyper"]
hdfs = ["datafusion-objectstore-hdfs"]
# used only for integration testing
integration_test = ["fs_extra", "tempdir"]
@@ -168,16 +177,8 @@ s3 = [
"dynamodb_lock/rustls",
"object_store/aws",
]
-unity-experimental = ["reqwest", "reqwest-middleware", "reqwest-retry"]
+unity-experimental = ["reqwest", "tracing", "hyper"]
[[bench]]
name = "read_checkpoint"
harness = false
-
-[[example]]
-name = "basic_operations"
-required-features = ["datafusion"]
-
-[[example]]
-name = "recordbatch-writer"
-required-features = ["arrow"]
diff --git a/rust/README.md b/crates/deltalake-core/README.md
similarity index 87%
rename from rust/README.md
rename to crates/deltalake-core/README.md
index 659de48566..b251148c69 100644
--- a/rust/README.md
+++ b/crates/deltalake-core/README.md
@@ -16,8 +16,11 @@ println!("{}", table.get_files());
### CLI
+Navigate into the `delta-inspect` directory first and run the following command.
+Please note that the test data lives under `rust` rather than `delta-inspect`:
+
```bash
-❯ cargo run --bin delta-inspect files ./tests/data/delta-0.2.0
+❯ cargo run --bin delta-inspect files ../rust/tests/data/delta-0.2.0
part-00000-cb6b150b-30b8-4662-ad28-ff32ddab96d2-c000.snappy.parquet
part-00000-7c2deba3-1994-4fb8-bc07-d46c948aa415-c000.snappy.parquet
part-00001-c373a5bd-85f0-4758-815e-7eb62007a15c-c000.snappy.parquet
@@ -33,7 +36,7 @@ DeltaTable(./tests/data/delta-0.2.0)
The examples folder shows how to use Rust API to manipulate Delta tables.
-Examples can be run using the `cargo run --example` command. For example:
+Navigate into the `rust` directory first; examples can then be run using the `cargo run --example` command. For example:
```bash
cargo run --example read_delta_table
diff --git a/rust/benches/read_checkpoint.rs b/crates/deltalake-core/benches/read_checkpoint.rs
similarity index 91%
rename from rust/benches/read_checkpoint.rs
rename to crates/deltalake-core/benches/read_checkpoint.rs
index 9824f15eb0..2ecbee661b 100644
--- a/rust/benches/read_checkpoint.rs
+++ b/crates/deltalake-core/benches/read_checkpoint.rs
@@ -1,6 +1,6 @@
use criterion::{criterion_group, criterion_main, Criterion};
-use deltalake::delta::DeltaTableConfig;
-use deltalake::table_state::DeltaTableState;
+use deltalake::table::state::DeltaTableState;
+use deltalake::DeltaTableConfig;
use std::fs::File;
use std::io::Read;
diff --git a/crates/deltalake-core/src/data_catalog/client/backoff.rs b/crates/deltalake-core/src/data_catalog/client/backoff.rs
new file mode 100644
index 0000000000..473677f226
--- /dev/null
+++ b/crates/deltalake-core/src/data_catalog/client/backoff.rs
@@ -0,0 +1,135 @@
+//! Exponential backoff with jitter
+use rand::prelude::*;
+use std::time::Duration;
+
+/// Exponential backoff with jitter
+///
+/// See <https://aws.amazon.com/blogs/architecture/exponential-backoff-and-jitter/>
+#[allow(missing_copy_implementations)]
+#[derive(Debug, Clone)]
+pub struct BackoffConfig {
+ /// The initial backoff duration
+ pub init_backoff: Duration,
+ /// The maximum backoff duration
+ pub max_backoff: Duration,
+ /// The base of the exponential to use
+ pub base: f64,
+}
+
+impl Default for BackoffConfig {
+ fn default() -> Self {
+ Self {
+ init_backoff: Duration::from_millis(100),
+ max_backoff: Duration::from_secs(15),
+ base: 2.,
+ }
+ }
+}
+
+/// [`Backoff`] can be created from a [`BackoffConfig`]
+///
+/// Consecutive calls to [`Backoff::tick`] will return the next backoff interval
+pub struct Backoff {
+ init_backoff: f64,
+ next_backoff_secs: f64,
+ max_backoff_secs: f64,
+ base: f64,
+ rng: Option<Box<dyn RngCore + Sync + Send>>,
+}
+
+impl std::fmt::Debug for Backoff {
+ fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
+ f.debug_struct("Backoff")
+ .field("init_backoff", &self.init_backoff)
+ .field("next_backoff_secs", &self.next_backoff_secs)
+ .field("max_backoff_secs", &self.max_backoff_secs)
+ .field("base", &self.base)
+ .finish()
+ }
+}
+
+impl Backoff {
+ /// Create a new [`Backoff`] from the provided [`BackoffConfig`]
+ pub fn new(config: &BackoffConfig) -> Self {
+ Self::new_with_rng(config, None)
+ }
+
+ /// Creates a new `Backoff` with the optional `rng`
+ ///
+ /// Uses [`rand::thread_rng()`] if no rng is provided
+ pub fn new_with_rng(
+ config: &BackoffConfig,
+ rng: Option<Box<dyn RngCore + Sync + Send>>,
+ ) -> Self {
+ let init_backoff = config.init_backoff.as_secs_f64();
+ Self {
+ init_backoff,
+ next_backoff_secs: init_backoff,
+ max_backoff_secs: config.max_backoff.as_secs_f64(),
+ base: config.base,
+ rng,
+ }
+ }
+
+ /// Returns the next backoff duration to wait for
+ pub fn tick(&mut self) -> Duration {
+ let range = self.init_backoff..(self.next_backoff_secs * self.base);
+
+ let rand_backoff = match self.rng.as_mut() {
+ Some(rng) => rng.gen_range(range),
+ None => thread_rng().gen_range(range),
+ };
+
+ let next_backoff = self.max_backoff_secs.min(rand_backoff);
+ Duration::from_secs_f64(std::mem::replace(&mut self.next_backoff_secs, next_backoff))
+ }
+}
+
+#[cfg(test)]
+mod tests {
+ use super::*;
+ use rand::rngs::mock::StepRng;
+
+ #[test]
+ fn test_backoff() {
+ let init_backoff_secs = 1.;
+ let max_backoff_secs = 500.;
+ let base = 3.;
+
+ let config = BackoffConfig {
+ init_backoff: Duration::from_secs_f64(init_backoff_secs),
+ max_backoff: Duration::from_secs_f64(max_backoff_secs),
+ base,
+ };
+
+ let assert_fuzzy_eq = |a: f64, b: f64| assert!((b - a).abs() < 0.0001, "{a} != {b}");
+
+ // Create a static rng that takes the minimum of the range
+ let rng = Box::new(StepRng::new(0, 0));
+ let mut backoff = Backoff::new_with_rng(&config, Some(rng));
+
+ for _ in 0..20 {
+ assert_eq!(backoff.tick().as_secs_f64(), init_backoff_secs);
+ }
+
+ // Create a static rng that takes the maximum of the range
+ let rng = Box::new(StepRng::new(u64::MAX, 0));
+ let mut backoff = Backoff::new_with_rng(&config, Some(rng));
+
+ for i in 0..20 {
+ let value = (base.powi(i) * init_backoff_secs).min(max_backoff_secs);
+ assert_fuzzy_eq(backoff.tick().as_secs_f64(), value);
+ }
+
+ // Create a static rng that takes the mid point of the range
+ let rng = Box::new(StepRng::new(u64::MAX / 2, 0));
+ let mut backoff = Backoff::new_with_rng(&config, Some(rng));
+
+ let mut value = init_backoff_secs;
+ for _ in 0..20 {
+ assert_fuzzy_eq(backoff.tick().as_secs_f64(), value);
+ value =
+ (init_backoff_secs + (value * base - init_backoff_secs) / 2.).min(max_backoff_secs);
+ }
+ }
+}
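A minimal sketch of the retry loop `Backoff` is designed to drive — the fallible `operation` closure and the attempt cap are hypothetical, and `tokio::time::sleep` is assumed for the async wait:

```rust
async fn retry<T, E, F>(mut operation: F, max_attempts: usize) -> Result<T, E>
where
    F: FnMut() -> Result<T, E>,
{
    let mut backoff = Backoff::new(&BackoffConfig::default());
    for attempt in 1.. {
        match operation() {
            Ok(value) => return Ok(value),
            // Out of attempts: surface the last error.
            Err(err) if attempt >= max_attempts => return Err(err),
            // Otherwise wait for the next jittered, exponentially
            // growing interval before trying again.
            Err(_) => tokio::time::sleep(backoff.tick()).await,
        }
    }
    unreachable!("the loop only exits via return")
}
```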
diff --git a/crates/deltalake-core/src/data_catalog/client/mock_server.rs b/crates/deltalake-core/src/data_catalog/client/mock_server.rs
new file mode 100644
index 0000000000..9bed67e75c
--- /dev/null
+++ b/crates/deltalake-core/src/data_catalog/client/mock_server.rs
@@ -0,0 +1,94 @@
+use std::collections::VecDeque;
+use std::convert::Infallible;
+use std::net::SocketAddr;
+use std::sync::Arc;
+
+use hyper::service::{make_service_fn, service_fn};
+use hyper::{Body, Request, Response, Server};
+use parking_lot::Mutex;
+use tokio::sync::oneshot;
+use tokio::task::JoinHandle;
+
+pub type ResponseFn = Box<dyn FnOnce(Request<Body>) -> Response<Body> + Send>;
+
+/// A mock server
+pub struct MockServer {
+ responses: Arc<Mutex<VecDeque<ResponseFn>>>,
+ shutdown: oneshot::Sender<()>,
+ handle: JoinHandle<()>,
+ url: String,
+}
+
+impl Default for MockServer {
+ fn default() -> Self {
+ Self::new()
+ }
+}
+
+impl MockServer {
+ pub fn new() -> Self {
+ let responses: Arc<Mutex<VecDeque<ResponseFn>>> =
+ Arc::new(Mutex::new(VecDeque::with_capacity(10)));
+
+ let r = Arc::clone(&responses);
+ let make_service = make_service_fn(move |_conn| {
+ let r = Arc::clone(&r);
+ async move {
+ Ok::<_, Infallible>(service_fn(move |req| {
+ let r = Arc::clone(&r);
+ async move {
+ Ok::<_, Infallible>(match r.lock().pop_front() {
+ Some(r) => r(req),
+ None => Response::new(Body::from("Hello World")),
+ })
+ }
+ }))
+ }
+ });
+
+ let (shutdown, rx) = oneshot::channel::<()>();
+ let server = Server::bind(&SocketAddr::from(([127, 0, 0, 1], 0))).serve(make_service);
+
+ let url = format!("http://{}", server.local_addr());
+
+ let handle = tokio::spawn(async move {
+ server
+ .with_graceful_shutdown(async {
+ rx.await.ok();
+ })
+ .await
+ .unwrap()
+ });
+
+ Self {
+ responses,
+ shutdown,
+ handle,
+ url,
+ }
+ }
+
+ /// The url of the mock server
+ pub fn url(&self) -> &str {
+ &self.url
+ }
+
+ /// Add a response
+ pub fn push(&self, response: Response<Body>) {
+ self.push_fn(|_| response)
+ }
+
+ /// Add a response function
+ pub fn push_fn<F>(&self, f: F)
+ where
+ F: FnOnce(Request<Body>) -> Response<Body> + Send + 'static,
+ {
+ self.responses.lock().push_back(Box::new(f))
+ }
+
+ /// Shutdown the mock server
+ pub async fn shutdown(self) {
+ let _ = self.shutdown.send(());
+ self.handle.await.unwrap()
+ }
+}
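A minimal sketch of exercising `MockServer` from a test — the JSON payload is an arbitrary stand-in, and `reqwest` is used as the client since the crate already depends on it:

```rust
use hyper::{Body, Response};

#[tokio::test]
async fn answers_with_queued_response() {
    let server = MockServer::new();

    // Queue one canned response; any further request falls back to
    // the default "Hello World" body.
    server.push(
        Response::builder()
            .status(200)
            .body(Body::from(r#"{"tables":[]}"#))
            .unwrap(),
    );

    let body = reqwest::get(server.url())
        .await
        .unwrap()
        .text()
        .await
        .unwrap();
    assert_eq!(body, r#"{"tables":[]}"#);

    server.shutdown().await;
}
```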
diff --git a/crates/deltalake-core/src/data_catalog/client/mod.rs b/crates/deltalake-core/src/data_catalog/client/mod.rs
new file mode 100644
index 0000000000..c6cd838076
--- /dev/null
+++ b/crates/deltalake-core/src/data_catalog/client/mod.rs
@@ -0,0 +1,229 @@
+//! Generic utilities for reqwest-based catalog implementations
+
+pub mod backoff;
+#[cfg(test)]
+pub mod mock_server;
+#[allow(unused)]
+pub mod pagination;
+pub mod retry;
+pub mod token;
+
+use reqwest::header::{HeaderMap, HeaderValue};
+use reqwest::{Client, ClientBuilder, Proxy};
+use std::time::Duration;
+
+fn map_client_error(e: reqwest::Error) -> super::DataCatalogError {
+ super::DataCatalogError::Generic {
+ catalog: "HTTP client",
+ source: Box::new(e),
+ }
+}
+
+static DEFAULT_USER_AGENT: &str = concat!(env!("CARGO_PKG_NAME"), "/", env!("CARGO_PKG_VERSION"),);
+
+/// HTTP client configuration for remote catalogs
+#[derive(Debug, Clone, Default)]
+pub struct ClientOptions {
+ user_agent: Option<HeaderValue>,
+ default_headers: Option<HeaderMap>,
+ proxy_url: Option<String>,
+ allow_http: bool,
+ allow_insecure: bool,
+ timeout: Option<Duration>,
+ connect_timeout: Option<Duration>,
+ pool_idle_timeout: Option<Duration>,
+ pool_max_idle_per_host: Option<usize>,
+ http2_keep_alive_interval: Option<Duration>,
+ http2_keep_alive_timeout: Option<Duration>,
+ http2_keep_alive_while_idle: bool,
+ http1_only: bool,
+ http2_only: bool,
+}
+
+impl ClientOptions {
+ /// Create a new [`ClientOptions`] with default values
+ pub fn new() -> Self {
+ Default::default()
+ }
+
+ /// Sets the User-Agent header to be used by this client
+ ///
+ /// Default is based on the version of this crate
+ pub fn with_user_agent(mut self, agent: HeaderValue) -> Self {
+ self.user_agent = Some(agent);
+ self
+ }
+
+ /// Sets the default headers for every request
+ pub fn with_default_headers(mut self, headers: HeaderMap) -> Self {
+ self.default_headers = Some(headers);
+ self
+ }
+
+ /// Sets what protocol is allowed. If `allow_http` is:
+ /// * false (default): only HTTPS is allowed
+ /// * true: both HTTP and HTTPS are allowed
+ pub fn with_allow_http(mut self, allow_http: bool) -> Self {
+ self.allow_http = allow_http;
+ self
+ }
+
+ /// Allows connections to invalid SSL certificates
+ /// * false (default): Only valid HTTPS certificates are allowed
+ /// * true: All HTTPS certificates are allowed
+ ///
+ /// # Warning
+ ///
+ /// You should think very carefully before using this method. If
+ /// invalid certificates are trusted, *any* certificate for *any* site
+ /// will be trusted for use. This includes expired certificates. This
+ /// introduces significant vulnerabilities, and should only be used
+ /// as a last resort or for testing
+ pub fn with_allow_invalid_certificates(mut self, allow_insecure: bool) -> Self {
+ self.allow_insecure = allow_insecure;
+ self
+ }
+
+ /// Only use http1 connections
+ pub fn with_http1_only(mut self) -> Self {
+ self.http1_only = true;
+ self
+ }
+
+ /// Only use http2 connections
+ pub fn with_http2_only(mut self) -> Self {
+ self.http2_only = true;
+ self
+ }
+
+ /// Set an HTTP proxy to use for requests
+ pub fn with_proxy_url(mut self, proxy_url: impl Into<String>) -> Self {
+ self.proxy_url = Some(proxy_url.into());
+ self
+ }
+
+ /// Set a request timeout
+ ///
+ /// The timeout is applied from when the request starts connecting until the
+ /// response body has finished
+ pub fn with_timeout(mut self, timeout: Duration) -> Self {
+ self.timeout = Some(timeout);
+ self
+ }
+
+ /// Set a timeout for only the connect phase of a Client
+ pub fn with_connect_timeout(mut self, timeout: Duration) -> Self {
+ self.connect_timeout = Some(timeout);
+ self
+ }
+
+ /// Set the pool max idle timeout
+ ///
+ /// This is the length of time an idle connection will be kept alive
+ ///
+ /// Default is 90 seconds
+ pub fn with_pool_idle_timeout(mut self, timeout: Duration) -> Self {
+ self.pool_idle_timeout = Some(timeout);
+ self
+ }
+
+ /// Set the maximum number of idle connections per host
+ ///
+ /// Default is no limit
+ pub fn with_pool_max_idle_per_host(mut self, max: usize) -> Self {
+ self.pool_max_idle_per_host = Some(max);
+ self
+ }
+
+ /// Sets the interval at which HTTP2 Ping frames should be sent to keep a connection alive.
+ ///
+ /// Default is disabled
+ pub fn with_http2_keep_alive_interval(mut self, interval: Duration) -> Self {
+ self.http2_keep_alive_interval = Some(interval);
+ self
+ }
+
+ /// Sets a timeout for receiving an acknowledgement of the keep-alive ping.
+ ///
+ /// If the ping is not acknowledged within the timeout, the connection will be closed.
+ /// Does nothing if http2_keep_alive_interval is disabled.
+ ///
+ /// Default is disabled
+ pub fn with_http2_keep_alive_timeout(mut self, interval: Duration) -> Self {
+ self.http2_keep_alive_timeout = Some(interval);
+ self
+ }
+
+ /// Enable HTTP2 keep alive pings for idle connections
+ ///
+ /// If disabled, keep-alive pings are only sent while there are open request/response
+ /// streams. If enabled, pings are also sent when no streams are active
+ ///
+ /// Default is disabled
+ pub fn with_http2_keep_alive_while_idle(mut self) -> Self {
+ self.http2_keep_alive_while_idle = true;
+ self
+ }
+
+ pub(crate) fn client(&self) -> super::DataCatalogResult<Client> {
+ let mut builder = ClientBuilder::new();
+
+ match &self.user_agent {
+ Some(user_agent) => builder = builder.user_agent(user_agent),
+ None => builder = builder.user_agent(DEFAULT_USER_AGENT),
+ }
+
+ if let Some(headers) = &self.default_headers {
+ builder = builder.default_headers(headers.clone())
+ }
+
+ if let Some(proxy) = &self.proxy_url {
+ let proxy = Proxy::all(proxy).map_err(map_client_error)?;
+ builder = builder.proxy(proxy);
+ }
+
+ if let Some(timeout) = self.timeout {
+ builder = builder.timeout(timeout)
+ }
+
+ if let Some(timeout) = self.connect_timeout {
+ builder = builder.connect_timeout(timeout)
+ }
+
+ if let Some(timeout) = self.pool_idle_timeout {
+ builder = builder.pool_idle_timeout(timeout)
+ }
+
+ if let Some(max) = self.pool_max_idle_per_host {
+ builder = builder.pool_max_idle_per_host(max)
+ }
+
+ if let Some(interval) = self.http2_keep_alive_interval {
+ builder = builder.http2_keep_alive_interval(interval)
+ }
+
+ if let Some(interval) = self.http2_keep_alive_timeout {
+ builder = builder.http2_keep_alive_timeout(interval)
+ }
+
+ if self.http2_keep_alive_while_idle {
+ builder = builder.http2_keep_alive_while_idle(true)
+ }
+
+ if self.http1_only {
+ builder = builder.http1_only()
+ }
+
+ if self.http2_only {
+ builder = builder.http2_prior_knowledge()
+ }
+
+ if self.allow_insecure {
+ builder = builder.danger_accept_invalid_certs(self.allow_insecure)
+ }
+
+ builder
+ .https_only(!self.allow_http)
+ .build()
+ .map_err(map_client_error)
+ }
+}
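Inside the crate, a catalog implementation would obtain its HTTP client roughly as follows — a sketch only; the timeout values are arbitrary, and `client()` is `pub(crate)`, so this only compiles within `deltalake-core`:

```rust
use std::time::Duration;

use crate::data_catalog::client::ClientOptions;
use crate::data_catalog::DataCatalogResult;

pub(crate) fn catalog_http_client() -> DataCatalogResult<reqwest::Client> {
    ClientOptions::new()
        .with_allow_http(true) // e.g. a local, non-TLS catalog endpoint
        .with_connect_timeout(Duration::from_secs(5))
        .with_timeout(Duration::from_secs(30))
        .client()
}
```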
diff --git a/crates/deltalake-core/src/data_catalog/client/pagination.rs b/crates/deltalake-core/src/data_catalog/client/pagination.rs
new file mode 100644
index 0000000000..a5225237b4
--- /dev/null
+++ b/crates/deltalake-core/src/data_catalog/client/pagination.rs
@@ -0,0 +1,58 @@
+//! Handle paginated results
+use std::future::Future;
+
+use futures::Stream;
+
+use crate::data_catalog::DataCatalogResult;
+
+/// Takes a paginated operation `op` that when called with:
+///
+/// - A state `S`
+/// - An optional next token `Option<String>`
+///
+/// Returns
+///
+/// - A response value `T`
+/// - The next state `S`
+/// - The next continuation token `Option<String>`
+///
+/// And converts it into a `Stream<DataCatalogResult<T>>` which will first call `op(state, None)`, and yield
+/// the returned response `T`. If the returned continuation token was `None` the stream will then
+/// finish, otherwise it will continue to call `op(state, token)` with the values returned by the
+/// previous call to `op`, until a continuation token of `None` is returned
+///
+pub fn stream_paginated<F, Fut, S, T>(state: S, op: F) -> impl Stream<Item = DataCatalogResult<T>>
+where
+ F: Fn(S, Option<String>) -> Fut + Copy,
+ Fut: Future<Output = DataCatalogResult<(T, S, Option<String>)>>,