Skip to content

Commit

Permalink
fix: windows tests
Browse files Browse the repository at this point in the history
  • Loading branch information
roeap committed Jan 3, 2024
1 parent 4647d65 commit df560a6
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 54 deletions.
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@ datafusion-physical-expr = { version = "33.0.0" }


# serde
serde = { version = "1", features = ["derive"] }
serde = { version = "1.0.194", features = ["derive"] }
serde_json = "1"

# "stdlib"
Expand Down
2 changes: 1 addition & 1 deletion crates/benchmarks/src/bin/merge.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,7 +214,7 @@ async fn benchmark_merge_tpcds(
.filter(col("r").lt_eq(lit(parameters.sample_files)))?;

let file_sample = files.collect_partitioned().await?;
let schema = file_sample.get(0).unwrap().get(0).unwrap().schema();
let schema = file_sample.first().unwrap().first().unwrap().schema();
let mem_table = Arc::new(MemTable::try_new(schema, file_sample)?);
ctx.register_table("file_sample", mem_table)?;
let file_sample_count = ctx.table("file_sample").await?.count().await?;
Expand Down
22 changes: 11 additions & 11 deletions crates/deltalake-core/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -369,18 +369,18 @@ mod tests {
];

assert_eq!(
table.get_files_by_partitions(&filters).unwrap(),
vec![
Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"),
Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet")
]
);
table.get_files_by_partitions(&filters).unwrap(),
vec![
Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"),
Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet")
]
);
assert_eq!(
table.get_file_uris_by_partitions(&filters).unwrap(),
vec![
std::fs::canonicalize("../deltalake-test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet").unwrap().as_path().to_string_lossy(),
std::fs::canonicalize("../deltalake-test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet").unwrap().as_path().to_string_lossy(),
]
table.get_file_uris_by_partitions(&filters).unwrap().into_iter().map(|p| std::fs::canonicalize(p).unwrap()).collect::<Vec<_>>(),
vec![
std::fs::canonicalize("../deltalake-test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet").unwrap(),
std::fs::canonicalize("../deltalake-test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet").unwrap(),
]
);

let filters = vec![crate::PartitionFilter {
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/logstore/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ lazy_static! {
/// # use deltalake_core::logstore::*;
/// # use std::collections::HashMap;
/// # use url::Url;
/// let location = Url::parse("file:///tmp").expect("Failed to make location");
/// let location = Url::parse("memory:///").expect("Failed to make location");
/// let logstore = logstore_for(location, HashMap::new()).expect("Failed to get a logstore");
/// ```
pub fn logstore_for(
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/operations/merge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -663,7 +663,7 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner {
let schema = barrier.input.schema();
let exec_schema: ArrowSchema = schema.as_ref().to_owned().into();
return Ok(Some(Arc::new(MergeBarrierExec::new(
physical_inputs.get(0).unwrap().clone(),
physical_inputs.first().unwrap().clone(),
barrier.file_column.clone(),
planner.create_physical_expr(&barrier.expr, schema, &exec_schema, session_state)?,
))));
Expand Down
2 changes: 1 addition & 1 deletion crates/deltalake-core/src/storage/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@ pub trait ObjectStoreFactory: Send + Sync {
}

#[derive(Clone, Debug, Default)]
struct DefaultObjectStoreFactory {}
pub(crate) struct DefaultObjectStoreFactory {}

impl ObjectStoreFactory for DefaultObjectStoreFactory {
fn parse_url_opts(
Expand Down
114 changes: 76 additions & 38 deletions crates/deltalake-core/src/table/builder.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ use url::Url;
use super::DeltaTable;
use crate::errors::{DeltaResult, DeltaTableError};
use crate::logstore::LogStoreRef;
use crate::storage::StorageOptions;
use crate::storage::{factories, StorageOptions};

#[allow(dead_code)]
#[derive(Debug, thiserror::Error)]
Expand Down Expand Up @@ -163,7 +163,7 @@ impl DeltaTableBuilder {
///
/// ```rust
/// # use deltalake_core::table::builder::*;
/// let builder = DeltaTableBuilder::from_valid_uri("/tmp");
/// let builder = DeltaTableBuilder::from_valid_uri("memory:///");
/// assert!(builder.is_ok(), "Builder failed with {builder:?}");
/// ```
pub fn from_valid_uri(table_uri: impl AsRef<str>) -> DeltaResult<Self> {
Expand Down Expand Up @@ -323,17 +323,45 @@ impl DeltaTableBuilder {
}
}

fn create_filetree_from_path(path: &PathBuf) -> DeltaResult<()> {
if !path.exists() {
std::fs::create_dir_all(path).map_err(|err| {
let msg = format!(
"Could not create local directory: {:?}\nError: {:?}",
path, err
);
DeltaTableError::InvalidTableLocation(msg)
})?;
enum UriType {
LocalPath(PathBuf),
Url(Url),
}

/// Utility function to figure out whether string representation of the path
/// is either local path or some kind or URL.
///
/// Will return an error if the path is not valid.
fn resolve_uri_type(table_uri: impl AsRef<str>) -> DeltaResult<UriType> {
let table_uri = table_uri.as_ref();
let known_schemes: Vec<_> = factories()
.iter()
.map(|v| v.key().scheme().to_owned())
.collect();

if let Ok(url) = Url::parse(table_uri) {
let scheme = url.scheme().to_string();
if url.scheme() == "file" {
Ok(UriType::LocalPath(url.to_file_path().map_err(|err| {
let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err);
DeltaTableError::InvalidTableLocation(msg)
})?))
// NOTE this check is required to support absolute windows paths which may properly parse as url
} else if known_schemes.contains(&scheme) {
Ok(UriType::Url(url))
// NOTE this check is required to support absolute windows paths which may properly parse as url
// we assume here that a single character scheme is a windows drive letter
} else if scheme.len() == 1 {
Ok(UriType::LocalPath(PathBuf::from(table_uri)))
} else {
Err(DeltaTableError::InvalidTableLocation(format!(
"Unknown scheme: {}",
scheme
)))
}
} else {
Ok(UriType::LocalPath(PathBuf::from(table_uri)))
}
Ok(())
}

/// Attempt to create a Url from given table location.
Expand All @@ -350,35 +378,35 @@ fn create_filetree_from_path(path: &PathBuf) -> DeltaResult<()> {
pub fn ensure_table_uri(table_uri: impl AsRef<str>) -> DeltaResult<Url> {
let table_uri = table_uri.as_ref();

debug!("ensure_table_uri {table_uri}");
let mut url = match Url::parse(table_uri) {
Ok(url) => {
if url.scheme() == "file" {
create_filetree_from_path(
&url.to_file_path()
.expect("Failed to convert a file:// URL to a file path"),
)?;
let uri_type: UriType = resolve_uri_type(table_uri)?;

// If it is a local path, we need to create it if it does not exist.
let mut url = match uri_type {
UriType::LocalPath(path) => {
if !path.exists() {
std::fs::create_dir_all(&path).map_err(|err| {
let msg = format!(
"Could not create local directory: {}\nError: {:?}",
table_uri, err
);
DeltaTableError::InvalidTableLocation(msg)
})?;
}
Ok(url)
}
Err(_) => {
let path = PathBuf::from(table_uri);
create_filetree_from_path(&path)?;
let path = std::fs::canonicalize(path.clone()).map_err(|err| {
let msg = format!("Invalid table location: {:?}\nError: {:?}", path, err);
let path = std::fs::canonicalize(path).map_err(|err| {
let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err);
DeltaTableError::InvalidTableLocation(msg)
})?;

Url::from_directory_path(path.clone()).map_err(|_| {
Url::from_directory_path(path).map_err(|_| {
let msg = format!(
"Could not construct a URL from canonicalized path: {:?}.\n\
"Could not construct a URL from canonicalized path: {}.\n\
Something must be very wrong with the table path.",
path,
table_uri
);
DeltaTableError::InvalidTableLocation(msg)
})
})?
}
}?;
UriType::Url(url) => url,
};

let trimmed_path = url.path().trim_end_matches('/').to_owned();
url.set_path(&trimmed_path);
Expand All @@ -400,21 +428,31 @@ fn ensure_file_location_exists(path: PathBuf) -> DeltaResult<()> {

#[cfg(test)]
mod tests {
use super::*;
use itertools::Itertools;
use object_store::path::Path;

use super::*;
use crate::storage::DefaultObjectStoreFactory;

#[test]
fn test_ensure_table_uri() {
factories().insert(
Url::parse("s3://").unwrap(),
Arc::new(DefaultObjectStoreFactory::default()),
);

// parse an existing relative directory
let uri = ensure_table_uri(".");
assert!(uri.is_ok());
let _uri = ensure_table_uri("./nonexistent");
assert!(uri.is_ok());
let uri = ensure_table_uri("file:///tmp/nonexistent/some/path");
let uri = ensure_table_uri("./nonexistent");
assert!(uri.is_ok());
let uri = ensure_table_uri("s3://container/path");
assert!(uri.is_ok());
#[cfg(not(windows))]
{
let uri = ensure_table_uri("file:///tmp/nonexistent/some/path");
assert!(uri.is_ok());
}

// These cases should all roundtrip to themselves
cfg_if::cfg_if! {
Expand Down Expand Up @@ -497,7 +535,7 @@ mod tests {
#[test]
fn test_ensure_table_uri_url() {
// Urls should round trips as-is
let expected = Url::parse("s3://deltalake-test/tests/data/delta-0.8.0").unwrap();
let expected = Url::parse("memory:///deltalake-test/tests/data/delta-0.8.0").unwrap();
let url = ensure_table_uri(&expected).unwrap();
assert_eq!(expected, url);

Expand Down

0 comments on commit df560a6

Please sign in to comment.