Skip to content

Commit

Permalink
Refactor ExecutionContext and related conf to support multi-tenancy c…
Browse files Browse the repository at this point in the history
…onfigurations

Do not use static singleton RuntimeEnv in both DataFusion and Ballista

Add SessionBuilder fn type to SchedulerServer to allow customized SessionContext creation

fix client BallistaContext
  • Loading branch information
Wang committed Mar 10, 2022
1 parent 0e440ea commit ae70d72
Show file tree
Hide file tree
Showing 146 changed files with 3,932 additions and 2,663 deletions.
2 changes: 1 addition & 1 deletion ballista-examples/src/bin/ballista-dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

let testdata = datafusion::arrow::util::test_util::parquet_test_data();

Expand Down
2 changes: 1 addition & 1 deletion ballista-examples/src/bin/ballista-sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

let testdata = datafusion::arrow::util::test_util::arrow_test_data();

Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async fn main() -> Result<()> {
.build()?;
// connect to Ballista scheduler
let ctx = BallistaContext::remote("localhost", 50050, &config);
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;
// register csv file with the execution context
ctx.register_csv(
Expand Down
221 changes: 126 additions & 95 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Distributed execution context.
use log::info;
use parking_lot::Mutex;
use sqlparser::ast::Statement;
use std::collections::HashMap;
Expand All @@ -25,7 +26,8 @@ use std::path::PathBuf;
use std::sync::Arc;

use ballista_core::config::BallistaConfig;
use ballista_core::serde::protobuf::LogicalPlanNode;
use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
use ballista_core::serde::protobuf::{ExecuteQueryParams, KeyValuePair, LogicalPlanNode};
use ballista_core::utils::create_df_ctx_with_ballista_query_planner;

use datafusion::catalog::TableReference;
Expand All @@ -35,7 +37,7 @@ use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, ExecutionConfig, ExecutionContext,
AvroReadOptions, CsvReadOptions, SessionConfig, SessionContext,
};
use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement};

Expand Down Expand Up @@ -64,26 +66,81 @@ impl BallistaContextState {
}
}

pub fn config(&self) -> &BallistaConfig {
&self.config
}
}

pub struct BallistaContext {
state: Arc<Mutex<BallistaContextState>>,
context: Arc<SessionContext>,
}

impl BallistaContext {
/// Create a context for executing queries against a remote Ballista scheduler instance
pub async fn remote(host: &str, port: u16, config: &BallistaConfig) -> Result<Self> {
let state = BallistaContextState::new(host.to_owned(), port, config);
let scheduler_url =
format!("http://{}:{}", &state.scheduler_host, state.scheduler_port);
info!(
"Connecting to Ballista scheduler at {}",
scheduler_url.clone()
);
let mut scheduler = SchedulerGrpcClient::connect(scheduler_url.clone())
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

let remote_session_id = scheduler
.execute_query(ExecuteQueryParams {
query: None,
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
optional_session_id: None,
})
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
.into_inner()
.session_id;

info!(
"Server side SessionContext created with Session id: {}",
remote_session_id
);

let ctx = {
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
scheduler_url,
remote_session_id,
state.config(),
)
};

Ok(Self {
state: Arc::new(Mutex::new(state)),
context: Arc::new(ctx),
})
}

#[cfg(feature = "standalone")]
pub async fn new_standalone(
pub async fn standalone(
config: &BallistaConfig,
concurrent_tasks: usize,
) -> ballista_core::error::Result<Self> {
use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
use ballista_core::serde::protobuf::PhysicalPlanNode;
use ballista_core::serde::BallistaCodec;

log::info!("Running in local mode. Scheduler will be run in-proc");

let addr = ballista_scheduler::standalone::new_standalone_scheduler().await?;

let scheduler = loop {
match SchedulerGrpcClient::connect(format!(
"http://localhost:{}",
addr.port()
))
.await
{
let scheduler_url = format!("http://localhost:{}", addr.port());
let mut scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::info!("Attempting to connect to in-proc scheduler...");
Expand All @@ -92,6 +149,37 @@ impl BallistaContextState {
}
};

let remote_session_id = scheduler
.execute_query(ExecuteQueryParams {
query: None,
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
optional_session_id: None,
})
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
.into_inner()
.session_id;

info!(
"Server side SessionContext created with Session id: {}",
remote_session_id
);

let ctx = {
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
scheduler_url,
remote_session_id,
config,
)
};

let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
BallistaCodec::default();

Expand All @@ -102,43 +190,12 @@ impl BallistaContextState {
)
.await?;

Ok(Self {
config: config.clone(),
scheduler_host: "localhost".to_string(),
scheduler_port: addr.port(),
tables: HashMap::new(),
})
}

pub fn config(&self) -> &BallistaConfig {
&self.config
}
}

pub struct BallistaContext {
state: Arc<Mutex<BallistaContextState>>,
}

impl BallistaContext {
/// Create a context for executing queries against a remote Ballista scheduler instance
pub fn remote(host: &str, port: u16, config: &BallistaConfig) -> Self {
let state = BallistaContextState::new(host.to_owned(), port, config);

Self {
state: Arc::new(Mutex::new(state)),
}
}

#[cfg(feature = "standalone")]
pub async fn standalone(
config: &BallistaConfig,
concurrent_tasks: usize,
) -> ballista_core::error::Result<Self> {
let state =
BallistaContextState::new_standalone(config, concurrent_tasks).await?;
BallistaContextState::new("localhost".to_string(), addr.port(), config);

Ok(Self {
state: Arc::new(Mutex::new(state)),
context: Arc::new(ctx),
})
}

Expand All @@ -154,15 +211,10 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
)
};
let df = ctx.read_avro(path.to_str().unwrap(), options).await?;
let df = self
.context
.read_avro(path.to_str().unwrap(), options)
.await?;
Ok(df)
}

Expand All @@ -174,15 +226,7 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
)
};
let df = ctx.read_parquet(path.to_str().unwrap()).await?;
let df = self.context.read_parquet(path.to_str().unwrap()).await?;
Ok(df)
}

Expand All @@ -198,15 +242,10 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
)
};
let df = ctx.read_csv(path.to_str().unwrap(), options).await?;
let df = self
.context
.read_csv(path.to_str().unwrap(), options)
.await?;
Ok(df)
}

Expand Down Expand Up @@ -292,34 +331,30 @@ impl BallistaContext {
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let mut ctx = {
let state = self.state.lock();
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
&state.scheduler_host,
state.scheduler_port,
state.config(),
)
};

let mut ctx = self.context.clone();
let is_show = self.is_show_statement(sql).await?;
// `SHOW TABLES` / `SHOW COLUMNS` statements cannot run on the scheduler because the tables are stored on the client
if is_show {
let state = self.state.lock();
ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_information_schema(
ctx = Arc::new(SessionContext::with_config(
SessionConfig::new().with_information_schema(
state.config.default_with_information_schema(),
),
);
));
}

// register tables with DataFusion context
{
let state = self.state.lock();
for (name, prov) in &state.tables {
ctx.register_table(
TableReference::Bare { table: name },
Arc::clone(prov),
)?;
// ctx is shared between queries, so check whether the table already exists before registering it
let table_ref = TableReference::Bare { table: name };
if !ctx.table_exist(table_ref)? {
ctx.register_table(
TableReference::Bare { table: name },
Arc::clone(prov),
)?;
}
}
}

Expand All @@ -342,16 +377,16 @@ impl BallistaContext {
.has_header(*has_header),
)
.await?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
Ok(Arc::new(DataFrameImpl::new(ctx.state.clone(), &plan)))
}
FileType::Parquet => {
self.register_parquet(name, location).await?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
Ok(Arc::new(DataFrameImpl::new(ctx.state.clone(), &plan)))
}
FileType::Avro => {
self.register_avro(name, location, AvroReadOptions::default())
.await?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
Ok(Arc::new(DataFrameImpl::new(ctx.state.clone(), &plan)))
}
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
Expand Down Expand Up @@ -476,17 +511,13 @@ mod tests {
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::util::pretty;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig,
};

use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/client/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
pub use crate::context::BallistaContext;
pub use ballista_core::config::BallistaConfig;
pub use ballista_core::config::BALLISTA_DEFAULT_BATCH_SIZE;
pub use ballista_core::config::BALLISTA_DEFAULT_SHUFFLE_PARTITIONS;
pub use ballista_core::error::{BallistaError, Result};

Expand Down
Loading

0 comments on commit ae70d72

Please sign in to comment.