Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor ExecutionContext and related conf to support multi-tenancy configurations #1924

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion ballista-examples/src/bin/ballista-dataframe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

let testdata = datafusion::arrow::util::test_util::parquet_test_data();

Expand Down
2 changes: 1 addition & 1 deletion ballista-examples/src/bin/ballista-sql.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ async fn main() -> Result<()> {
let config = BallistaConfig::builder()
.set("ballista.shuffle.partitions", "4")
.build()?;
let ctx = BallistaContext::remote("localhost", 50050, &config);
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

let testdata = datafusion::arrow::util::test_util::arrow_test_data();

Expand Down
2 changes: 1 addition & 1 deletion ballista/rust/client/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -106,7 +106,7 @@ async fn main() -> Result<()> {
.build()?;

// connect to Ballista scheduler
let ctx = BallistaContext::remote("localhost", 50050, &config);
let ctx = BallistaContext::remote("localhost", 50050, &config).await?;

// register csv file with the execution context
ctx.register_csv(
Expand Down
221 changes: 126 additions & 95 deletions ballista/rust/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@

//! Distributed execution context.

use log::info;
use parking_lot::Mutex;
use sqlparser::ast::Statement;
use std::collections::HashMap;
Expand All @@ -25,7 +26,8 @@ use std::path::PathBuf;
use std::sync::Arc;

use ballista_core::config::BallistaConfig;
use ballista_core::serde::protobuf::LogicalPlanNode;
use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
use ballista_core::serde::protobuf::{ExecuteQueryParams, KeyValuePair, LogicalPlanNode};
use ballista_core::utils::create_df_ctx_with_ballista_query_planner;

use datafusion::catalog::TableReference;
Expand All @@ -35,7 +37,7 @@ use datafusion::error::{DataFusionError, Result};
use datafusion::execution::dataframe_impl::DataFrameImpl;
use datafusion::logical_plan::{CreateExternalTable, LogicalPlan, TableScan};
use datafusion::prelude::{
AvroReadOptions, CsvReadOptions, ExecutionConfig, ExecutionContext,
AvroReadOptions, CsvReadOptions, SessionConfig, SessionContext,
};
use datafusion::sql::parser::{DFParser, FileType, Statement as DFStatement};

Expand Down Expand Up @@ -64,26 +66,81 @@ impl BallistaContextState {
}
}

/// Returns a reference to the `BallistaConfig` this state was created with.
pub fn config(&self) -> &BallistaConfig {
&self.config
}
}

/// Client-side handle for executing queries against a Ballista cluster.
///
/// `state` holds the client-side registrations and connection settings
/// (scheduler host/port, config, registered tables), guarded by a mutex so the
/// context can be shared; `context` is the DataFusion `SessionContext` wired
/// to the Ballista query planner, created once at connection time.
pub struct BallistaContext {
state: Arc<Mutex<BallistaContextState>>,
context: Arc<SessionContext>,
}

impl BallistaContext {
/// Create a context for executing queries against a remote Ballista scheduler instance.
///
/// Connects to the scheduler at `http://{host}:{port}`, creates a server-side
/// session by sending the `config` settings, and returns a context whose
/// DataFusion `SessionContext` is bound to that remote session id.
///
/// # Errors
/// Returns `DataFusionError::Execution` if the scheduler cannot be reached or
/// the server-side session cannot be created.
pub async fn remote(host: &str, port: u16, config: &BallistaConfig) -> Result<Self> {
    let state = BallistaContextState::new(host.to_owned(), port, config);
    let scheduler_url =
        format!("http://{}:{}", &state.scheduler_host, state.scheduler_port);
    // `info!` formats its arguments by reference, so no clone is needed here.
    info!("Connecting to Ballista scheduler at {}", scheduler_url);
    // `connect` consumes its argument and the URL is needed again below for
    // the query planner, so this is the one clone that is actually required.
    let mut scheduler = SchedulerGrpcClient::connect(scheduler_url.clone())
        .await
        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?;

    // Create the server-side session: ship the Ballista settings and receive
    // the session id that ties subsequent queries to the remote SessionContext.
    let remote_session_id = scheduler
        .execute_query(ExecuteQueryParams {
            query: None,
            settings: config
                .settings()
                .iter()
                .map(|(k, v)| KeyValuePair {
                    key: k.to_owned(),
                    value: v.to_owned(),
                })
                .collect::<Vec<_>>(),
            optional_session_id: None,
        })
        .await
        .map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
        .into_inner()
        .session_id;

    info!(
        "Server side SessionContext created with Session id: {}",
        remote_session_id
    );

    let ctx = create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
        scheduler_url,
        remote_session_id,
        state.config(),
    );

    Ok(Self {
        state: Arc::new(Mutex::new(state)),
        context: Arc::new(ctx),
    })
}

#[cfg(feature = "standalone")]
pub async fn new_standalone(
pub async fn standalone(
config: &BallistaConfig,
concurrent_tasks: usize,
) -> ballista_core::error::Result<Self> {
use ballista_core::serde::protobuf::scheduler_grpc_client::SchedulerGrpcClient;
use ballista_core::serde::protobuf::PhysicalPlanNode;
use ballista_core::serde::BallistaCodec;

log::info!("Running in local mode. Scheduler will be run in-proc");

let addr = ballista_scheduler::standalone::new_standalone_scheduler().await?;

let scheduler = loop {
match SchedulerGrpcClient::connect(format!(
"http://localhost:{}",
addr.port()
))
.await
{
let scheduler_url = format!("http://localhost:{}", addr.port());
let mut scheduler = loop {
match SchedulerGrpcClient::connect(scheduler_url.clone()).await {
Err(_) => {
tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
log::info!("Attempting to connect to in-proc scheduler...");
Expand All @@ -92,6 +149,37 @@ impl BallistaContextState {
}
};

let remote_session_id = scheduler
.execute_query(ExecuteQueryParams {
query: None,
settings: config
.settings()
.iter()
.map(|(k, v)| KeyValuePair {
key: k.to_owned(),
value: v.to_owned(),
})
.collect::<Vec<_>>(),
optional_session_id: None,
})
.await
.map_err(|e| DataFusionError::Execution(format!("{:?}", e)))?
.into_inner()
.session_id;

info!(
"Server side SessionContext created with Session id: {}",
remote_session_id
);

let ctx = {
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
scheduler_url,
remote_session_id,
config,
)
};

let default_codec: BallistaCodec<LogicalPlanNode, PhysicalPlanNode> =
BallistaCodec::default();

Expand All @@ -102,43 +190,12 @@ impl BallistaContextState {
)
.await?;

Ok(Self {
config: config.clone(),
scheduler_host: "localhost".to_string(),
scheduler_port: addr.port(),
tables: HashMap::new(),
})
}

pub fn config(&self) -> &BallistaConfig {
&self.config
}
}

pub struct BallistaContext {
state: Arc<Mutex<BallistaContextState>>,
}

impl BallistaContext {
/// Create a context for executing queries against a remote Ballista scheduler instance
pub fn remote(host: &str, port: u16, config: &BallistaConfig) -> Self {
let state = BallistaContextState::new(host.to_owned(), port, config);

Self {
state: Arc::new(Mutex::new(state)),
}
}

#[cfg(feature = "standalone")]
pub async fn standalone(
config: &BallistaConfig,
concurrent_tasks: usize,
) -> ballista_core::error::Result<Self> {
let state =
BallistaContextState::new_standalone(config, concurrent_tasks).await?;
BallistaContextState::new("localhost".to_string(), addr.port(), config);

Ok(Self {
state: Arc::new(Mutex::new(state)),
context: Arc::new(ctx),
})
}

Expand All @@ -154,15 +211,10 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
)
};
let df = ctx.read_avro(path.to_str().unwrap(), options).await?;
let df = self
.context
.read_avro(path.to_str().unwrap(), options)
.await?;
Ok(df)
}

Expand All @@ -174,15 +226,7 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
)
};
let df = ctx.read_parquet(path.to_str().unwrap()).await?;
let df = self.context.read_parquet(path.to_str().unwrap()).await?;
Ok(df)
}

Expand All @@ -198,15 +242,10 @@ impl BallistaContext {
let path = fs::canonicalize(&path)?;

// use local DataFusion context for now but later this might call the scheduler
let mut ctx = {
let guard = self.state.lock();
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
&guard.scheduler_host,
guard.scheduler_port,
guard.config(),
)
};
let df = ctx.read_csv(path.to_str().unwrap(), options).await?;
let df = self
.context
.read_csv(path.to_str().unwrap(), options)
.await?;
Ok(df)
}

Expand Down Expand Up @@ -292,34 +331,30 @@ impl BallistaContext {
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<Arc<dyn DataFrame>> {
let mut ctx = {
let state = self.state.lock();
create_df_ctx_with_ballista_query_planner::<LogicalPlanNode>(
&state.scheduler_host,
state.scheduler_port,
state.config(),
)
};

let mut ctx = self.context.clone();
let is_show = self.is_show_statement(sql).await?;
// SHOW TABLES / SHOW COLUMNS statements cannot run on the scheduler because the tables are stored on the client
if is_show {
let state = self.state.lock();
ctx = ExecutionContext::with_config(
ExecutionConfig::new().with_information_schema(
ctx = Arc::new(SessionContext::with_config(
SessionConfig::new().with_information_schema(
state.config.default_with_information_schema(),
),
);
));
}

// register tables with DataFusion context
{
let state = self.state.lock();
for (name, prov) in &state.tables {
ctx.register_table(
TableReference::Bare { table: name },
Arc::clone(prov),
)?;
// ctx is shared between queries; check whether the table already exists before registering it
let table_ref = TableReference::Bare { table: name };
if !ctx.table_exist(table_ref)? {
ctx.register_table(
TableReference::Bare { table: name },
Arc::clone(prov),
)?;
}
}
}

Expand All @@ -342,16 +377,16 @@ impl BallistaContext {
.has_header(*has_header),
)
.await?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
Ok(Arc::new(DataFrameImpl::new(ctx.state.clone(), &plan)))
}
FileType::Parquet => {
self.register_parquet(name, location).await?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
Ok(Arc::new(DataFrameImpl::new(ctx.state.clone(), &plan)))
}
FileType::Avro => {
self.register_avro(name, location, AvroReadOptions::default())
.await?;
Ok(Arc::new(DataFrameImpl::new(ctx.state, &plan)))
Ok(Arc::new(DataFrameImpl::new(ctx.state.clone(), &plan)))
}
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
Expand Down Expand Up @@ -476,17 +511,13 @@ mod tests {
use datafusion::arrow::datatypes::Schema;
use datafusion::arrow::util::pretty;
use datafusion::datasource::file_format::csv::CsvFormat;
use datafusion::datasource::file_format::parquet::ParquetFormat;
use datafusion::datasource::listing::{
ListingOptions, ListingTable, ListingTableConfig,
};

use ballista_core::config::{
BallistaConfigBuilder, BALLISTA_WITH_INFORMATION_SCHEMA,
};
use std::fs::File;
use std::io::Write;
use tempfile::TempDir;
let config = BallistaConfigBuilder::default()
.set(BALLISTA_WITH_INFORMATION_SCHEMA, "true")
.build()
Expand Down
1 change: 1 addition & 0 deletions ballista/rust/client/src/prelude.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

pub use crate::context::BallistaContext;
pub use ballista_core::config::BallistaConfig;
pub use ballista_core::config::BALLISTA_DEFAULT_BATCH_SIZE;
pub use ballista_core::config::BALLISTA_DEFAULT_SHUFFLE_PARTITIONS;
pub use ballista_core::error::{BallistaError, Result};

Expand Down
Loading