diff --git a/.gitignore b/.gitignore index 96ef6c0..4ad9711 100644 --- a/.gitignore +++ b/.gitignore @@ -1,2 +1,4 @@ +.DS_Store +local-* /target Cargo.lock diff --git a/Cargo.toml b/Cargo.toml index a340c07..4824132 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -7,9 +7,20 @@ edition = "2021" [lib] path = "src/lib.rs" +[features] +blocking = ["reqwest/blocking"] + +[[example]] +name = "async" + +[[example]] +name = "blocking" +required-features = ["blocking"] + [dependencies] +tokio = { version = "1", features = ["macros", "rt-multi-thread",] } parquet = "14.0.0" -reqwest = { version = "0.11", features = ["blocking", "json"] } +reqwest = { version = "0.11", features = ["json"] } url = "2.2" rustc_version_runtime = "0.1" serde = { version = "1.0", features = ["derive"] } @@ -21,5 +32,5 @@ polars = { version = "0.22.8", features = ["lazy", "parquet"] } [dev-dependencies] wiremock = "0.5" -futures = "0.3" -uuid = { version = "1.1", features = ["v4"] } \ No newline at end of file +uuid = { version = "1.1", features = ["v4"] } +tokio-test = { version = "0.4" } \ No newline at end of file diff --git a/README.md b/README.md index fbb4abc..c3901d4 100644 --- a/README.md +++ b/README.md @@ -1,14 +1,11 @@ -![experimental](https://github.com/GIScience/badges/raw/master/status/experimental.svg) -[![main](https://github.com/r3stl355/delta-sharing-rust-client/actions/workflows/main.yml/badge.svg?branch=main)](https://github.com/r3stl355/delta-sharing-rust-client/actions/workflows/main.yml) - # Delta Sharing client library for Rust -This is a simple library for Rust to access data published via Delta Sharing +This is a simple library for Rust to access data published via Delta Sharing. Has an async (`delta-sharing::application::Application`) version, and also a blocking (`delta-sharing::blocking::Application`) version for smaller operations, both exposing similar APIs. ## Features - Retrieve Delta Sharing information (shares, schemas, tables and files) -- Query shared table data using [Polars](https://pola-rs.github.io/polars/polars/index.html). `get_dataframe` downloads the table's parquet files (and caches then locally for subsequent reads) and returns a lazy abstraction (logical plan) over an eager DataFrame. This lazy abstraction provides methods for incrementally modifying that logical plan until the final output is requested (via `collect`). +- Query shared table data using [Polars](https://pola-rs.github.io/polars/polars/index.html). `get_dataframe` downloads the table's parquet files (and caches then locally for subsequent queries) and returns a lazy abstraction (logical plan) over an eager DataFrame. This lazy abstraction provides methods for incrementally modifying that logical plan until output is requested (via `collect`). ## Pre-requisites @@ -17,9 +14,10 @@ This is a simple library for Rust to access data published via Delta Sharing ## Quick start -Use this [sample project](https://github.com/r3stl355/delta-sharing-rust-cllient-use-example) for a quick start +- Clone this repo +- Set the `bearerToken` and `endpoint` values in the `config.json` to match your Delta Sharing information +- Run a simple example included with the library: `cargo run --example async`. This example is using an async version of the library. When executed, it will get and display all the data from the first Data Sharing table it finds. For an example using a blocking version of the client, run `cargo run --example blocking --features="delta-sharing/blocking"` -## TODO (Work in progress) +## TODO -- implement `async` mode (e.g. swap blocking `reqwest` client to async version) - -- write more tests \ No newline at end of file +Need to add more tests \ No newline at end of file diff --git a/config.json b/config.json new file mode 100644 index 0000000..8203aa8 --- /dev/null +++ b/config.json @@ -0,0 +1,5 @@ +{ + "shareCredentialsVersion":1, + "bearerToken":"", + "endpoint":"" +} \ No newline at end of file diff --git a/examples/async.rs b/examples/async.rs new file mode 100644 index 0000000..a32a83b --- /dev/null +++ b/examples/async.rs @@ -0,0 +1,25 @@ +#[tokio::main] +async fn main() { + + use std::{fs}; + + env_logger::init(); + + println!("An example using an async client"); + + let conf_str = &fs::read_to_string("./config.json").unwrap(); + let config: delta_sharing::protocol::ProviderConfig = serde_json::from_str(conf_str).expect("Invalid configuration"); + let mut app = delta_sharing::application::Application::new(config, None).await.unwrap(); + let shares = app.list_shares().await.unwrap(); + if shares.len() == 0 { + println!("At least 1 Delta Share is required"); + } else { + let tables = app.list_all_tables(&shares[0]).await.unwrap(); + if shares.len() == 0 { + println!("You need to have at least one table in share {}, or use a different share", shares[0].name); + } else { + let res = app.get_dataframe(&tables[0]).await.unwrap().collect().unwrap(); + println!("Dataframe:\n {}", res); + } + } +} \ No newline at end of file diff --git a/examples/blocking.rs b/examples/blocking.rs new file mode 100644 index 0000000..d67461e --- /dev/null +++ b/examples/blocking.rs @@ -0,0 +1,25 @@ +fn main() { + + use std::{fs}; + + env_logger::init(); + + println!("An example using a blocking client"); + + let conf_str = &fs::read_to_string("./config.json").unwrap(); + + let config: delta_sharing::protocol::ProviderConfig = serde_json::from_str(conf_str).expect("Invalid configuration"); + let mut app = delta_sharing::blocking::Application::new(config, None).unwrap(); + let shares = app.list_shares().unwrap(); + if shares.len() == 0 { + println!("At least 1 Delta Share is required"); + } else { + let tables = app.list_all_tables(&shares[0]).unwrap(); + if shares.len() == 0 { + println!("You need to have at least one table in share {}, or use a different share", shares[0].name); + } else { + let res = app.get_dataframe(&tables[0]).unwrap().collect().unwrap(); + println!("Dataframe:\n {}", res); + } + } +} \ No newline at end of file diff --git a/src/application.rs b/src/application.rs index 58bad22..c8263a7 100644 --- a/src/application.rs +++ b/src/application.rs @@ -13,14 +13,14 @@ use crate::reader::*; const METADATA_FILE: &str = "metadata.json"; pub struct Application { - http_client: reqwest::blocking::Client, + http_client: reqwest::Client, base_url: Url, pub data_root: String, cache: HashMap, } impl Application { - pub fn new(provider_config: ProviderConfig, data_root: Option) -> Result { + pub async fn new(provider_config: ProviderConfig, data_root: Option) -> Result { if provider_config.share_credentials_version > CREDENTIALS_VERSION { panic!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ version {} supported by the current release. Please upgrade to a newer release.", @@ -36,14 +36,14 @@ impl Application { }) } - fn get_client(config: &ProviderConfig) -> Result { + fn get_client(config: &ProviderConfig) -> Result { let rust_version: &str = &format!("{}", rustc_version_runtime::version()); let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); let bearer_token = &format!("Bearer {}", config.bearer_token); let mut headers = header::HeaderMap::new(); headers.insert(header::AUTHORIZATION, header::HeaderValue::from_str(bearer_token).unwrap()); headers.insert(header::USER_AGENT, header::HeaderValue::from_str(user_agent).unwrap()); - reqwest::blocking::Client::builder().default_headers(headers).build() + reqwest::Client::builder().default_headers(headers).build() } fn build_base_url(endpoint: &String) -> Result { @@ -53,19 +53,19 @@ impl Application { Url::parse(&root_path) } - fn get(&self, target: &str) -> Result { + async fn get(&self, target: &str) -> Result { let url = self.base_url.join(target).unwrap(); debug!("--> HTTP GET to: {}", &url); - let resp = self.http_client.get(url.as_str()).send()?; - let resp_text = resp.text()?; + let resp = self.http_client.get(url.as_str()).send().await?; + let resp_text = resp.text().await?; debug!("--> Reponse body: {}", &resp_text); return Ok(resp_text); } - fn head(&self, target: &str, key: &str) -> Option { + async fn head(&self, target: &str, key: &str) -> Option { let url = self.base_url.join(target).unwrap(); debug!("HTTP HEAD to: {}", &url); - let resp = self.http_client.head(url.as_str()).send().expect("Invalid request"); + let resp = self.http_client.head(url.as_str()).send().await.expect("Invalid request"); let version = resp.headers().get(key); match version { Some(h) => Some(h.clone()), @@ -73,50 +73,50 @@ impl Application { } } - fn post(&self, target: &str, json: &Map) -> Result { + async fn post(&self, target: &str, json: &Map) -> Result { let url = self.base_url.join(target).unwrap(); debug!("--> HTTP POST to: {}", &url); - let resp = self.http_client.post(url.as_str()).json(json).send()?; - let resp_text = resp.text()?; + let resp = self.http_client.post(url.as_str()).json(json).send().await?; + let resp_text = resp.text().await?; debug!("--> Reponse body: {}", &resp_text); return Ok(resp_text); } - fn download(&self, url: String, dest_path: &Path) { + async fn download(&self, url: String, dest_path: &Path) { debug!("--> Download {} to {}", &url, dest_path.display()); - let resp = reqwest::blocking::get(url).unwrap(); + let resp = reqwest::get(url).await.unwrap(); let mut out = fs::File::create(dest_path).expect("Failed to create an output file"); - let content = resp.bytes().unwrap(); + let content = resp.bytes().await.unwrap(); io::copy(&mut content.as_bytes(), &mut out).expect("Failed to save the content to output file"); // Ok(()) } - pub fn list_shares(&self) -> Result, anyhow::Error> { - let shares = self.get("shares")?; + pub async fn list_shares(&self) -> Result, anyhow::Error> { + let shares = self.get("shares").await?; let parsed: ShareResponse = serde_json::from_str(&shares).expect("Invalid response"); return Ok(parsed.items.clone()); } - pub fn list_schemas(&self, share: &Share) -> Result, anyhow::Error> { - let schemas = self.get(&format!("shares/{}/schemas", share.name))?; + pub async fn list_schemas(&self, share: &Share) -> Result, anyhow::Error> { + let schemas = self.get(&format!("shares/{}/schemas", share.name)).await?; let parsed: SchemaResponse = serde_json::from_str(&schemas).expect("Invalid response"); return Ok(parsed.items.clone()); } - pub fn list_tables(&self, schema: &Schema) -> Result, anyhow::Error> { - let tables = self.get(&format!("shares/{}/schemas/{}/tables", schema.share, schema.name))?; + pub async fn list_tables(&self, schema: &Schema) -> Result, anyhow::Error> { + let tables = self.get(&format!("shares/{}/schemas/{}/tables", schema.share, schema.name)).await?; let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); return Ok(parsed.items.clone()); } - pub fn list_all_tables(&self, share: &Share) -> Result, anyhow::Error> { - let tables = self.get(&format!("shares/{}/all-tables", share.name))?; + pub async fn list_all_tables(&self, share: &Share) -> Result, anyhow::Error> { + let tables = self.get(&format!("shares/{}/all-tables", share.name)).await?; let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); return Ok(parsed.items.clone()); } - pub fn get_table_metadata(&self, table: &Table) -> Result { - let meta = self.get(&format!("shares/{}/schemas/{}/tables/{}/metadata", table.share, table.schema, table.name))?; + pub async fn get_table_metadata(&self, table: &Table) -> Result { + let meta = self.get(&format!("shares/{}/schemas/{}/tables/{}/metadata", table.share, table.schema, table.name)).await?; let mut meta_lines = meta.lines(); let protocol: ProtocolResponse = serde_json::from_str( meta_lines.next().expect("Invalid response")).expect("Invalid protocol"); let metadata: MetadataResponse = serde_json::from_str( meta_lines.next().expect("Invalid response")).expect("Invalid metadata"); @@ -128,17 +128,17 @@ impl Application { ) } - pub fn get_table_version(&self, table: &Table) -> i32 { - let version = self.head(&format!("shares/{}/schemas/{}/tables/{}", table.share, table.schema, table.name), "delta-table-version"); + pub async fn get_table_version(&self, table: &Table) -> i32 { + let version = self.head(&format!("shares/{}/schemas/{}/tables/{}", table.share, table.schema, table.name), "delta-table-version").await; match version { Some(v) => v.to_str().expect("Invalid version number").parse::().expect("Invalid version number"), None => -1 } } - pub fn list_table_files(&self, + pub async fn list_table_files(&self, table: &Table, - predicate_hints: Option>, + predicate_hints: Option>, limit_hint: Option, version: Option) -> Result { @@ -152,7 +152,7 @@ impl Application { if version.is_some() { map.insert("version".to_string(), Value::Number(Number::from(version.unwrap()))); } - let response = self.post(&format!("shares/{}/schemas/{}/tables/{}/query", table.share, table.schema, table.name), &map)?; + let response = self.post(&format!("shares/{}/schemas/{}/tables/{}/query", table.share, table.schema, table.name), &map).await?; let mut lines = response.lines(); let protocol: ProtocolResponse = serde_json::from_str(lines.next().expect("Invalid response")).expect("Invalid protocol"); let metadata: MetadataResponse = serde_json::from_str(lines.next().expect("Invalid response")).expect("Invalid metadata"); @@ -172,7 +172,7 @@ impl Application { ) } - fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Vec { + async fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Vec { if Path::exists(&table_path) { fs::remove_dir_all(&table_path).unwrap(); } @@ -180,13 +180,13 @@ impl Application { let mut file_paths: Vec = Vec::new(); for file in table_files.files.clone() { let dst_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); - self.download(file.url, &dst_path); + self.download(file.url, &dst_path).await; file_paths.push(dst_path.clone()); } file_paths.clone() } - fn load_cached(&self, table_path: &PathBuf, table_files: &TableFiles) -> Option> { + async fn load_cached(&self, table_path: &PathBuf, table_files: &TableFiles) -> Option> { // Check if the files exist, load and compare the files. let metadata_path = &table_path.join(METADATA_FILE); if Path::exists(&metadata_path) { @@ -214,28 +214,28 @@ impl Application { None } - pub fn get_files(&mut self, table: &Table) -> Result, anyhow::Error> { + pub async fn get_files(&mut self, table: &Table) -> Result, anyhow::Error> { let key = table.fully_qualified_name(); let mut download = true; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); - let table_files = self.list_table_files(table, None, None, None).unwrap(); + let table_files = self.list_table_files(table, None, None, None).await?; if let Some(cached) = self.cache.get(&key) { download = cached.table_files.metadata != table_files.metadata; - } else if let Some(cached) = self.load_cached(&table_path, &table_files) { + } else if let Some(cached) = self.load_cached(&table_path, &table_files).await { download = false; self.cache.insert(key.clone(), FileCache {table_files: table_files.clone(), file_paths: cached}); } if download { info!("--> Downloading data files to {}", &table_path.display()); - let paths = self.download_files(&table_path, &table_files); + let paths = self.download_files(&table_path, &table_files).await; serde_json::to_writer(&fs::File::create(&table_path.join(METADATA_FILE))?, &table_files.metadata)?; self.cache.insert(key.clone(), FileCache {table_files: table_files, file_paths: paths}); } Ok(self.cache.get(&key).unwrap().file_paths.clone()) } - pub fn get_dataframe(&mut self, table: &Table) -> PolarResult { - self.get_files(&table)?; + pub async fn get_dataframe(&mut self, table: &Table) -> PolarResult { + self.get_files(&table).await?; let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); load_parquet_files_as_dataframe(&table_path) } diff --git a/src/blocking/mod.rs b/src/blocking/mod.rs new file mode 100644 index 0000000..58bad22 --- /dev/null +++ b/src/blocking/mod.rs @@ -0,0 +1,247 @@ +use std::{io, fs, path::Path, path::PathBuf}; +use parquet::data_type::AsBytes; +use reqwest::{header, header::HeaderValue}; +use url::Url; +use serde_json::{Value, Map, Number}; +use std::collections::HashMap; +use std::env; +use polars::prelude::{Result as PolarResult, LazyFrame}; +use crate::protocol::*; +use crate::utils::*; +use crate::reader::*; + +const METADATA_FILE: &str = "metadata.json"; + +pub struct Application { + http_client: reqwest::blocking::Client, + base_url: Url, + pub data_root: String, + cache: HashMap, +} + +impl Application { + pub fn new(provider_config: ProviderConfig, data_root: Option) -> Result { + if provider_config.share_credentials_version > CREDENTIALS_VERSION { + panic!("'share_credentials_version' in the provider configuration is {}, which is newer than the \ + version {} supported by the current release. Please upgrade to a newer release.", + provider_config.share_credentials_version, + CREDENTIALS_VERSION); + } + let cache: HashMap = HashMap::new(); + Ok(Self { + http_client: Self::get_client(&provider_config)?, + base_url: Self::build_base_url(&provider_config.endpoint)?, + data_root: data_root.unwrap_or(env::temp_dir().as_path().join("delta_sharing").to_str().unwrap().to_string()), + cache: cache, + }) + } + + fn get_client(config: &ProviderConfig) -> Result { + let rust_version: &str = &format!("{}", rustc_version_runtime::version()); + let user_agent: &str = &format!("Delta-Sharing-Rust/{VERSION} Rust/{rust_version}"); + let bearer_token = &format!("Bearer {}", config.bearer_token); + let mut headers = header::HeaderMap::new(); + headers.insert(header::AUTHORIZATION, header::HeaderValue::from_str(bearer_token).unwrap()); + headers.insert(header::USER_AGENT, header::HeaderValue::from_str(user_agent).unwrap()); + reqwest::blocking::Client::builder().default_headers(headers).build() + } + + fn build_base_url(endpoint: &String) -> Result { + + let mut root_path = endpoint.trim_end_matches('/').to_string(); + root_path.push('/'); + Url::parse(&root_path) + } + + fn get(&self, target: &str) -> Result { + let url = self.base_url.join(target).unwrap(); + debug!("--> HTTP GET to: {}", &url); + let resp = self.http_client.get(url.as_str()).send()?; + let resp_text = resp.text()?; + debug!("--> Reponse body: {}", &resp_text); + return Ok(resp_text); + } + + fn head(&self, target: &str, key: &str) -> Option { + let url = self.base_url.join(target).unwrap(); + debug!("HTTP HEAD to: {}", &url); + let resp = self.http_client.head(url.as_str()).send().expect("Invalid request"); + let version = resp.headers().get(key); + match version { + Some(h) => Some(h.clone()), + None => None + } + } + + fn post(&self, target: &str, json: &Map) -> Result { + let url = self.base_url.join(target).unwrap(); + debug!("--> HTTP POST to: {}", &url); + let resp = self.http_client.post(url.as_str()).json(json).send()?; + let resp_text = resp.text()?; + debug!("--> Reponse body: {}", &resp_text); + return Ok(resp_text); + } + + fn download(&self, url: String, dest_path: &Path) { + debug!("--> Download {} to {}", &url, dest_path.display()); + let resp = reqwest::blocking::get(url).unwrap(); + let mut out = fs::File::create(dest_path).expect("Failed to create an output file"); + let content = resp.bytes().unwrap(); + io::copy(&mut content.as_bytes(), &mut out).expect("Failed to save the content to output file"); + // Ok(()) + } + + pub fn list_shares(&self) -> Result, anyhow::Error> { + let shares = self.get("shares")?; + let parsed: ShareResponse = serde_json::from_str(&shares).expect("Invalid response"); + return Ok(parsed.items.clone()); + } + + pub fn list_schemas(&self, share: &Share) -> Result, anyhow::Error> { + let schemas = self.get(&format!("shares/{}/schemas", share.name))?; + let parsed: SchemaResponse = serde_json::from_str(&schemas).expect("Invalid response"); + return Ok(parsed.items.clone()); + } + + pub fn list_tables(&self, schema: &Schema) -> Result, anyhow::Error> { + let tables = self.get(&format!("shares/{}/schemas/{}/tables", schema.share, schema.name))?; + let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); + return Ok(parsed.items.clone()); + } + + pub fn list_all_tables(&self, share: &Share) -> Result, anyhow::Error> { + let tables = self.get(&format!("shares/{}/all-tables", share.name))?; + let parsed: TableResponse = serde_json::from_str(&tables).expect("Invalid response"); + return Ok(parsed.items.clone()); + } + + pub fn get_table_metadata(&self, table: &Table) -> Result { + let meta = self.get(&format!("shares/{}/schemas/{}/tables/{}/metadata", table.share, table.schema, table.name))?; + let mut meta_lines = meta.lines(); + let protocol: ProtocolResponse = serde_json::from_str( meta_lines.next().expect("Invalid response")).expect("Invalid protocol"); + let metadata: MetadataResponse = serde_json::from_str( meta_lines.next().expect("Invalid response")).expect("Invalid metadata"); + Ok( + TableMetadata { + protocol: protocol.protocol, + metadata: metadata.metadata, + } + ) + } + + pub fn get_table_version(&self, table: &Table) -> i32 { + let version = self.head(&format!("shares/{}/schemas/{}/tables/{}", table.share, table.schema, table.name), "delta-table-version"); + match version { + Some(v) => v.to_str().expect("Invalid version number").parse::().expect("Invalid version number"), + None => -1 + } + } + + pub fn list_table_files(&self, + table: &Table, + predicate_hints: Option>, + limit_hint: Option, + version: Option) -> Result { + + let mut map = Map::new(); + if predicate_hints.is_some() { + map.insert("predicateHints".to_string(), Value::Array(predicate_hints.unwrap().iter().map(|s| Value::String(s.to_string())).collect::>())); + } + if limit_hint.is_some() { + map.insert("limitHint".to_string(), Value::Number(Number::from(limit_hint.unwrap()))); + } + if version.is_some() { + map.insert("version".to_string(), Value::Number(Number::from(version.unwrap()))); + } + let response = self.post(&format!("shares/{}/schemas/{}/tables/{}/query", table.share, table.schema, table.name), &map)?; + let mut lines = response.lines(); + let protocol: ProtocolResponse = serde_json::from_str(lines.next().expect("Invalid response")).expect("Invalid protocol"); + let metadata: MetadataResponse = serde_json::from_str(lines.next().expect("Invalid response")).expect("Invalid metadata"); + let mut files: Vec = Vec::new(); + for l in lines { + let file: FileResponse = serde_json::from_str(l).expect("Invalid file info"); + files.push(file.file.clone()); + }; + Ok( + TableFiles { + metadata: TableMetadata { + protocol: protocol.protocol, + metadata: metadata.metadata, + }, + files, + } + ) + } + + fn download_files(&self, table_path: &PathBuf, table_files: &TableFiles) -> Vec { + if Path::exists(&table_path) { + fs::remove_dir_all(&table_path).unwrap(); + } + fs::create_dir_all(&table_path).unwrap(); + let mut file_paths: Vec = Vec::new(); + for file in table_files.files.clone() { + let dst_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); + self.download(file.url, &dst_path); + file_paths.push(dst_path.clone()); + } + file_paths.clone() + } + + fn load_cached(&self, table_path: &PathBuf, table_files: &TableFiles) -> Option> { + // Check if the files exist, load and compare the files. + let metadata_path = &table_path.join(METADATA_FILE); + if Path::exists(&metadata_path) { + let metadata_str = &fs::read_to_string(&metadata_path).unwrap(); + let metadata: TableMetadata = serde_json::from_str(&metadata_str).expect(&format!("Invalid configuration in {}", metadata_path.display())); + let mut download = metadata != table_files.metadata; + + if !download { + let mut file_paths: Vec = Vec::new(); + for file in &table_files.files { + let file_path = &table_path.join(format!("{}.snappy.parquet", &file.id)); + if !Path::exists(&file_path) { + // File is missing, invalidate cache + download = true; + fs::remove_dir(&table_path).unwrap(); + break; + } + file_paths.push(file_path.clone()); + } + if !download { + return Some(file_paths.clone()); + } + } + } + None + } + + pub fn get_files(&mut self, table: &Table) -> Result, anyhow::Error> { + let key = table.fully_qualified_name(); + let mut download = true; + let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); + let table_files = self.list_table_files(table, None, None, None).unwrap(); + if let Some(cached) = self.cache.get(&key) { + download = cached.table_files.metadata != table_files.metadata; + } else if let Some(cached) = self.load_cached(&table_path, &table_files) { + download = false; + self.cache.insert(key.clone(), FileCache {table_files: table_files.clone(), file_paths: cached}); + } + if download { + info!("--> Downloading data files to {}", &table_path.display()); + let paths = self.download_files(&table_path, &table_files); + serde_json::to_writer(&fs::File::create(&table_path.join(METADATA_FILE))?, &table_files.metadata)?; + self.cache.insert(key.clone(), FileCache {table_files: table_files, file_paths: paths}); + } + Ok(self.cache.get(&key).unwrap().file_paths.clone()) + } + + pub fn get_dataframe(&mut self, table: &Table) -> PolarResult { + self.get_files(&table)?; + let table_path = Path::new(&self.data_root).join(table.fully_qualified_name()); + load_parquet_files_as_dataframe(&table_path) + } +} + + + + + diff --git a/src/lib.rs b/src/lib.rs index e24c86c..fd405ab 100644 --- a/src/lib.rs +++ b/src/lib.rs @@ -4,4 +4,6 @@ extern crate log; pub mod application; pub mod protocol; pub mod reader; -pub mod utils; \ No newline at end of file +pub mod utils; +#[cfg(feature="blocking")] +pub mod blocking; \ No newline at end of file diff --git a/tests/application.rs b/tests/application.rs index 9588e2d..cfbf6ab 100644 --- a/tests/application.rs +++ b/tests/application.rs @@ -2,7 +2,7 @@ mod common; use wiremock::matchers::{method, path}; use wiremock::{Mock, ResponseTemplate}; -use futures::executor::block_on; +use tokio_test::block_on; use common::spawn_test_app; #[test] @@ -10,7 +10,6 @@ fn parse_share_listing() { // Arrange let test_app = block_on(spawn_test_app()); let body = r#"{"items":[{"name":"share_1","id":"1"},{"name":"share_2","id":"2"}]}"#; - // let body = r#"{"items":[{"name":"share_1","id":"1"}]}"#; let response = ResponseTemplate::new(200).set_body_string(body); let mock = Mock::given(path("/shares")) .and(method("GET")) @@ -20,7 +19,7 @@ fn parse_share_listing() { block_on(mock); // Act - let shares = &test_app.app.list_shares().unwrap(); + let shares = block_on(test_app.app.list_shares()).unwrap(); // Assert assert_eq!(shares.len(), 2, "Expected a vector of {}, got {}", shares.len(), 2); diff --git a/tests/common/mod.rs b/tests/common/mod.rs index 62ee22f..7442f77 100644 --- a/tests/common/mod.rs +++ b/tests/common/mod.rs @@ -23,7 +23,7 @@ pub async fn spawn_test_app() -> TestApp { endpoint: server.uri(), bearer_token: token, }; - let app = Application::new(config, None).unwrap(); + let app = Application::new(config, None).await.unwrap(); let test_app = TestApp { app, server,