Skip to content

Commit

Permalink
add async version
Browse files Browse the repository at this point in the history
  • Loading branch information
r3stl355 committed Jan 3, 2023
1 parent ad349b0 commit 4c314ce
Show file tree
Hide file tree
Showing 11 changed files with 370 additions and 56 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1,2 +1,4 @@
.DS_Store
local-*
/target
Cargo.lock
17 changes: 14 additions & 3 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,20 @@ edition = "2021"
[lib]
path = "src/lib.rs"

[features]
blocking = ["reqwest/blocking"]

[[example]]
name = "async"

[[example]]
name = "blocking"
required-features = ["blocking"]

[dependencies]
tokio = { version = "1", features = ["macros", "rt-multi-thread",] }
parquet = "14.0.0"
reqwest = { version = "0.11", features = ["blocking", "json"] }
reqwest = { version = "0.11", features = ["json"] }
url = "2.2"
rustc_version_runtime = "0.1"
serde = { version = "1.0", features = ["derive"] }
Expand All @@ -21,5 +32,5 @@ polars = { version = "0.22.8", features = ["lazy", "parquet"] }

[dev-dependencies]
wiremock = "0.5"
futures = "0.3"
uuid = { version = "1.1", features = ["v4"] }
uuid = { version = "1.1", features = ["v4"] }
tokio-test = { version = "0.4" }
16 changes: 7 additions & 9 deletions README.md
Original file line number Diff line number Diff line change
@@ -1,14 +1,11 @@
![experimental](https://github.com/GIScience/badges/raw/master/status/experimental.svg)
[![main](https://github.com/r3stl355/delta-sharing-rust-client/actions/workflows/main.yml/badge.svg?branch=main)](https://github.com/r3stl355/delta-sharing-rust-client/actions/workflows/main.yml)

# Delta Sharing client library for Rust

This is a simple library for Rust to access data published via Delta Sharing
This is a simple library for Rust to access data published via Delta Sharing. Has an async (`delta-sharing::application::Application`) version, and also a blocking (`delta-sharing::blocking::Application`) version for smaller operations, both exposing similar APIs.

## Features

- Retrieve Delta Sharing information (shares, schemas, tables and files)
- Query shared table data using [Polars](https://pola-rs.github.io/polars/polars/index.html). `get_dataframe` downloads the table's parquet files (and caches then locally for subsequent reads) and returns a lazy abstraction (logical plan) over an eager DataFrame. This lazy abstraction provides methods for incrementally modifying that logical plan until the final output is requested (via `collect`).
- Query shared table data using [Polars](https://pola-rs.github.io/polars/polars/index.html). `get_dataframe` downloads the table's parquet files (and caches then locally for subsequent queries) and returns a lazy abstraction (logical plan) over an eager DataFrame. This lazy abstraction provides methods for incrementally modifying that logical plan until output is requested (via `collect`).

## Pre-requisites

Expand All @@ -17,9 +14,10 @@ This is a simple library for Rust to access data published via Delta Sharing

## Quick start

Use this [sample project](https://github.com/r3stl355/delta-sharing-rust-cllient-use-example) for a quick start
- Clone this repo
- Set the `bearerToken` and `endpoint` values in the `config.json` to match your Delta Sharing information
- Run a simple example included with the library: `cargo run --example async`. This example is using an async version of the library. When executed, it will get and display all the data from the first Data Sharing table it finds. For an example using a blocking version of the client, run `cargo run --example blocking --features="delta-sharing/blocking"`

## TODO (Work in progress)
## TODO

- implement `async` mode (e.g. swap blocking `reqwest` client to async version) -
- write more tests
Need to add more tests
5 changes: 5 additions & 0 deletions config.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
{
"shareCredentialsVersion":1,
"bearerToken":"<your Delta Share access token>",
"endpoint":"<your Delta Share endpoinit URL>"
}
25 changes: 25 additions & 0 deletions examples/async.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
#[tokio::main]
async fn main() {

use std::{fs};

env_logger::init();

println!("An example using an async client");

let conf_str = &fs::read_to_string("./config.json").unwrap();
let config: delta_sharing::protocol::ProviderConfig = serde_json::from_str(conf_str).expect("Invalid configuration");
let mut app = delta_sharing::application::Application::new(config, None).await.unwrap();
let shares = app.list_shares().await.unwrap();
if shares.len() == 0 {
println!("At least 1 Delta Share is required");
} else {
let tables = app.list_all_tables(&shares[0]).await.unwrap();
if shares.len() == 0 {
println!("You need to have at least one table in share {}, or use a different share", shares[0].name);
} else {
let res = app.get_dataframe(&tables[0]).await.unwrap().collect().unwrap();
println!("Dataframe:\n {}", res);
}
}
}
25 changes: 25 additions & 0 deletions examples/blocking.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
fn main() {

use std::{fs};

env_logger::init();

println!("An example using a blocking client");

let conf_str = &fs::read_to_string("./config.json").unwrap();

let config: delta_sharing::protocol::ProviderConfig = serde_json::from_str(conf_str).expect("Invalid configuration");
let mut app = delta_sharing::blocking::Application::new(config, None).unwrap();
let shares = app.list_shares().unwrap();
if shares.len() == 0 {
println!("At least 1 Delta Share is required");
} else {
let tables = app.list_all_tables(&shares[0]).unwrap();
if shares.len() == 0 {
println!("You need to have at least one table in share {}, or use a different share", shares[0].name);
} else {
let res = app.get_dataframe(&tables[0]).unwrap().collect().unwrap();
println!("Dataframe:\n {}", res);
}
}
}
78 changes: 39 additions & 39 deletions src/application.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,14 +13,14 @@ use crate::reader::*;
const METADATA_FILE: &str = "metadata.json";

pub struct Application {
http_client: reqwest::blocking::Client,
http_client: reqwest::Client,
base_url: Url,
pub data_root: String,
cache: HashMap<String, FileCache>,
}

impl Application {
pub fn new(provider_config: ProviderConfig, data_root: Option<String>) -> Result<Self, anyhow::Error> {
pub async fn new(provider_config: ProviderConfig, data_root: Option<String>) -> Result<Self, anyhow::Error> {
if provider_config.share_credentials_version > CREDENTIALS_VERSION {
panic!("'share_credentials_version' in the provider configuration is {}, which is newer than the \
version {} supported by the current release. Please upgrade to a newer release.",
Expand All @@ -36,14 +36,14 @@ impl Application {
})
}

fn get_client(config: &ProviderConfig) -> Result<reqwest::blocking::Client, reqwest::Error> {
fn get_client(config: &ProviderConfig) -> Result<reqwest::Client, reqwest::Error> {
let rust_version: &str = &format!("{}", rustc_version_runtime::version());
let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}");
let bearer_token = &format!("Bearer {}", config.bearer_token);
let mut headers = header::HeaderMap::new();
headers.insert(header::AUTHORIZATION, header::HeaderValue::from_str(bearer_token).unwrap());
headers.insert(header::USER_AGENT, header::HeaderValue::from_str(user_agent).unwrap());
reqwest::blocking::Client::builder().default_headers(headers).build()
reqwest::Client::builder().default_headers(headers).build()
}

fn build_base_url(endpoint: &String) -> Result<Url, url::ParseError> {
Expand All @@ -53,70 +53,70 @@ impl Application {
Url::parse(&root_path)
}

fn get(&self, target: &str) -> Result<String, reqwest::Error> {
async fn get(&self, target: &str) -> Result<String, reqwest::Error> {
let url = self.base_url.join(target).unwrap();
debug!("--> HTTP GET to: {}", &url);
let resp = self.http_client.get(url.as_str()).send()?;
let resp_text = resp.text()?;
let resp = self.http_client.get(url.as_str()).send().await?;
let resp_text = resp.text().await?;
debug!("--> Reponse body: {}", &resp_text);
return Ok(resp_text);
}

fn head(&self, target: &str, key: &str) -> Option<HeaderValue> {
async fn head(&self, target: &str, key: &str) -> Option<HeaderValue> {
let url = self.base_url.join(target).unwrap();
debug!("HTTP HEAD to: {}", &url);
let resp = self.http_client.head(url.as_str()).send().expect("Invalid request");
let resp = self.http_client.head(url.as_str()).send().await.expect("Invalid request");
let version = resp.headers().get(key);
match version {
Some(h) => Some(h.clone()),
None => None
}
}

fn post(&self, target: &str, json: &Map<String, Value>) -> Result<String, reqwest::Error> {
async fn post(&self, target: &str, json: &Map<String, Value>) -> Result<String, reqwest::Error> {
let url = self.base_url.join(target).unwrap();
debug!("--> HTTP POST to: {}", &url);
let resp = self.http_client.post(url.as_str()).json(json).send()?;
let resp_text = resp.text()?;
let resp = self.http_client.post(url.as_str()).json(json).send().await?;
let resp_text = resp.text().await?;
debug!("--> Reponse body: {}", &resp_text);
return Ok(resp_text);
}

fn download(&self, url: String, dest_path: &Path) {
async fn download(&self, url: String, dest_path: &Path) {
debug!("--> Download {} to {}", &url, dest_path.display());
let resp = reqwest::blocking::get(url).unwrap();
let resp = reqwest::get(url).await.unwrap();
let mut out = fs::File::create(dest_path).expect("Failed to create an output file");
let content = resp.bytes().unwrap();
let content = resp.bytes().await.unwrap();
io::copy(&mut content.as_bytes(), &mut out).expect("Failed to save the content to output file");
// Ok(())
}

pub fn list_shares(&self) -> Result<Vec<Share>, anyhow::Error> {
let shares = self.get("shares")?;
pub async fn list_shares(&self) -> Result<Vec<Share>, anyhow::Error> {
let shares = self.get("shares").await?;
let parsed: ShareResponse = serde_json::from_str(&shares).expect("Invalid response");
return Ok(parsed.items.clone());
}

pub fn list_schemas(&self, share: &Share) -> Result<Vec<Schema>, anyhow::Error> {
let schemas = self.get(&format!("shares/{}/schemas", share.name))?;
pub async fn list_schemas(&self, share: &Share) -> Result<Vec<Schema>, anyhow::Error> {
let schemas = self.get(&format!("shares/{}/schemas", share.name)).await?;
let parsed: SchemaResponse = serde_json::from_str(&schemas).expect("Invalid response");
return Ok(parsed.items.clone());
}

pub fn list_tables(&self, schema: &Schema) -> Result<Vec<Table>, anyhow::Error> {
let tables = self.get(&format!("shares/{}/schemas/{}/tables", schema.share, schema.name))?;
pub async fn list_tables(&self, schema: &Schema) -> Result<Vec<Table>, anyhow::Error> {
let tables = self.get(&format!("shares/{}/schemas/{}/tables", schema.share, schema.name)).await?;
let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response");
return Ok(parsed.items.clone());
}

pub fn list_all_tables(&self, share: &Share) -> Result<Vec<Table>, anyhow::Error> {
let tables = self.get(&format!("shares/{}/all-tables", share.name))?;
pub async fn list_all_tables(&self, share: &Share) -> Result<Vec<Table>, anyhow::Error> {
let tables = self.get(&format!("shares/{}/all-tables", share.name)).await?;
let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response");
return Ok(parsed.items.clone());
}

pub fn get_table_metadata(&self, table: &Table) -> Result<TableMetadata, anyhow::Error> {
let meta = self.get(&format!("shares/{}/schemas/{}/tables/{}/metadata", table.share, table.schema, table.name))?;
pub async fn get_table_metadata(&self, table: &Table) -> Result<TableMetadata, anyhow::Error> {
let meta = self.get(&format!("shares/{}/schemas/{}/tables/{}/metadata", table.share, table.schema, table.name)).await?;
let mut meta_lines = meta.lines();
let protocol: ProtocolResponse = serde_json::from_str( meta_lines.next().expect("Invalid response")).expect("Invalid protocol");
let metadata: MetadataResponse = serde_json::from_str( meta_lines.next().expect("Invalid response")).expect("Invalid metadata");
Expand All @@ -128,17 +128,17 @@ impl Application {
)
}

pub fn get_table_version(&self, table: &Table) -> i32 {
let version = self.head(&format!("shares/{}/schemas/{}/tables/{}", table.share, table.schema, table.name), "delta-table-version");
pub async fn get_table_version(&self, table: &Table) -> i32 {
let version = self.head(&format!("shares/{}/schemas/{}/tables/{}", table.share, table.schema, table.name), "delta-table-version").await;
match version {
Some(v) => v.to_str().expect("Invalid version number").parse::<i32>().expect("Invalid version number"),
None => -1
}
}

pub fn list_table_files(&self,
pub async fn list_table_files(&self,
table: &Table,
predicate_hints: Option<Vec<String>>,
predicate_hints: Option<Vec<String>>,
limit_hint: Option<i32>,
version: Option<i32>) -> Result<TableFiles, anyhow::Error> {

Expand All @@ -152,7 +152,7 @@ impl Application {
if version.is_some() {
map.insert("version".to_string(), Value::Number(Number::from(version.unwrap())));
}
let response = self.post(&format!("shares/{}/schemas/{}/tables/{}/query", table.share, table.schema, table.name), &map)?;
let response = self.post(&format!("shares/{}/schemas/{}/tables/{}/query", table.share, table.schema, table.name), &map).await?;
let mut lines = response.lines();
let protocol: ProtocolResponse = serde_json::from_str(lines.next().expect("Invalid response")).expect("Invalid protocol");
let metadata: MetadataResponse = serde_json::from_str(lines.next().expect("Invalid response")).expect("Invalid metadata");
Expand All @@ -172,21 +172,21 @@ impl Application {
)
}

fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Vec<PathBuf> {
async fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Vec<PathBuf> {
if Path::exists(&table_path) {
fs::remove_dir_all(&table_path).unwrap();
}
fs::create_dir_all(&table_path).unwrap();
let mut file_paths: Vec<PathBuf> = Vec::new();
for file in table_files.files.clone() {
let dst_path = &table_path.join(format!("{}.snappy.parquet", &file.id));
self.download(file.url, &dst_path);
self.download(file.url, &dst_path).await;
file_paths.push(dst_path.clone());
}
file_paths.clone()
}

fn load_cached(&self, table_path: &PathBuf, table_files: &TableFiles) -> Option<Vec<PathBuf>> {
async fn load_cached(&self, table_path: &PathBuf, table_files: &TableFiles) -> Option<Vec<PathBuf>> {
// Check if the files exist, load and compare the files.
let metadata_path = &table_path.join(METADATA_FILE);
if Path::exists(&metadata_path) {
Expand Down Expand Up @@ -214,28 +214,28 @@ impl Application {
None
}

pub fn get_files(&mut self, table: &Table) -> Result<Vec<PathBuf>, anyhow::Error> {
pub async fn get_files(&mut self, table: &Table) -> Result<Vec<PathBuf>, anyhow::Error> {
let key = table.fully_qualified_name();
let mut download = true;
let table_path = Path::new(&self.data_root).join(table.fully_qualified_name());
let table_files = self.list_table_files(table, None, None, None).unwrap();
let table_files = self.list_table_files(table, None, None, None).await?;
if let Some(cached) = self.cache.get(&key) {
download = cached.table_files.metadata != table_files.metadata;
} else if let Some(cached) = self.load_cached(&table_path, &table_files) {
} else if let Some(cached) = self.load_cached(&table_path, &table_files).await {
download = false;
self.cache.insert(key.clone(), FileCache {table_files: table_files.clone(), file_paths: cached});
}
if download {
info!("--> Downloading data files to {}", &table_path.display());
let paths = self.download_files(&table_path, &table_files);
let paths = self.download_files(&table_path, &table_files).await;
serde_json::to_writer(&fs::File::create(&table_path.join(METADATA_FILE))?, &table_files.metadata)?;
self.cache.insert(key.clone(), FileCache {table_files: table_files, file_paths: paths});
}
Ok(self.cache.get(&key).unwrap().file_paths.clone())
}

pub fn get_dataframe(&mut self, table: &Table) -> PolarResult<LazyFrame> {
self.get_files(&table)?;
pub async fn get_dataframe(&mut self, table: &Table) -> PolarResult<LazyFrame> {
self.get_files(&table).await?;
let table_path = Path::new(&self.data_root).join(table.fully_qualified_name());
load_parquet_files_as_dataframe(&table_path)
}
Expand Down
Loading

0 comments on commit 4c314ce

Please sign in to comment.