Skip to content

Commit

Permalink
Introduce ObjectStoreProvider to create an object store based on the …
Browse files Browse the repository at this point in the history
…url (#2906)

* Introduce ObjectStoreSelfDetector for detector an object store based on the url

* Fix UT

* Fix PR review

* Fix PR review

Co-authored-by: yangzhong <[email protected]>
  • Loading branch information
yahoNanJing and kyotoYaho authored Jul 18, 2022
1 parent 6cb695f commit 305e265
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 27 deletions.
12 changes: 11 additions & 1 deletion datafusion/core/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,17 @@ mod tests {
#[tokio::test]
async fn test_schema_register_listing_table() {
let testdata = crate::test_util::parquet_test_data();
let filename = format!("file:///{}/{}", testdata, "alltypes_plain.parquet");
let testdir = if testdata.starts_with('/') {
format!("file://{}", testdata)
} else {
format!("file:///{}", testdata)
};
let filename = if testdir.ends_with('/') {
format!("{}{}", testdir, "alltypes_plain.parquet")
} else {
format!("{}/{}", testdir, "alltypes_plain.parquet")
};

let table_path = ListingTableUrl::parse(filename).unwrap();

let catalog = MemoryCatalogProvider::new();
Expand Down
2 changes: 1 addition & 1 deletion datafusion/core/src/datasource/listing/helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ pub fn split_files(
pub async fn pruned_partition_list<'a>(
store: &'a dyn ObjectStore,
table_path: &'a ListingTableUrl,
filters: &[Expr],
filters: &'a [Expr],
file_extension: &'a str,
table_partition_cols: &'a [String],
) -> Result<BoxStream<'a, Result<PartitionedFile>>> {
Expand Down
73 changes: 60 additions & 13 deletions datafusion/core/src/datasource/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,19 @@ impl std::fmt::Display for ObjectStoreUrl {
}
}

/// Object store provider can detector an object store based on the url
pub trait ObjectStoreProvider: Send + Sync + 'static {
/// Detector a suitable object store based on its url if possible
/// Return the key and object store
fn get_by_url(&self, url: &Url) -> Option<Arc<dyn ObjectStore>>;
}

/// Object store registry
#[derive(Clone)]
pub struct ObjectStoreRegistry {
/// A map from scheme to object store that serve list / read operations for the store
object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
object_stores: Arc<RwLock<HashMap<String, Arc<dyn ObjectStore>>>>,
provider: Option<Arc<dyn ObjectStoreProvider>>,
}

impl std::fmt::Debug for ObjectStoreRegistry {
Expand All @@ -105,13 +114,19 @@ impl Default for ObjectStoreRegistry {
}

impl ObjectStoreRegistry {
/// By default the self detector is None
pub fn new() -> Self {
ObjectStoreRegistry::new_with_provider(None)
}

/// Create the registry that object stores can registered into.
/// ['LocalFileSystem'] store is registered in by default to support read local files natively.
pub fn new() -> Self {
pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) -> Self {
let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
map.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
Self {
object_stores: RwLock::new(map),
object_stores: Arc::new(RwLock::new(map)),
provider,
}
}

Expand All @@ -132,19 +147,43 @@ impl ObjectStoreRegistry {
///
/// - URL with scheme `file:///` or no schema will return the default LocalFS store
/// - URL with scheme `s3://bucket/` will return the S3 store if it's registered
/// - URL with scheme `hdfs://hostname:port/` will return the hdfs store if it's registered
///
pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
let url = url.as_ref();
let s = &url[url::Position::BeforeScheme..url::Position::AfterHost];
let stores = self.object_stores.read();
let store = stores.get(s).ok_or_else(|| {
DataFusionError::Internal(format!(
"No suitable object store found for {}",
url
))
})?;

Ok(store.clone())
// First check whether can get object store from registry
let store = {
let stores = self.object_stores.read();
let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
stores.get(s).cloned()
};

// If not, then try to detector based on its url.
let store = store
.or_else(|| {
if let Some(provider) = &self.provider {
// If detected, register it
if let Some(store) = provider.get_by_url(url) {
let mut stores = self.object_stores.write();
let key =
&url[url::Position::BeforeScheme..url::Position::BeforePath];
stores.insert(key.to_owned(), store.clone());
Some(store)
} else {
None
}
} else {
None
}
})
.ok_or_else(|| {
DataFusionError::Internal(format!(
"No suitable object store found for {}",
url
))
})?;

Ok(store)
}
}

Expand Down Expand Up @@ -190,6 +229,14 @@ mod tests {
assert_eq!(err.to_string(), "Execution error: ObjectStoreUrl must only contain scheme and authority, got: /foo");
}

#[test]
fn test_get_by_url_hdfs() {
let sut = ObjectStoreRegistry::default();
sut.register_store("hdfs", "localhost:8020", Arc::new(LocalFileSystem::new()));
let url = ListingTableUrl::parse("hdfs://localhost:8020/key").unwrap();
sut.get_by_url(&url).unwrap();
}

#[test]
fn test_get_by_url_s3() {
let sut = ObjectStoreRegistry::default();
Expand Down
11 changes: 4 additions & 7 deletions datafusion/core/src/execution/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -85,7 +85,7 @@ use crate::config::{
ConfigOptions, OPT_BATCH_SIZE, OPT_COALESCE_BATCHES, OPT_COALESCE_TARGET_BATCH_SIZE,
OPT_FILTER_NULL_JOIN_KEYS, OPT_OPTIMIZER_SKIP_FAILED_RULES,
};
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::execution::runtime_env::RuntimeEnv;
use crate::logical_plan::plan::Explain;
use crate::physical_plan::file_format::{plan_to_csv, plan_to_json, plan_to_parquet};
use crate::physical_plan::planner::DefaultPhysicalPlanner;
Expand Down Expand Up @@ -180,7 +180,7 @@ impl SessionContext {

/// Creates a new session context using the provided session configuration.
pub fn with_config(config: SessionConfig) -> Self {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
let runtime = Arc::new(RuntimeEnv::default());
Self::with_config_rt(config, runtime)
}

Expand Down Expand Up @@ -1211,10 +1211,7 @@ impl Debug for SessionState {

/// Default session builder using the provided configuration
pub fn default_session_builder(config: SessionConfig) -> SessionState {
SessionState::with_config_rt(
config,
Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap()),
)
SessionState::with_config_rt(config, Arc::new(RuntimeEnv::default()))
}

impl SessionState {
Expand Down Expand Up @@ -1902,7 +1899,7 @@ mod tests {

#[tokio::test]
async fn custom_query_planner() -> Result<()> {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
let runtime = Arc::new(RuntimeEnv::default());
let session_state = SessionState::with_config_rt(SessionConfig::new(), runtime)
.with_query_planner(Arc::new(MyQueryPlanner {}));
let ctx = SessionContext::with_state(session_state);
Expand Down
14 changes: 13 additions & 1 deletion datafusion/core/src/execution/runtime_env.rs
Original file line number Diff line number Diff line change
Expand Up @@ -57,12 +57,13 @@ impl RuntimeEnv {
let RuntimeConfig {
memory_manager,
disk_manager,
object_store_registry,
} = config;

Ok(Self {
memory_manager: MemoryManager::new(memory_manager),
disk_manager: DiskManager::try_new(disk_manager)?,
object_store_registry: Arc::new(ObjectStoreRegistry::new()),
object_store_registry: Arc::new(object_store_registry),
})
}

Expand Down Expand Up @@ -121,6 +122,8 @@ pub struct RuntimeConfig {
pub disk_manager: DiskManagerConfig,
/// MemoryManager to limit access to memory
pub memory_manager: MemoryManagerConfig,
/// ObjectStoreRegistry to get object store based on url
pub object_store_registry: ObjectStoreRegistry,
}

impl RuntimeConfig {
Expand All @@ -141,6 +144,15 @@ impl RuntimeConfig {
self
}

/// Customize object store registry
pub fn with_object_store_registry(
mut self,
object_store_registry: ObjectStoreRegistry,
) -> Self {
self.object_store_registry = object_store_registry;
self
}

/// Specify the total memory to use while running the DataFusion
/// plan to `max_memory * memory_fraction` in bytes.
///
Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/src/physical_plan/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1582,7 +1582,7 @@ mod tests {
use crate::assert_contains;
use crate::execution::context::TaskContext;
use crate::execution::options::CsvReadOptions;
use crate::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use crate::execution::runtime_env::RuntimeEnv;
use crate::logical_plan::plan::Extension;
use crate::physical_plan::{
expressions, DisplayFormatType, Partitioning, Statistics,
Expand All @@ -1604,7 +1604,7 @@ mod tests {
use std::{any::Any, fmt};

fn make_session_state() -> SessionState {
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
let runtime = Arc::new(RuntimeEnv::default());
SessionState::with_config_rt(SessionConfig::new(), runtime)
}

Expand Down
4 changes: 2 additions & 2 deletions datafusion/core/tests/user_defined_plan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ use std::{any::Any, collections::BTreeMap, fmt, sync::Arc};

use async_trait::async_trait;
use datafusion::execution::context::TaskContext;
use datafusion::execution::runtime_env::{RuntimeConfig, RuntimeEnv};
use datafusion::execution::runtime_env::RuntimeEnv;
use datafusion::logical_plan::plan::{Extension, Sort};
use datafusion::logical_plan::{DFSchemaRef, Limit};
use datafusion::optimizer::optimizer::OptimizerConfig;
Expand Down Expand Up @@ -247,7 +247,7 @@ async fn topk_plan() -> Result<()> {

fn make_topk_context() -> SessionContext {
let config = SessionConfig::new().with_target_partitions(48);
let runtime = Arc::new(RuntimeEnv::new(RuntimeConfig::default()).unwrap());
let runtime = Arc::new(RuntimeEnv::default());
let state = SessionState::with_config_rt(config, runtime)
.with_query_planner(Arc::new(TopKQueryPlanner {}))
.add_optimizer_rule(Arc::new(TopKOptimizerRule {}));
Expand Down

0 comments on commit 305e265

Please sign in to comment.