Skip to content

Commit

Permalink
Add support for S3 data sources (#290)
Browse files (browse the repository at this point in the history)
  • Loading branch information
andygrove authored Sep 29, 2022
1 parent 13cb6c8 commit 1b29c02
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 7 deletions.
3 changes: 3 additions & 0 deletions ballista-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,6 @@ env_logger = "0.9"
mimalloc = { version = "0.1", default-features = false }
rustyline = "10.0"
tokio = { version = "1.0", features = ["macros", "rt", "rt-multi-thread", "sync", "parking_lot"] }

[features]
s3 = ["ballista/s3"]
14 changes: 7 additions & 7 deletions ballista-cli/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,15 +98,15 @@ pub async fn main() -> Result<()> {
env::set_current_dir(&p).unwrap();
};

let mut session_config = SessionConfig::new().with_information_schema(true);

if let Some(batch_size) = args.batch_size {
session_config = session_config.with_batch_size(batch_size);
};

let mut ctx: Context = match (args.host, args.port) {
(Some(ref h), Some(p)) => Context::new_remote(h, p).await?,
_ => Context::new_local(&session_config),
_ => {
let mut session_config = SessionConfig::new().with_information_schema(true);
if let Some(batch_size) = args.batch_size {
session_config = session_config.with_batch_size(batch_size);
};
Context::new_local(&session_config)
}
};

let mut print_options = PrintOptions {
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ tokio = "1.0"
[features]
default = []
hdfs = ["ballista-core/hdfs"]
s3 = ["ballista-core/s3"]
standalone = ["ballista-executor", "ballista-scheduler"]
1 change: 1 addition & 0 deletions ballista/rust/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ rustc-args = ["--cfg", "docsrs"]
force_hash_collisions = ["datafusion/force_hash_collisions"]
# Used to enable hdfs to be registered in the ObjectStoreRegistry by default
hdfs = ["datafusion-objectstore-hdfs"]
# Used to enable S3 object stores (backed by the object_store crate's "aws" feature)
s3 = ["object_store/aws"]
simd = ["datafusion/simd"]

[dependencies]
Expand Down
14 changes: 14 additions & 0 deletions ballista/rust/core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,8 @@ use datafusion_proto::logical_plan::{
AsLogicalPlan, DefaultLogicalExtensionCodec, LogicalExtensionCodec,
};
use futures::StreamExt;
#[cfg(feature = "s3")]
use object_store::aws::AmazonS3Builder;
use object_store::ObjectStore;
use std::io::{BufWriter, Write};
use std::marker::PhantomData;
Expand Down Expand Up @@ -94,6 +96,18 @@ impl ObjectStoreProvider for FeatureBasedObjectStoreProvider {
}
}

// S3 support, compiled only when the `s3` feature is enabled: resolve
// `s3://<bucket>/...` URLs to an Amazon S3 object store for that bucket.
#[cfg(feature = "s3")]
{
    // Compare the parsed scheme directly rather than serializing the whole
    // URL into a temporary `String` just to inspect its prefix.
    if url.scheme() == "s3" {
        // The bucket name is the host component of the URL. A URL without a
        // host is not handled here and falls through to the generic
        // "No object store available" error that follows this block.
        if let Some(bucket_name) = url.host_str() {
            // Start from `AmazonS3Builder::from_env()` and override only the
            // bucket name; a builder failure is propagated to the caller via `?`.
            let store = AmazonS3Builder::from_env()
                .with_bucket_name(bucket_name)
                .build()?;
            return Ok(Arc::new(store));
        }
    }
}

Err(DataFusionError::Execution(format!(
"No object store available for {}",
url
Expand Down

0 comments on commit 1b29c02

Please sign in to comment.