Skip to content

Commit

Permalink
Replace RwLock<HashMap> and Mutex<HashMap> by using DashMap (#4079)
Browse files Browse the repository at this point in the history
* Replace RwLock<HashMap> and Mutex<HashMap> by using DashMap

* Fix Cargo.lock in datafusion-cli

Co-authored-by: yangzhong <[email protected]>
  • Loading branch information
yahoNanJing and kyotoYaho authored Nov 7, 2022
1 parent 4d23cae commit a9add0e
Show file tree
Hide file tree
Showing 6 changed files with 67 additions and 62 deletions.
14 changes: 14 additions & 0 deletions datafusion-cli/Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions datafusion/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ async-trait = "0.1.41"
bytes = "1.1"
bzip2 = "0.4.3"
chrono = { version = "0.4.22", default-features = false }
dashmap = "5.4.0"
datafusion-common = { path = "../common", version = "14.0.0", features = ["parquet", "object_store"] }
datafusion-expr = { path = "../expr", version = "14.0.0" }
datafusion-jit = { path = "../jit", version = "14.0.0", optional = true }
Expand Down
29 changes: 11 additions & 18 deletions datafusion/core/src/catalog/catalog.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,9 @@
//! representing collections of named schemas.
use crate::catalog::schema::SchemaProvider;
use dashmap::DashMap;
use datafusion_common::{DataFusionError, Result};
use parking_lot::RwLock;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

/// Represent a list of named catalogs
Expand All @@ -49,14 +48,14 @@ pub trait CatalogList: Sync + Send {
/// Simple in-memory list of catalogs
pub struct MemoryCatalogList {
/// Collection of catalogs containing schemas and ultimately TableProviders
pub catalogs: RwLock<HashMap<String, Arc<dyn CatalogProvider>>>,
pub catalogs: DashMap<String, Arc<dyn CatalogProvider>>,
}

impl MemoryCatalogList {
/// Instantiates a new `MemoryCatalogList` with an empty collection of catalogs
pub fn new() -> Self {
Self {
catalogs: RwLock::new(HashMap::new()),
catalogs: DashMap::new(),
}
}
}
Expand All @@ -77,18 +76,15 @@ impl CatalogList for MemoryCatalogList {
name: String,
catalog: Arc<dyn CatalogProvider>,
) -> Option<Arc<dyn CatalogProvider>> {
let mut catalogs = self.catalogs.write();
catalogs.insert(name, catalog)
self.catalogs.insert(name, catalog)
}

fn catalog_names(&self) -> Vec<String> {
let catalogs = self.catalogs.read();
catalogs.keys().map(|s| s.to_string()).collect()
self.catalogs.iter().map(|c| c.key().clone()).collect()
}

fn catalog(&self, name: &str) -> Option<Arc<dyn CatalogProvider>> {
let catalogs = self.catalogs.read();
catalogs.get(name).cloned()
self.catalogs.get(name).map(|c| c.value().clone())
}
}

Expand Down Expand Up @@ -132,14 +128,14 @@ pub trait CatalogProvider: Sync + Send {

/// Simple in-memory implementation of a catalog.
pub struct MemoryCatalogProvider {
schemas: RwLock<HashMap<String, Arc<dyn SchemaProvider>>>,
schemas: DashMap<String, Arc<dyn SchemaProvider>>,
}

impl MemoryCatalogProvider {
/// Instantiates a new MemoryCatalogProvider with an empty collection of schemas.
pub fn new() -> Self {
Self {
schemas: RwLock::new(HashMap::new()),
schemas: DashMap::new(),
}
}
}
Expand All @@ -150,22 +146,19 @@ impl CatalogProvider for MemoryCatalogProvider {
}

fn schema_names(&self) -> Vec<String> {
let schemas = self.schemas.read();
schemas.keys().cloned().collect()
self.schemas.iter().map(|s| s.key().clone()).collect()
}

fn schema(&self, name: &str) -> Option<Arc<dyn SchemaProvider>> {
let schemas = self.schemas.read();
schemas.get(name).cloned()
self.schemas.get(name).map(|s| s.value().clone())
}

fn register_schema(
&self,
name: &str,
schema: Arc<dyn SchemaProvider>,
) -> Result<Option<Arc<dyn SchemaProvider>>> {
let mut schemas = self.schemas.write();
Ok(schemas.insert(name.into(), schema))
Ok(self.schemas.insert(name.into(), schema))
}
}

Expand Down
25 changes: 11 additions & 14 deletions datafusion/core/src/catalog/schema.rs
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,8 @@
//! Describes the interface and built-in implementations of schemas,
//! representing collections of named tables.
use parking_lot::RwLock;
use dashmap::DashMap;
use std::any::Any;
use std::collections::HashMap;
use std::sync::Arc;

use crate::datasource::TableProvider;
Expand Down Expand Up @@ -68,14 +67,14 @@ pub trait SchemaProvider: Sync + Send {

/// Simple in-memory implementation of a schema.
pub struct MemorySchemaProvider {
tables: RwLock<HashMap<String, Arc<dyn TableProvider>>>,
tables: DashMap<String, Arc<dyn TableProvider>>,
}

impl MemorySchemaProvider {
/// Instantiates a new MemorySchemaProvider with an empty collection of tables.
pub fn new() -> Self {
Self {
tables: RwLock::new(HashMap::new()),
tables: DashMap::new(),
}
}
}
Expand All @@ -92,13 +91,14 @@ impl SchemaProvider for MemorySchemaProvider {
}

fn table_names(&self) -> Vec<String> {
let tables = self.tables.read();
tables.keys().cloned().collect()
self.tables
.iter()
.map(|table| table.key().clone())
.collect()
}

fn table(&self, name: &str) -> Option<Arc<dyn TableProvider>> {
let tables = self.tables.read();
tables.get(name).cloned()
self.tables.get(name).map(|table| table.value().clone())
}

fn register_table(
Expand All @@ -112,18 +112,15 @@ impl SchemaProvider for MemorySchemaProvider {
name
)));
}
let mut tables = self.tables.write();
Ok(tables.insert(name, table))
Ok(self.tables.insert(name, table))
}

fn deregister_table(&self, name: &str) -> Result<Option<Arc<dyn TableProvider>>> {
let mut tables = self.tables.write();
Ok(tables.remove(name))
Ok(self.tables.remove(name).map(|(_, table)| table))
}

fn table_exist(&self, name: &str) -> bool {
let tables = self.tables.read();
tables.contains_key(name)
self.tables.contains_key(name)
}
}

Expand Down
30 changes: 16 additions & 14 deletions datafusion/core/src/datasource/listing/table.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,16 +17,15 @@

//! The table implementation.
use hashbrown::HashMap;
use std::str::FromStr;
use std::{any::Any, sync::Arc};

use arrow::datatypes::{Field, Schema, SchemaRef};
use async_trait::async_trait;
use dashmap::DashMap;
use futures::{future, stream, StreamExt, TryStreamExt};
use object_store::path::Path;
use object_store::ObjectMeta;
use parking_lot::RwLock;

use crate::datasource::file_format::file_type::{FileCompressionType, FileType};
use crate::datasource::{
Expand Down Expand Up @@ -266,28 +265,31 @@ impl ListingOptions {
/// Cache is invalidated when file size or last modification has changed
#[derive(Default)]
struct StatisticsCache {
statistics: RwLock<HashMap<Path, (ObjectMeta, Statistics)>>,
statistics: DashMap<Path, (ObjectMeta, Statistics)>,
}

impl StatisticsCache {
/// Get `Statistics` for file location. Returns None if file has changed or not found.
fn get(&self, meta: &ObjectMeta) -> Option<Statistics> {
let map = self.statistics.read();
let (saved_meta, statistics) = map.get(&meta.location)?;

if saved_meta.size != meta.size || saved_meta.last_modified != meta.last_modified
{
// file has changed
return None;
}

Some(statistics.clone())
self.statistics
.get(&meta.location)
.map(|s| {
let (saved_meta, statistics) = s.value();
if saved_meta.size != meta.size
|| saved_meta.last_modified != meta.last_modified
{
// file has changed
None
} else {
Some(statistics.clone())
}
})
.unwrap_or(None)
}

/// Save collected file statistics
fn save(&self, meta: ObjectMeta, statistics: Statistics) {
self.statistics
.write()
.insert(meta.location.clone(), (meta, statistics));
}
}
Expand Down
30 changes: 14 additions & 16 deletions datafusion/core/src/datasource/object_store.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,10 @@
//! This allows the user to extend DataFusion with different storage systems such as S3 or HDFS
//! and query data inside these systems.
use dashmap::DashMap;
use datafusion_common::{DataFusionError, Result};
use object_store::local::LocalFileSystem;
use object_store::ObjectStore;
use parking_lot::RwLock;
use std::collections::HashMap;
use std::sync::Arc;
use url::Url;

Expand Down Expand Up @@ -125,7 +124,7 @@ pub trait ObjectStoreProvider: Send + Sync + 'static {
/// [`ListingTableUrl`]: crate::datasource::listing::ListingTableUrl
pub struct ObjectStoreRegistry {
/// A map from scheme to object store that serve list / read operations for the store
object_stores: RwLock<HashMap<String, Arc<dyn ObjectStore>>>,
object_stores: DashMap<String, Arc<dyn ObjectStore>>,
provider: Option<Arc<dyn ObjectStoreProvider>>,
}

Expand All @@ -134,7 +133,11 @@ impl std::fmt::Debug for ObjectStoreRegistry {
f.debug_struct("ObjectStoreRegistry")
.field(
"schemes",
&self.object_stores.read().keys().collect::<Vec<_>>(),
&self
.object_stores
.iter()
.map(|o| o.key().clone())
.collect::<Vec<_>>(),
)
.finish()
}
Expand All @@ -161,10 +164,10 @@ impl ObjectStoreRegistry {
/// may be explicitly registered with calls to [`ObjectStoreRegistry::register_store`] or
/// created lazily, on-demand by the provided [`ObjectStoreProvider`]
pub fn new_with_provider(provider: Option<Arc<dyn ObjectStoreProvider>>) -> Self {
let mut map: HashMap<String, Arc<dyn ObjectStore>> = HashMap::new();
map.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
let object_stores: DashMap<String, Arc<dyn ObjectStore>> = DashMap::new();
object_stores.insert("file://".to_string(), Arc::new(LocalFileSystem::new()));
Self {
object_stores: RwLock::new(map),
object_stores,
provider,
}
}
Expand All @@ -178,9 +181,8 @@ impl ObjectStoreRegistry {
host: impl AsRef<str>,
store: Arc<dyn ObjectStore>,
) -> Option<Arc<dyn ObjectStore>> {
let mut stores = self.object_stores.write();
let s = format!("{}://{}", scheme.as_ref(), host.as_ref());
stores.insert(s, store)
self.object_stores.insert(s, store)
}

/// Get a suitable store for the provided URL. For example:
Expand All @@ -192,21 +194,17 @@ impl ObjectStoreRegistry {
pub fn get_by_url(&self, url: impl AsRef<Url>) -> Result<Arc<dyn ObjectStore>> {
let url = url.as_ref();
// First check whether the object store can be obtained from the registry
let store = {
let stores = self.object_stores.read();
let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
stores.get(s).cloned()
};
let s = &url[url::Position::BeforeScheme..url::Position::BeforePath];
let store = self.object_stores.get(s).map(|o| o.value().clone());

match store {
Some(store) => Ok(store),
None => match &self.provider {
Some(provider) => {
let store = provider.get_by_url(url)?;
let mut stores = self.object_stores.write();
let key =
&url[url::Position::BeforeScheme..url::Position::BeforePath];
stores.insert(key.to_owned(), store.clone());
self.object_stores.insert(key.to_owned(), store.clone());
Ok(store)
}
None => Err(DataFusionError::Internal(format!(
Expand Down

0 comments on commit a9add0e

Please sign in to comment.