Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Upgrade to datafusion 16 #634

Closed
wants to merge 13 commits into from
Closed
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Next Next commit
Upgrade to datafusion 16
  • Loading branch information
Brent Gardner committed Jan 26, 2023
commit f59761bb9bb674a53736f1afe637430d8e06f9b5
4 changes: 2 additions & 2 deletions ballista-cli/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,8 +33,8 @@ ballista = { path = "../ballista/client", version = "0.10.0", features = [
"standalone",
] }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = "15.0.0"
datafusion-cli = "15.0.0"
datafusion = "16.0.0"
datafusion-cli = "16.0.0"
dirs = "4.0.0"
env_logger = "0.10"
mimalloc = { version = "0.1", default-features = false }
Expand Down
6 changes: 3 additions & 3 deletions ballista/client/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -31,12 +31,12 @@ rust-version = "1.63"
ballista-core = { path = "../core", version = "0.10.0" }
ballista-executor = { path = "../executor", version = "0.10.0", optional = true }
ballista-scheduler = { path = "../scheduler", version = "0.10.0", optional = true }
datafusion = "15.0.0"
datafusion-proto = "15.0.0"
datafusion = "16.0.0"
datafusion-proto = "16.0.0"
futures = "0.3"
log = "0.4"
parking_lot = "0.12"
sqlparser = "0.27"
sqlparser = "0.30"
tempfile = "3"
tokio = "1.0"

Expand Down
52 changes: 28 additions & 24 deletions ballista/client/src/context.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ use ballista_core::utils::{
use datafusion_proto::protobuf::LogicalPlanNode;

use datafusion::catalog::TableReference;
use datafusion::common::OwnedTableReference;
use datafusion::dataframe::DataFrame;
use datafusion::datasource::{source_as_provider, TableProvider};
use datafusion::error::{DataFusionError, Result};
Expand Down Expand Up @@ -212,7 +213,7 @@ impl BallistaContext {
&self,
path: &str,
options: AvroReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
) -> Result<DataFrame> {
let df = self.context.read_avro(path, options).await?;
Ok(df)
}
Expand All @@ -223,7 +224,7 @@ impl BallistaContext {
&self,
path: &str,
options: ParquetReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
) -> Result<DataFrame> {
let df = self.context.read_parquet(path, options).await?;
Ok(df)
}
Expand All @@ -234,7 +235,7 @@ impl BallistaContext {
&self,
path: &str,
options: CsvReadOptions<'_>,
) -> Result<Arc<DataFrame>> {
) -> Result<DataFrame> {
let df = self.context.read_csv(path, options).await?;
Ok(df)
}
Expand All @@ -256,13 +257,10 @@ impl BallistaContext {
path: &str,
options: CsvReadOptions<'_>,
) -> Result<()> {
let plan = self
.read_csv(path, options)
.await
.map_err(|e| {
DataFusionError::Context(format!("Can't read CSV: {}", path), Box::new(e))
})?
.to_logical_plan()?;
let df = self.read_csv(path, options).await.map_err(|e| {
DataFusionError::Context(format!("Can't read CSV: {}", path), Box::new(e))
})?;
let plan = df.into_optimized_plan()?;
match plan {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source_as_provider(&source)?)
Expand All @@ -277,7 +275,11 @@ impl BallistaContext {
path: &str,
options: ParquetReadOptions<'_>,
) -> Result<()> {
match self.read_parquet(path, options).await?.to_logical_plan()? {
match self
.read_parquet(path, options)
.await?
.into_optimized_plan()?
{
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source_as_provider(&source)?)
}
Expand All @@ -291,7 +293,7 @@ impl BallistaContext {
path: &str,
options: AvroReadOptions<'_>,
) -> Result<()> {
match self.read_avro(path, options).await?.to_logical_plan()? {
match self.read_avro(path, options).await?.into_optimized_plan()? {
LogicalPlan::TableScan(TableScan { source, .. }) => {
self.register_table(name, source_as_provider(&source)?)
}
Expand All @@ -302,17 +304,16 @@ impl BallistaContext {
/// is a 'show *' sql
pub async fn is_show_statement(&self, sql: &str) -> Result<bool> {
let mut is_show_variable: bool = false;
let statements = DFParser::parse_sql(sql)?;
let mut statements = DFParser::parse_sql(sql)?;

if statements.len() != 1 {
return Err(DataFusionError::NotImplemented(
"The context currently only supports a single SQL statement".to_string(),
));
}

if let DFStatement::Statement(s) = &statements[0] {
let st: &Statement = s;
match st {
if let Some(DFStatement::Statement(s)) = statements.remove(0) {
match *s {
Statement::ShowVariable { .. } => {
is_show_variable = true;
}
Expand All @@ -332,7 +333,7 @@ impl BallistaContext {
///
/// This method is `async` because queries of type `CREATE EXTERNAL TABLE`
/// might require the schema to be inferred.
pub async fn sql(&self, sql: &str) -> Result<Arc<DataFrame>> {
pub async fn sql(&self, sql: &str) -> Result<DataFrame> {
let mut ctx = self.context.clone();

let is_show = self.is_show_statement(sql).await?;
Expand Down Expand Up @@ -361,7 +362,7 @@ impl BallistaContext {
}
}

let plan = ctx.create_logical_plan(sql)?;
let plan = ctx.state().create_logical_plan(sql).await?;

match plan {
LogicalPlan::CreateExternalTable(CreateExternalTable {
Expand All @@ -375,6 +376,11 @@ impl BallistaContext {
ref if_not_exists,
..
}) => {
let name = match name {
OwnedTableReference::Bare { table, .. } => table,
OwnedTableReference::Partial { table, .. } => table,
OwnedTableReference::Full { table, .. } => table,
};
let table_exists = ctx.table_exist(name.as_str())?;
let schema: SchemaRef = Arc::new(schema.as_ref().to_owned().into());
let table_partition_cols = table_partition_cols
Expand All @@ -398,7 +404,7 @@ impl BallistaContext {
options = options.schema(&schema);
}
self.register_csv(name, location, options).await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
Ok(DataFrame::new(ctx.state(), plan))
}
"parquet" => {
self.register_parquet(
Expand All @@ -408,7 +414,7 @@ impl BallistaContext {
.table_partition_cols(table_partition_cols),
)
.await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
Ok(DataFrame::new(ctx.state(), plan))
}
"avro" => {
self.register_avro(
Expand All @@ -418,16 +424,14 @@ impl BallistaContext {
.table_partition_cols(table_partition_cols),
)
.await?;
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
Ok(DataFrame::new(ctx.state(), plan))
}
_ => Err(DataFusionError::NotImplemented(format!(
"Unsupported file type {:?}.",
file_type
))),
},
(true, true) => {
Ok(Arc::new(DataFrame::new(ctx.state.clone(), &plan)))
}
(true, true) => Ok(DataFrame::new(ctx.state(), plan)),
(false, true) => Err(DataFusionError::Execution(format!(
"Table '{:?}' already exists",
name
Expand Down
8 changes: 4 additions & 4 deletions ballista/core/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,13 @@ simd = ["datafusion/simd"]
[dependencies]
ahash = { version = "0.8", default-features = false }

arrow-flight = { version = "28.0.0", features = ["flight-sql-experimental"] }
arrow-flight = { version = "29.0.0", features = ["flight-sql-experimental"] }
async-trait = "0.1.41"
chrono = { version = "0.4", default-features = false }
clap = { version = "3", features = ["derive", "cargo"] }
datafusion = "15.0.0"
datafusion = "16.0.0"
datafusion-objectstore-hdfs = { version = "0.1.1", default-features = false, optional = true }
datafusion-proto = "15.0.0"
datafusion-proto = "16.0.0"
futures = "0.3"
hashbrown = "0.13"

Expand All @@ -68,7 +68,7 @@ prost = "0.11"
prost-types = "0.11"
rand = "0.8"
serde = { version = "1", features = ["derive"] }
sqlparser = "0.27"
sqlparser = "0.30"
sys-info = "0.9.0"
tokio = "1.0"
tokio-stream = { version = "0.1", features = ["net"] }
Expand Down
8 changes: 4 additions & 4 deletions ballista/core/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -158,14 +158,14 @@ impl From<tokio::task::JoinError> for BallistaError {
}
}

impl From<datafusion_proto::from_proto::Error> for BallistaError {
fn from(e: datafusion_proto::from_proto::Error) -> Self {
impl From<datafusion_proto::logical_plan::to_proto::Error> for BallistaError {
fn from(e: datafusion_proto::logical_plan::to_proto::Error) -> Self {
BallistaError::General(e.to_string())
}
}

impl From<datafusion_proto::to_proto::Error> for BallistaError {
fn from(e: datafusion_proto::to_proto::Error) -> Self {
impl From<datafusion_proto::logical_plan::from_proto::Error> for BallistaError {
fn from(e: datafusion_proto::logical_plan::from_proto::Error) -> Self {
BallistaError::General(e.to_string())
}
}
Expand Down
4 changes: 0 additions & 4 deletions ballista/core/src/execution_plans/distributed_query.rs
Original file line number Diff line number Diff line change
Expand Up @@ -136,10 +136,6 @@ impl<T: 'static + AsLogicalPlan> ExecutionPlan for DistributedQueryExec<T> {
None
}

fn relies_on_input_order(&self) -> bool {
false
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
Expand Down
4 changes: 0 additions & 4 deletions ballista/core/src/execution_plans/shuffle_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,10 +98,6 @@ impl ExecutionPlan for ShuffleReaderExec {
None
}

fn relies_on_input_order(&self) -> bool {
false
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
Expand Down
4 changes: 0 additions & 4 deletions ballista/core/src/execution_plans/shuffle_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -313,10 +313,6 @@ impl ExecutionPlan for ShuffleWriterExec {
None
}

fn relies_on_input_order(&self) -> bool {
false
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![self.plan.clone()]
}
Expand Down
4 changes: 0 additions & 4 deletions ballista/core/src/execution_plans/unresolved_shuffle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,10 +81,6 @@ impl ExecutionPlan for UnresolvedShuffleExec {
None
}

fn relies_on_input_order(&self) -> bool {
false
}

fn children(&self) -> Vec<Arc<dyn ExecutionPlan>> {
vec![]
}
Expand Down
4 changes: 2 additions & 2 deletions ballista/core/src/serde/generated/ballista.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1985,7 +1985,7 @@ pub mod executor_grpc_client {
pub mod scheduler_grpc_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
///Generated trait containing gRPC methods that should be implemented for use with SchedulerGrpcServer.
/// Generated trait containing gRPC methods that should be implemented for use with SchedulerGrpcServer.
#[async_trait]
pub trait SchedulerGrpc: Send + Sync + 'static {
/// Executors must poll the scheduler for heartbeat and to receive tasks
Expand Down Expand Up @@ -2531,7 +2531,7 @@ pub mod scheduler_grpc_server {
pub mod executor_grpc_server {
#![allow(unused_variables, dead_code, missing_docs, clippy::let_unit_value)]
use tonic::codegen::*;
///Generated trait containing gRPC methods that should be implemented for use with ExecutorGrpcServer.
/// Generated trait containing gRPC methods that should be implemented for use with ExecutorGrpcServer.
#[async_trait]
pub trait ExecutorGrpc: Send + Sync + 'static {
async fn launch_task(
Expand Down
7 changes: 3 additions & 4 deletions ballista/core/src/serde/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -262,10 +262,9 @@ mod tests {
SendableRecordBatchStream, Statistics,
};
use datafusion::prelude::{CsvReadOptions, SessionConfig, SessionContext};
use datafusion_proto::logical_plan::from_proto::parse_expr;
use prost::Message;
use std::any::Any;

use datafusion_proto::from_proto::parse_expr;
use std::convert::TryInto;
use std::fmt;
use std::fmt::{Debug, Formatter};
Expand Down Expand Up @@ -628,13 +627,13 @@ mod tests {
let scan = ctx
.read_csv("tests/customer.csv", CsvReadOptions::default())
.await?
.to_logical_plan()?;
.into_optimized_plan()?;

let topk_plan = LogicalPlan::Extension(Extension {
node: Arc::new(TopKPlanNode::new(3, scan, col("revenue"))),
});

let topk_exec = ctx.create_physical_plan(&topk_plan).await?;
let topk_exec = ctx.state().create_physical_plan(&topk_plan).await?;

let extension_codec = TopKExtensionCodec {};

Expand Down
6 changes: 2 additions & 4 deletions ballista/core/src/serde/physical_plan/from_proto.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,6 @@ use std::sync::Arc;

use chrono::{TimeZone, Utc};
use datafusion::arrow::datatypes::Schema;
use datafusion::config::ConfigOptions;
use datafusion::datasource::listing::{FileRange, PartitionedFile};
use datafusion::datasource::object_store::ObjectStoreUrl;
use datafusion::execution::context::ExecutionProps;
Expand All @@ -40,10 +39,9 @@ use datafusion::physical_plan::{
functions, Partitioning,
};
use datafusion::physical_plan::{ColumnStatistics, PhysicalExpr, Statistics};
use datafusion_proto::from_proto::from_proto_binary_op;
use datafusion_proto::logical_plan::from_proto::from_proto_binary_op;
use object_store::path::Path;
use object_store::ObjectMeta;
use parking_lot::RwLock;

use crate::serde::protobuf::physical_expr_node::ExprType;

Expand Down Expand Up @@ -399,7 +397,6 @@ impl TryInto<FileScanConfig> for &protobuf::FileScanExecConf {
let statistics = convert_required!(self.statistics)?;

Ok(FileScanConfig {
config_options: Arc::new(RwLock::new(ConfigOptions::new())), // TODO add serde
object_store_url: ObjectStoreUrl::parse(&self.object_store_url)?,
file_schema: schema,
file_groups: self
Expand All @@ -413,6 +410,7 @@ impl TryInto<FileScanConfig> for &protobuf::FileScanExecConf {
table_partition_cols: vec![],
// TODO add ordering info to the ballista proto file
output_ordering: None,
infinite_source: false,
})
}
}
Loading