From df560a6f1776e24f663785ec1df493c461633fb9 Mon Sep 17 00:00:00 2001 From: Robert Pack Date: Wed, 3 Jan 2024 10:42:39 +0100 Subject: [PATCH] fix: windows tests --- Cargo.toml | 2 +- crates/benchmarks/src/bin/merge.rs | 2 +- crates/deltalake-core/src/lib.rs | 22 ++-- crates/deltalake-core/src/logstore/mod.rs | 2 +- .../src/operations/merge/mod.rs | 2 +- crates/deltalake-core/src/storage/mod.rs | 2 +- crates/deltalake-core/src/table/builder.rs | 114 ++++++++++++------ 7 files changed, 92 insertions(+), 54 deletions(-) diff --git a/Cargo.toml b/Cargo.toml index 2e58e375e2..e8f33a7443 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -40,7 +40,7 @@ datafusion-physical-expr = { version = "33.0.0" } # serde -serde = { version = "1", features = ["derive"] } +serde = { version = "1.0.194", features = ["derive"] } serde_json = "1" # "stdlib" diff --git a/crates/benchmarks/src/bin/merge.rs b/crates/benchmarks/src/bin/merge.rs index affae8b7dd..e6abebc5ca 100644 --- a/crates/benchmarks/src/bin/merge.rs +++ b/crates/benchmarks/src/bin/merge.rs @@ -214,7 +214,7 @@ async fn benchmark_merge_tpcds( .filter(col("r").lt_eq(lit(parameters.sample_files)))?; let file_sample = files.collect_partitioned().await?; - let schema = file_sample.get(0).unwrap().get(0).unwrap().schema(); + let schema = file_sample.first().unwrap().first().unwrap().schema(); let mem_table = Arc::new(MemTable::try_new(schema, file_sample)?); ctx.register_table("file_sample", mem_table)?; let file_sample_count = ctx.table("file_sample").await?.count().await?; diff --git a/crates/deltalake-core/src/lib.rs b/crates/deltalake-core/src/lib.rs index ba6f17d032..2bd96a5d21 100644 --- a/crates/deltalake-core/src/lib.rs +++ b/crates/deltalake-core/src/lib.rs @@ -369,18 +369,18 @@ mod tests { ]; assert_eq!( - table.get_files_by_partitions(&filters).unwrap(), - vec![ - Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"), - 
Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet") - ] - ); + table.get_files_by_partitions(&filters).unwrap(), + vec![ + Path::from("year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet"), + Path::from("year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet") + ] + ); assert_eq!( - table.get_file_uris_by_partitions(&filters).unwrap(), - vec![ - std::fs::canonicalize("../deltalake-test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet").unwrap().as_path().to_string_lossy(), - std::fs::canonicalize("../deltalake-test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet").unwrap().as_path().to_string_lossy(), - ] + table.get_file_uris_by_partitions(&filters).unwrap().into_iter().map(|p| std::fs::canonicalize(p).unwrap()).collect::>(), + vec![ + std::fs::canonicalize("../deltalake-test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=3/part-00000-94d16827-f2fd-42cd-a060-f67ccc63ced9.c000.snappy.parquet").unwrap(), + std::fs::canonicalize("../deltalake-test/tests/data/delta-0.8.0-partitioned/year=2020/month=2/day=5/part-00000-89cdd4c8-2af7-4add-8ea3-3990b2f027b5.c000.snappy.parquet").unwrap(), + ] ); let filters = vec![crate::PartitionFilter { diff --git a/crates/deltalake-core/src/logstore/mod.rs b/crates/deltalake-core/src/logstore/mod.rs index 8bb6b3cd75..c67ef3ac0e 100644 --- a/crates/deltalake-core/src/logstore/mod.rs +++ b/crates/deltalake-core/src/logstore/mod.rs @@ -99,7 +99,7 @@ lazy_static! 
{ /// # use deltalake_core::logstore::*; /// # use std::collections::HashMap; /// # use url::Url; -/// let location = Url::parse("file:///tmp").expect("Failed to make location"); +/// let location = Url::parse("memory:///").expect("Failed to make location"); /// let logstore = logstore_for(location, HashMap::new()).expect("Failed to get a logstore"); /// ``` pub fn logstore_for( diff --git a/crates/deltalake-core/src/operations/merge/mod.rs b/crates/deltalake-core/src/operations/merge/mod.rs index 7cb752dc21..c0fab537cf 100644 --- a/crates/deltalake-core/src/operations/merge/mod.rs +++ b/crates/deltalake-core/src/operations/merge/mod.rs @@ -663,7 +663,7 @@ impl ExtensionPlanner for MergeMetricExtensionPlanner { let schema = barrier.input.schema(); let exec_schema: ArrowSchema = schema.as_ref().to_owned().into(); return Ok(Some(Arc::new(MergeBarrierExec::new( - physical_inputs.get(0).unwrap().clone(), + physical_inputs.first().unwrap().clone(), barrier.file_column.clone(), planner.create_physical_expr(&barrier.expr, schema, &exec_schema, session_state)?, )))); diff --git a/crates/deltalake-core/src/storage/mod.rs b/crates/deltalake-core/src/storage/mod.rs index 2398276011..d7b06284f3 100644 --- a/crates/deltalake-core/src/storage/mod.rs +++ b/crates/deltalake-core/src/storage/mod.rs @@ -42,7 +42,7 @@ pub trait ObjectStoreFactory: Send + Sync { } #[derive(Clone, Debug, Default)] -struct DefaultObjectStoreFactory {} +pub(crate) struct DefaultObjectStoreFactory {} impl ObjectStoreFactory for DefaultObjectStoreFactory { fn parse_url_opts( diff --git a/crates/deltalake-core/src/table/builder.rs b/crates/deltalake-core/src/table/builder.rs index 19221d630a..87c321f9c5 100644 --- a/crates/deltalake-core/src/table/builder.rs +++ b/crates/deltalake-core/src/table/builder.rs @@ -13,7 +13,7 @@ use url::Url; use super::DeltaTable; use crate::errors::{DeltaResult, DeltaTableError}; use crate::logstore::LogStoreRef; -use crate::storage::StorageOptions; +use 
crate::storage::{factories, StorageOptions}; #[allow(dead_code)] #[derive(Debug, thiserror::Error)] @@ -163,7 +163,7 @@ impl DeltaTableBuilder { /// /// ```rust /// # use deltalake_core::table::builder::*; - /// let builder = DeltaTableBuilder::from_valid_uri("/tmp"); + /// let builder = DeltaTableBuilder::from_valid_uri("memory:///"); /// assert!(builder.is_ok(), "Builder failed with {builder:?}"); /// ``` pub fn from_valid_uri(table_uri: impl AsRef) -> DeltaResult { @@ -323,17 +323,45 @@ impl DeltaTableBuilder { } } -fn create_filetree_from_path(path: &PathBuf) -> DeltaResult<()> { - if !path.exists() { - std::fs::create_dir_all(path).map_err(|err| { - let msg = format!( - "Could not create local directory: {:?}\nError: {:?}", - path, err - ); - DeltaTableError::InvalidTableLocation(msg) - })?; +enum UriType { + LocalPath(PathBuf), + Url(Url), +} + +/// Utility function to figure out whether string representation of the path +/// is either local path or some kind of URL. +/// +/// Will return an error if the path is not valid. 
+fn resolve_uri_type(table_uri: impl AsRef) -> DeltaResult { + let table_uri = table_uri.as_ref(); + let known_schemes: Vec<_> = factories() + .iter() + .map(|v| v.key().scheme().to_owned()) + .collect(); + + if let Ok(url) = Url::parse(table_uri) { + let scheme = url.scheme().to_string(); + if url.scheme() == "file" { + Ok(UriType::LocalPath(url.to_file_path().map_err(|err| { + let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err); + DeltaTableError::InvalidTableLocation(msg) + })?)) + // NOTE this check is required to support absolute windows paths which may properly parse as url + } else if known_schemes.contains(&scheme) { + Ok(UriType::Url(url)) + // NOTE this check is required to support absolute windows paths which may properly parse as url + // we assume here that a single character scheme is a windows drive letter + } else if scheme.len() == 1 { + Ok(UriType::LocalPath(PathBuf::from(table_uri))) + } else { + Err(DeltaTableError::InvalidTableLocation(format!( + "Unknown scheme: {}", + scheme + ))) + } + } else { + Ok(UriType::LocalPath(PathBuf::from(table_uri))) } - Ok(()) } /// Attempt to create a Url from given table location. @@ -350,35 +378,35 @@ fn create_filetree_from_path(path: &PathBuf) -> DeltaResult<()> { pub fn ensure_table_uri(table_uri: impl AsRef) -> DeltaResult { let table_uri = table_uri.as_ref(); - debug!("ensure_table_uri {table_uri}"); - let mut url = match Url::parse(table_uri) { - Ok(url) => { - if url.scheme() == "file" { - create_filetree_from_path( - &url.to_file_path() - .expect("Failed to convert a file:// URL to a file path"), - )?; + let uri_type: UriType = resolve_uri_type(table_uri)?; + + // If it is a local path, we need to create it if it does not exist. 
+ let mut url = match uri_type { + UriType::LocalPath(path) => { + if !path.exists() { + std::fs::create_dir_all(&path).map_err(|err| { + let msg = format!( + "Could not create local directory: {}\nError: {:?}", + table_uri, err + ); + DeltaTableError::InvalidTableLocation(msg) + })?; } - Ok(url) - } - Err(_) => { - let path = PathBuf::from(table_uri); - create_filetree_from_path(&path)?; - let path = std::fs::canonicalize(path.clone()).map_err(|err| { - let msg = format!("Invalid table location: {:?}\nError: {:?}", path, err); + let path = std::fs::canonicalize(path).map_err(|err| { + let msg = format!("Invalid table location: {}\nError: {:?}", table_uri, err); DeltaTableError::InvalidTableLocation(msg) })?; - - Url::from_directory_path(path.clone()).map_err(|_| { + Url::from_directory_path(path).map_err(|_| { let msg = format!( - "Could not construct a URL from canonicalized path: {:?}.\n\ + "Could not construct a URL from canonicalized path: {}.\n\ Something must be very wrong with the table path.", - path, + table_uri ); DeltaTableError::InvalidTableLocation(msg) - }) + })? 
} - }?; + UriType::Url(url) => url, + }; let trimmed_path = url.path().trim_end_matches('/').to_owned(); url.set_path(&trimmed_path); @@ -400,21 +428,31 @@ fn ensure_file_location_exists(path: PathBuf) -> DeltaResult<()> { #[cfg(test)] mod tests { - use super::*; use itertools::Itertools; use object_store::path::Path; + use super::*; + use crate::storage::DefaultObjectStoreFactory; + #[test] fn test_ensure_table_uri() { + factories().insert( + Url::parse("s3://").unwrap(), + Arc::new(DefaultObjectStoreFactory::default()), + ); + // parse an existing relative directory let uri = ensure_table_uri("."); assert!(uri.is_ok()); - let _uri = ensure_table_uri("./nonexistent"); - assert!(uri.is_ok()); - let uri = ensure_table_uri("file:///tmp/nonexistent/some/path"); + let uri = ensure_table_uri("./nonexistent"); assert!(uri.is_ok()); let uri = ensure_table_uri("s3://container/path"); assert!(uri.is_ok()); + #[cfg(not(windows))] + { + let uri = ensure_table_uri("file:///tmp/nonexistent/some/path"); + assert!(uri.is_ok()); + } // These cases should all roundtrip to themselves cfg_if::cfg_if! { @@ -497,7 +535,7 @@ mod tests { #[test] fn test_ensure_table_uri_url() { // Urls should round trips as-is - let expected = Url::parse("s3://deltalake-test/tests/data/delta-0.8.0").unwrap(); + let expected = Url::parse("memory:///deltalake-test/tests/data/delta-0.8.0").unwrap(); let url = ensure_table_uri(&expected).unwrap(); assert_eq!(expected, url);