Skip to content

Commit 43e76e3

Browse files
committed
Fix python
1 parent b383870 commit 43e76e3

File tree

7 files changed

+136
-101
lines changed

7 files changed

+136
-101
lines changed

python/Cargo.toml

+2
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,8 @@ default = ["mimalloc"]
3737
async-trait = "0.1"
3838
ballista = { path = "../ballista/client", version = "0.10.0" }
3939
datafusion = { version = "16.1.0", features = ["pyarrow"] }
40+
datafusion-common = "16.1.0"
41+
datafusion-expr = "16.1.0"
4042
futures = "0.3"
4143
mimalloc = { version = "*", optional = true, default-features = false }
4244
pyo3 = { version = "~0.17.1", features = ["extension-module", "abi3", "abi3-py37"] }

python/src/catalog.rs

+8-7
Original file line numberDiff line numberDiff line change
@@ -27,6 +27,9 @@ use datafusion::{
2727
datasource::{TableProvider, TableType},
2828
};
2929

30+
use crate::errors::DataFusionError;
31+
use crate::utils::wait_for_future;
32+
3033
#[pyclass(name = "Catalog", module = "ballista", subclass)]
3134
pub(crate) struct PyCatalog {
3235
catalog: Arc<dyn CatalogProvider>,
@@ -88,13 +91,11 @@ impl PyDatabase {
8891
self.database.table_names().into_iter().collect()
8992
}
9093

91-
fn table(&self, name: &str) -> PyResult<PyTable> {
92-
match self.database.table(name) {
93-
Some(table) => Ok(PyTable::new(table)),
94-
None => Err(PyKeyError::new_err(format!(
95-
"Table with name {} doesn't exist.",
96-
name
97-
))),
94+
fn table(&self, name: &str, py: Python) -> PyResult<PyTable> {
95+
if let Some(table) = wait_for_future(py, self.database.table(name)) {
96+
Ok(PyTable::new(table))
97+
} else {
98+
Err(DataFusionError::Common(format!("Table with name {name} doesn't exist.")).into())
9899
}
99100
}
100101

python/src/context.rs

+15-7
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ use datafusion::arrow::record_batch::RecordBatch;
2929
use datafusion::datasource::datasource::TableProvider;
3030
use datafusion::datasource::MemTable;
3131
use datafusion::execution::context::{SessionConfig, SessionContext};
32-
use datafusion::prelude::{CsvReadOptions, ParquetReadOptions};
32+
use datafusion::prelude::{CsvReadOptions, DataFrame, ParquetReadOptions};
3333

3434
use crate::catalog::{PyCatalog, PyTable};
3535
use crate::dataframe::PyDataFrame;
@@ -75,7 +75,7 @@ impl PySessionContext {
7575
// TODO: config_options
7676
) -> Self {
7777
let cfg = SessionConfig::new()
78-
.create_default_catalog_and_schema(create_default_catalog_and_schema)
78+
.with_create_default_catalog_and_schema(create_default_catalog_and_schema)
7979
.with_default_catalog_and_schema(default_catalog, default_schema)
8080
.with_information_schema(information_schema)
8181
.with_repartition_joins(repartition_joins)
@@ -103,6 +103,7 @@ impl PySessionContext {
103103
fn create_dataframe(
104104
&mut self,
105105
partitions: PyArrowType<Vec<Vec<RecordBatch>>>,
106+
py: Python,
106107
) -> PyResult<PyDataFrame> {
107108
let table = MemTable::try_new(partitions.0[0][0].schema(), partitions.0)
108109
.map_err(DataFusionError::from)?;
@@ -117,10 +118,9 @@ impl PySessionContext {
117118
self.ctx
118119
.register_table(&*name, Arc::new(table))
119120
.map_err(DataFusionError::from)?;
120-
let table = self.ctx.table(&*name).map_err(DataFusionError::from)?;
121+
let table = wait_for_future(py, self._table(&name)).map_err(DataFusionError::from)?;
121122

122-
let df = PyDataFrame::new(table);
123-
Ok(df)
123+
Ok(PyDataFrame::new(table))
124124
}
125125

126126
fn register_table(&mut self, name: &str, table: &PyTable) -> PyResult<()> {
@@ -248,15 +248,23 @@ impl PySessionContext {
248248
self.ctx.tables().unwrap()
249249
}
250250

251-
fn table(&self, name: &str) -> PyResult<PyDataFrame> {
252-
Ok(PyDataFrame::new(self.ctx.table(name)?))
251+
fn table(&self, name: &str, py: Python) -> PyResult<PyDataFrame> {
252+
let table = wait_for_future(py, self._table(name)).map_err(DataFusionError::from)?;
253+
254+
Ok(PyDataFrame::new(table))
253255
}
254256

255257
fn empty_table(&self) -> PyResult<PyDataFrame> {
256258
Ok(PyDataFrame::new(self.ctx.read_empty()?))
257259
}
258260
}
259261

262+
impl PySessionContext {
263+
async fn _table(&self, name: &str) -> datafusion_common::Result<DataFrame> {
264+
self.ctx.table(name).await
265+
}
266+
}
267+
260268
fn convert_table_partition_cols(
261269
table_partition_cols: Vec<(String, PyDataType)>,
262270
) -> Vec<(String, DataType)> {

python/src/dataframe.rs

+18-14
Original file line numberDiff line numberDiff line change
@@ -38,8 +38,8 @@ pub(crate) struct PyDataFrame {
3838

3939
impl PyDataFrame {
4040
/// creates a new PyDataFrame
41-
pub fn new(df: Arc<DataFrame>) -> Self {
42-
Self { df }
41+
pub fn new(df: DataFrame) -> Self {
42+
Self { df: Arc::new(df) }
4343
}
4444
}
4545

@@ -71,51 +71,51 @@ impl PyDataFrame {
7171

7272
#[args(args = "*")]
7373
fn select_columns(&self, args: Vec<&str>) -> PyResult<Self> {
74-
let df = self.df.select_columns(&args)?;
74+
let df = self.df.as_ref().clone().select_columns(&args)?;
7575
Ok(Self::new(df))
7676
}
7777

7878
#[args(args = "*")]
7979
fn select(&self, args: Vec<PyExpr>) -> PyResult<Self> {
8080
let expr = args.into_iter().map(|e| e.into()).collect();
81-
let df = self.df.select(expr)?;
81+
let df = self.df.as_ref().clone().select(expr)?;
8282
Ok(Self::new(df))
8383
}
8484

8585
fn filter(&self, predicate: PyExpr) -> PyResult<Self> {
86-
let df = self.df.filter(predicate.into())?;
86+
let df = self.df.as_ref().clone().filter(predicate.into())?;
8787
Ok(Self::new(df))
8888
}
8989

9090
fn with_column(&self, name: &str, expr: PyExpr) -> PyResult<Self> {
91-
let df = self.df.with_column(name, expr.into())?;
91+
let df = self.df.as_ref().clone().with_column(name, expr.into())?;
9292
Ok(Self::new(df))
9393
}
9494

9595
fn aggregate(&self, group_by: Vec<PyExpr>, aggs: Vec<PyExpr>) -> PyResult<Self> {
9696
let group_by = group_by.into_iter().map(|e| e.into()).collect();
9797
let aggs = aggs.into_iter().map(|e| e.into()).collect();
98-
let df = self.df.aggregate(group_by, aggs)?;
98+
let df = self.df.as_ref().clone().aggregate(group_by, aggs)?;
9999
Ok(Self::new(df))
100100
}
101101

102102
#[args(exprs = "*")]
103103
fn sort(&self, exprs: Vec<PyExpr>) -> PyResult<Self> {
104104
let exprs = exprs.into_iter().map(|e| e.into()).collect();
105-
let df = self.df.sort(exprs)?;
105+
let df = self.df.as_ref().clone().sort(exprs)?;
106106
Ok(Self::new(df))
107107
}
108108

109109
fn limit(&self, count: usize) -> PyResult<Self> {
110-
let df = self.df.limit(0, Some(count))?;
110+
let df = self.df.as_ref().clone().limit(0, Some(count))?;
111111
Ok(Self::new(df))
112112
}
113113

114114
/// Executes the plan, returning a list of `RecordBatch`es.
115115
/// Unless some order is specified in the plan, there is no
116116
/// guarantee of the order of the result.
117117
fn collect(&self, py: Python) -> PyResult<Vec<PyObject>> {
118-
let batches = wait_for_future(py, self.df.collect())?;
118+
let batches = wait_for_future(py, self.df.as_ref().clone().collect())?;
119119
// cannot use PyResult<Vec<RecordBatch>> return type due to
120120
// https://github.com/PyO3/pyo3/issues/1813
121121
batches.into_iter().map(|rb| rb.to_pyarrow(py)).collect()
@@ -124,7 +124,7 @@ impl PyDataFrame {
124124
/// Print the result, 20 lines by default
125125
#[args(num = "20")]
126126
fn show(&self, py: Python, num: usize) -> PyResult<()> {
127-
let df = self.df.limit(0, Some(num))?;
127+
let df = self.df.as_ref().clone().limit(0, Some(num))?;
128128
let batches = wait_for_future(py, df.collect())?;
129129
pretty::print_batches(&batches)
130130
.map_err(|err| PyArrowException::new_err(err.to_string()))
@@ -153,9 +153,13 @@ impl PyDataFrame {
153153
}
154154
};
155155

156-
let df = self
157-
.df
158-
.join(right.df, join_type, &join_keys.0, &join_keys.1, None)?;
156+
let df = self.df.as_ref().clone().join(
157+
right.df.as_ref().clone(),
158+
join_type,
159+
&join_keys.0,
160+
&join_keys.1,
161+
None,
162+
)?;
159163
Ok(Self::new(df))
160164
}
161165

python/src/dataset_exec.rs

-4
Original file line numberDiff line numberDiff line change
@@ -162,10 +162,6 @@ impl ExecutionPlan for DatasetExec {
162162
})
163163
}
164164

165-
fn relies_on_input_order(&self) -> bool {
166-
false
167-
}
168-
169165
fn output_ordering(&self) -> Option<&[PhysicalSortExpr]> {
170166
None
171167
}

0 commit comments

Comments (0)