Skip to content

Commit

Permalink
allow to specify file request in get dataframe
Browse files Browse the repository at this point in the history
  • Loading branch information
adalpane committed Jul 12, 2024
1 parent 0a0dab4 commit 12573e7
Show file tree
Hide file tree
Showing 11 changed files with 41 additions and 41 deletions.
2 changes: 1 addition & 1 deletion examples/async.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ async fn main() {
);
} else {
let res = app
.get_dataframe(&tables[0])
.get_dataframe(&tables[0], None)
.await
.unwrap()
.collect()
Expand Down
2 changes: 1 addition & 1 deletion examples/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ fn main() {
shares[0].name
);
} else {
let res = app.get_dataframe(&tables[0]).unwrap().collect().unwrap();
let res = app.get_dataframe(&tables[0], None).unwrap().collect().unwrap();
println!("Dataframe:\n {}", res);
}
}
Expand Down
23 changes: 10 additions & 13 deletions src/blocking/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,29 +193,26 @@ impl Client {
pub fn list_table_files(
&self,
table: &Table,
predicate_hints: Option<Vec<String>>,
limit_hint: Option<i32>,
version: Option<i32>,
request: Option<FilesRequest>,
) -> Result<TableFiles, anyhow::Error> {
let mut map = Map::new();
if predicate_hints.is_some() {
if let Some(predicate_hints) = request.as_ref().and_then(|r| r.predicate_hints.as_ref()) {
map.insert(
"predicateHints".to_string(),
Value::Array(
predicate_hints
.map(|hints| hints.iter().map(|s| Value::String(s.to_string()))
.collect::<Vec<_>>())
.unwrap_or_default()
.iter().map(|s| Value::String(s.to_string()))
.collect::<Vec<_>>()
),
);
}
if let Some(limit_hint) = limit_hint {
if let Some(limit_hint) = request.as_ref().and_then(|r| r.limit_hint) {
map.insert(
"limitHint".to_string(),
Value::Number(Number::from(limit_hint)),
);
}
if let Some(version) = version {
if let Some(version) = request.as_ref().and_then(|r| r.version) {
map.insert(
"version".to_string(),
Value::Number(Number::from(version)),
Expand Down Expand Up @@ -297,11 +294,11 @@ impl Client {
Ok(None)
}

pub fn get_files(&mut self, table: &Table) -> Result<Vec<PathBuf>, anyhow::Error> {
pub fn get_files(&mut self, table: &Table, request: Option<FilesRequest>) -> Result<Vec<PathBuf>, anyhow::Error> {
let key = table.fully_qualified_name();
let mut download = true;
let table_path = Path::new(&self.data_root).join(table.fully_qualified_name());
let table_files = self.list_table_files(table, None, None, None)?;
let table_files = self.list_table_files(table, request)?;
if let Some(cached) = self.cache.get(&key) {
download = cached.table_files.metadata != table_files.metadata;
} else if let Some(cached) = self.load_cached(&table_path, &table_files)? {
Expand Down Expand Up @@ -332,8 +329,8 @@ impl Client {
Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone())
}

pub fn get_dataframe(&mut self, table: &Table) -> PolarResult<LazyFrame> {
self.get_files(&table)?;
pub fn get_dataframe(&mut self, table: &Table, request: Option<FilesRequest>) -> PolarResult<LazyFrame> {
self.get_files(&table, request)?;
let table_path = Path::new(&self.data_root).join(table.fully_qualified_name());
load_parquet_files_as_dataframe(&table_path)
}
Expand Down
2 changes: 1 addition & 1 deletion src/blocking/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
//! shares[0].name
//! );
//! } else {
//! let res = app.get_dataframe(&tables[0]).unwrap().collect().unwrap();
//! let res = app.get_dataframe(&tables[0], None).unwrap().collect().unwrap();
//! println!("Dataframe:\n {}", res);
//! }
//! }
Expand Down
23 changes: 10 additions & 13 deletions src/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -209,29 +209,26 @@ impl Client {
pub async fn list_table_files(
&self,
table: &Table,
predicate_hints: Option<Vec<String>>,
limit_hint: Option<i32>,
version: Option<i32>,
request: Option<FilesRequest>
) -> Result<TableFiles, anyhow::Error> {
let mut map = Map::new();
if predicate_hints.is_some() {
if let Some(predicate_hints) = request.as_ref().and_then(|r| r.predicate_hints.as_ref()) {
map.insert(
"predicateHints".to_string(),
Value::Array(
predicate_hints
.map(|hints| hints.iter().map(|s| Value::String(s.to_string()))
.collect::<Vec<_>>())
.unwrap_or_default()
.iter().map(|s| Value::String(s.to_string()))
.collect::<Vec<_>>()
),
);
}
if let Some(limit_hint) = limit_hint {
if let Some(limit_hint) = request.as_ref().and_then(|r| r.limit_hint) {
map.insert(
"limitHint".to_string(),
Value::Number(Number::from(limit_hint)),
);
}
if let Some(version) = version {
if let Some(version) = request.as_ref().and_then(|r| r.version) {
map.insert(
"version".to_string(),
Value::Number(Number::from(version)),
Expand Down Expand Up @@ -319,11 +316,11 @@ impl Client {
Ok(None)
}

pub async fn get_files(&mut self, table: &Table) -> Result<Vec<PathBuf>, anyhow::Error> {
pub async fn get_files(&mut self, table: &Table, request: Option<FilesRequest>) -> Result<Vec<PathBuf>, anyhow::Error> {
let key = table.fully_qualified_name();
let mut download = true;
let table_path = Path::new(&self.data_root).join(table.fully_qualified_name());
let table_files = self.list_table_files(table, None, None, None).await?;
let table_files = self.list_table_files(table, request).await?;
if let Some(cached) = self.cache.get(&key) {
download = cached.table_files.metadata != table_files.metadata;
} else if let Some(cached) = self.load_cached(&table_path, &table_files).await? {
Expand Down Expand Up @@ -354,8 +351,8 @@ impl Client {
Ok(self.cache.get(&key).ok_or(anyhow::anyhow!("Error reading {key} from cache"))?.file_paths.clone())
}

pub async fn get_dataframe(&mut self, table: &Table) -> PolarResult<LazyFrame> {
self.get_files(&table).await?;
pub async fn get_dataframe(&mut self, table: &Table, request: Option<FilesRequest>) -> PolarResult<LazyFrame> {
self.get_files(&table, request).await?;
let table_path = Path::new(&self.data_root).join(table.fully_qualified_name());
load_parquet_files_as_dataframe(&table_path)
}
Expand Down
2 changes: 1 addition & 1 deletion src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@
//! );
//! } else {
//! let res = app
//! .get_dataframe(&tables[0])
//! .get_dataframe(&tables[0], None)
//! .await
//! .unwrap()
//! .collect()
Expand Down
6 changes: 6 additions & 0 deletions src/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,3 +81,9 @@ pub struct TableFiles {
pub metadata: TableMetadata,
pub files: Vec<File>,
}

/// Optional parameters for a table-files query.
///
/// Every field is optional. The client's `list_table_files` only adds a key to
/// the request body for fields that are `Some`, so `FilesRequest::default()`
/// produces the same request body as passing no request at all.
///
/// Use struct-update syntax to set only the fields you need:
/// `FilesRequest { limit_hint: Some(10), ..Default::default() }`.
#[derive(Debug, Clone, Default, PartialEq, Eq)]
pub struct FilesRequest {
    /// Predicate hint strings; sent under the `predicateHints` key when present.
    pub predicate_hints: Option<Vec<String>>,
    /// Row-limit hint; sent under the `limitHint` key when present.
    pub limit_hint: Option<i32>,
    /// Table version to query; sent under the `version` key when present.
    pub version: Option<i32>,
}
7 changes: 2 additions & 5 deletions src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,13 +36,10 @@ pub struct FileResponse {
pub file: File,
}

#[derive(Deserialize)]
pub struct FileActionResponse {
pub file: File,
}

/// Serializable record pairing a table-files listing with filesystem paths.
///
/// NOTE(review): presumably this is the entry type of the client's cache, whose
/// `table_files.metadata` is compared against a fresh listing to decide whether
/// to re-download — confirm against the client code.
#[derive(Deserialize, PartialEq, Serialize)]
pub struct FileCache {
// Table-files listing (metadata plus file entries) captured for this record.
pub table_files: TableFiles,
// Filesystem paths associated with the listing (presumably the downloaded files — confirm).
pub file_paths: Vec<PathBuf>,
}


4 changes: 2 additions & 2 deletions tests/blocking.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,11 +117,11 @@ fn get_dataframe() {
.unwrap()
.to_string();

let df = c.get_dataframe(&table).unwrap().collect().unwrap();
let df = c.get_dataframe(&table, None).unwrap().collect().unwrap();
assert_eq!(df.shape(), (5, 3), "Dataframe shape mismatch");

// Get the data again, this time it should be served from the local cache (enforced by Expectations set on Mocks)
let df1 = c.get_dataframe(&table).unwrap().collect().unwrap();
let df1 = c.get_dataframe(&table, None).unwrap().collect().unwrap();
assert_eq!(df1.shape(), (5, 3), "Dataframe shape mismatch");
assert_eq!(
df1.get_row(0).0[1],
Expand Down
8 changes: 4 additions & 4 deletions tests/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ async fn list_all_table_files() {
let app = create_mocked_test_app(body, &url, method("POST")).await;
let files = app
.client
.list_table_files(&table, None, None, None)
.list_table_files(&table, None)
.await
.unwrap();

Expand Down Expand Up @@ -264,7 +264,7 @@ async fn get_files() {

assert!(!Path::exists(&expected_path), "File should not exist");

let files = c.get_files(&table).await.unwrap();
let files = c.get_files(&table, None).await.unwrap();

assert_eq!(files.len(), 1, "File count mismatch");
assert_eq!(files[0], expected_path, "File path mismatch");
Expand Down Expand Up @@ -330,11 +330,11 @@ async fn get_dataframe() {
.unwrap()
.to_string();

let df = c.get_dataframe(&table).await.unwrap().collect().unwrap();
let df = c.get_dataframe(&table, None).await.unwrap().collect().unwrap();
assert_eq!(df.shape(), (5, 3), "Dataframe shape mismatch");

// Get the data again, this time it should be served from the local cache (enforced by Expectations set on Mocks)
let df1 = c.get_dataframe(&table).await.unwrap().collect().unwrap();
let df1 = c.get_dataframe(&table, None).await.unwrap().collect().unwrap();
assert_eq!(df1.shape(), (5, 3), "Dataframe shape mismatch");
assert_eq!(
df1.get_row(0).0[1],
Expand Down
3 changes: 3 additions & 0 deletions tests/common/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ use wiremock::matchers::{path, MethodExactMatcher};
use delta_sharing::protocol::*;
use delta_sharing::Client;

#[allow(dead_code)]
pub struct TestApp {
pub client: Client,
pub server: MockServer,
Expand All @@ -18,6 +19,7 @@ pub const TEST_PROTOCOL_RESPONSE: &str = r#"{ "minReaderVersion": 1 }"#;
pub const TEST_METADATA_RESPONSE: &str = r#"{ "id": "cf9c9342-b773-4c7b-a217-037d02ffe5d8", "format": { "provider": "parquet" }, "schemaString": "{\"type\":\"struct\",\"fields\":[{\"name\":\"int_field_1\",\"type\":\"integer\",\"nullable\":true,\"metadata\":{}},{\"name\":\"double_field_1\",\"type\":\"double\",\"nullable\":true,\"metadata\":{}}]}", "partitionColumns": [], "configuration": {"conf_1_name": "conf_1_value"} }"#;
pub const TEST_FILE_RESPONSE: &str = r#"{ "url": "<url>", "id": "1", "partitionValues": {}, "size": 2350, "stats": "{\"numRecords\":1}" }"#;

#[allow(dead_code)]
pub async fn create_test_app() -> TestApp {
let _ = env_logger::try_init();

Expand All @@ -33,6 +35,7 @@ pub async fn create_test_app() -> TestApp {
app
}

#[allow(dead_code)]
pub async fn create_mocked_test_app(
body: &str,
url: &str,
Expand Down

0 comments on commit 12573e7

Please sign in to comment.